This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch fix-release-migration in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 372f24624768a50e5f95deee4c9c7aa98be8634a Author: Dominik Riemer <[email protected]> AuthorDate: Thu Jan 9 10:02:04 2025 +0100 fix: Avoid invalid migration when updating from 0.95.0 to 0.97.0 --- .../jvm/TransformationExtensionModuleExport.java | 3 +- ...aticMetadataEnrichmentProcessorMigrationV1.java | 49 ++++++++++++++++++++++ .../StaticMetaDataEnrichmentProcessor.java | 4 +- .../migrations/DataLakeSinkMigrationV1.java | 14 +++++-- .../migrations/INotificationDataSinkMigrator.java | 18 +++++--- 5 files changed, 77 insertions(+), 11 deletions(-) diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java index fbd6f3180d..8b50d99a75 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java @@ -22,6 +22,7 @@ import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport; import org.apache.streampipes.extensions.api.migration.IModelMigrator; import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement; +import org.apache.streampipes.processors.transformation.jvm.migrations.StaticMetadataEnrichmentProcessorMigrationV1; import org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayProcessor; import org.apache.streampipes.processors.transformation.jvm.processor.array.split.SplitArrayProcessor; import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.counter.BooleanCounterProcessor; @@ -93,6 +94,6 @@ public class TransformationExtensionModuleExport implements IExtensionModuleExpo @Override public List<IModelMigrator<?, ?>> migrators() { - return Collections.emptyList(); + return List.of(new StaticMetadataEnrichmentProcessorMigrationV1()); } } diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java new file mode 100644 index 0000000000..98568ae938 --- /dev/null +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.processors.transformation.jvm.migrations; + +import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor; +import org.apache.streampipes.extensions.api.migration.IDataProcessorMigrator; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.graph.DataProcessorInvocation; +import org.apache.streampipes.model.migration.MigrationResult; +import org.apache.streampipes.model.migration.ModelMigratorConfig; +import org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata.StaticMetaDataEnrichmentProcessor; + +/** + * Empty migration to resolve invalid version resolution in release 0.95.0 + */ +public class StaticMetadataEnrichmentProcessorMigrationV1 implements IDataProcessorMigrator { + @Override + public ModelMigratorConfig config() { + return new ModelMigratorConfig( + StaticMetaDataEnrichmentProcessor.ID, + SpServiceTagPrefix.DATA_PROCESSOR, + 0, + 1 + ); + } + + @Override + public MigrationResult<DataProcessorInvocation> migrate(DataProcessorInvocation element, + IDataProcessorParameterExtractor extractor) + throws RuntimeException { + return MigrationResult.success(element); + } +} diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java index 863654a601..e60947db72 100644 --- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java +++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java @@ -55,6 +55,8 @@ public class StaticMetaDataEnrichmentProcessor implements IStreamPipesDataProcessor, ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> { + public static final String ID = "org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata"; + protected static final String STATIC_METADATA_INPUT = "static-metadata-input"; protected static final String STATIC_METADATA_INPUT_RUNTIME_NAME = "static-metadata-input-runtime-name"; protected static final String STATIC_METADATA_INPUT_VALUE = "static-metadata-input-value"; @@ -73,7 +75,7 @@ public class StaticMetaDataEnrichmentProcessor return DataProcessorConfiguration.create( StaticMetaDataEnrichmentProcessor::new, ProcessingElementBuilder.create( - "org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata", + ID, 1 ) .category( diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java index 7fc35ce9d8..f3ba255bcd 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java @@ -50,14 +50,22 @@ public class DataLakeSinkMigrationV1 implements IDataSinkMigrator { DataSinkInvocation element, IDataSinkParameterExtractor extractor ) throws RuntimeException { - var oneOfStaticProperty = createDefaultSchemaUpdateStrategy(); + if (!isSchemaUpdateStrategyPresent(element)) { + var oneOfStaticProperty = createDefaultSchemaUpdateStrategy(); - element.getStaticProperties() - .add(oneOfStaticProperty); + element.getStaticProperties() + .add(oneOfStaticProperty); + } return MigrationResult.success(element); } + private boolean isSchemaUpdateStrategyPresent(DataSinkInvocation element) { + return element.getStaticProperties() + .stream() + .anyMatch(sp -> sp.getInternalName().equals(DataLakeSink.SCHEMA_UPDATE_KEY)); + } + private static OneOfStaticProperty createDefaultSchemaUpdateStrategy() { var label = Labels.from( DataLakeSink.SCHEMA_UPDATE_KEY, diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java index f795c28460..56d9a89d35 100644 --- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java +++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java @@ -32,13 +32,19 @@ public interface INotificationDataSinkMigrator extends IDataSinkMigrator { @Override default MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element, IDataSinkParameterExtractor extractor) throws RuntimeException { - var fsp = new FreeTextStaticProperty(KEY_SILENT_PERIOD, - "Silent Period [min]", - "The minimum number of minutes between two consecutive notifications that are sent", - XSD.INTEGER); - fsp.setValue(String.valueOf(DEFAULT_WAITING_TIME_MINUTES)); - element.getStaticProperties().add(fsp); + if (!isSilentPeriodPresent(element)) { + var fsp = new FreeTextStaticProperty(KEY_SILENT_PERIOD, + "Silent Period [min]", + "The minimum number of minutes between two consecutive notifications that are sent", + XSD.INTEGER); + fsp.setValue(String.valueOf(DEFAULT_WAITING_TIME_MINUTES)); + element.getStaticProperties().add(fsp); + } return MigrationResult.success(element); } + + default boolean isSilentPeriodPresent(DataSinkInvocation element) { + return element.getStaticProperties().stream().anyMatch(e -> e.getInternalName().equals(KEY_SILENT_PERIOD)); + } }
