This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch fix-release-migration
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 372f24624768a50e5f95deee4c9c7aa98be8634a
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 9 10:02:04 2025 +0100

    fix: Avoid invalid migration when updating from 0.95.0 to 0.97.0
---
 .../jvm/TransformationExtensionModuleExport.java   |  3 +-
 ...aticMetadataEnrichmentProcessorMigrationV1.java | 49 ++++++++++++++++++++++
 .../StaticMetaDataEnrichmentProcessor.java         |  4 +-
 .../migrations/DataLakeSinkMigrationV1.java        | 14 +++++--
 .../migrations/INotificationDataSinkMigrator.java  | 18 +++++---
 5 files changed, 77 insertions(+), 11 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java
index fbd6f3180d..8b50d99a75 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
 import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
 import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
+import org.apache.streampipes.processors.transformation.jvm.migrations.StaticMetadataEnrichmentProcessorMigrationV1;
 import org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayProcessor;
 import org.apache.streampipes.processors.transformation.jvm.processor.array.split.SplitArrayProcessor;
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.counter.BooleanCounterProcessor;
@@ -93,6 +94,6 @@ public class TransformationExtensionModuleExport implements IExtensionModuleExpo
 
   @Override
   public List<IModelMigrator<?, ?>> migrators() {
-    return Collections.emptyList();
+    return List.of(new StaticMetadataEnrichmentProcessorMigrationV1());
   }
 }
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java
new file mode 100644
index 0000000000..98568ae938
--- /dev/null
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.migrations;
+
+import org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataProcessorMigrator;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata.StaticMetaDataEnrichmentProcessor;
+
+/**
+ * Empty migration to resolve invalid version resolution in release 0.95.0
+ */
+public class StaticMetadataEnrichmentProcessorMigrationV1 implements IDataProcessorMigrator {
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        StaticMetaDataEnrichmentProcessor.ID,
+        SpServiceTagPrefix.DATA_PROCESSOR,
+        0,
+        1
+    );
+  }
+
+  @Override
+  public MigrationResult<DataProcessorInvocation> migrate(DataProcessorInvocation element,
+                                                          IDataProcessorParameterExtractor extractor)
+      throws RuntimeException {
+    return MigrationResult.success(element);
+  }
+}
diff --git a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java
index 863654a601..e60947db72 100644
--- a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java
+++ b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java
@@ -55,6 +55,8 @@ public class StaticMetaDataEnrichmentProcessor
     implements IStreamPipesDataProcessor,
                ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
 
+  public static final String ID = "org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata";
+
   protected static final String STATIC_METADATA_INPUT = "static-metadata-input";
   protected static final String STATIC_METADATA_INPUT_RUNTIME_NAME = "static-metadata-input-runtime-name";
   protected static final String STATIC_METADATA_INPUT_VALUE = "static-metadata-input-value";
@@ -73,7 +75,7 @@ public class StaticMetaDataEnrichmentProcessor
     return DataProcessorConfiguration.create(
         StaticMetaDataEnrichmentProcessor::new,
         ProcessingElementBuilder.create(
-                                    "org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata",
+                                    ID,
                                     1
                                 )
                                 .category(
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java
index 7fc35ce9d8..f3ba255bcd 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java
@@ -50,14 +50,22 @@ public class DataLakeSinkMigrationV1 implements IDataSinkMigrator {
       DataSinkInvocation element,
       IDataSinkParameterExtractor extractor
   ) throws RuntimeException {
-    var oneOfStaticProperty = createDefaultSchemaUpdateStrategy();
+    if (!isSchemaUpdateStrategyPresent(element)) {
+      var oneOfStaticProperty = createDefaultSchemaUpdateStrategy();
 
-    element.getStaticProperties()
-           .add(oneOfStaticProperty);
+      element.getStaticProperties()
+          .add(oneOfStaticProperty);
+    }
 
     return MigrationResult.success(element);
   }
 
+  private boolean isSchemaUpdateStrategyPresent(DataSinkInvocation element) {
+    return element.getStaticProperties()
+        .stream()
+        .anyMatch(sp -> sp.getInternalName().equals(DataLakeSink.SCHEMA_UPDATE_KEY));
+  }
+
   private static OneOfStaticProperty createDefaultSchemaUpdateStrategy() {
     var label = Labels.from(
         DataLakeSink.SCHEMA_UPDATE_KEY,
diff --git a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java
index f795c28460..56d9a89d35 100644
--- a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java
+++ b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java
@@ -32,13 +32,19 @@ public interface INotificationDataSinkMigrator extends IDataSinkMigrator  {
   @Override
   default MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation element,
                                                       IDataSinkParameterExtractor extractor) throws RuntimeException {
-    var fsp = new FreeTextStaticProperty(KEY_SILENT_PERIOD,
-        "Silent Period [min]",
-        "The minimum number of minutes between two consecutive notifications that are sent",
-        XSD.INTEGER);
-    fsp.setValue(String.valueOf(DEFAULT_WAITING_TIME_MINUTES));
-    element.getStaticProperties().add(fsp);
+    if (!isSilentPeriodPresent(element)) {
+      var fsp = new FreeTextStaticProperty(KEY_SILENT_PERIOD,
+          "Silent Period [min]",
+          "The minimum number of minutes between two consecutive notifications that are sent",
+          XSD.INTEGER);
+      fsp.setValue(String.valueOf(DEFAULT_WAITING_TIME_MINUTES));
+      element.getStaticProperties().add(fsp);
+    }
 
     return MigrationResult.success(element);
   }
+
+  default boolean isSilentPeriodPresent(DataSinkInvocation element) {
+    return element.getStaticProperties().stream().anyMatch(e -> e.getInternalName().equals(KEY_SILENT_PERIOD));
+  }
 }

Reply via email to