This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.97.0
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/rel/0.97.0 by this push:
new 7d804e06b4 fix: Avoid invalid migration when updating from 0.95.0 to
0.97.0 (#3405)
7d804e06b4 is described below
commit 7d804e06b4b4b7c3dd63d626d19119cd54fe6375
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Jan 9 11:17:09 2025 +0100
fix: Avoid invalid migration when updating from 0.95.0 to 0.97.0 (#3405)
---
.../jvm/TransformationExtensionModuleExport.java | 3 +-
...aticMetadataEnrichmentProcessorMigrationV1.java | 49 ++++++++++++++++++++++
.../StaticMetaDataEnrichmentProcessor.java | 4 +-
.../migrations/DataLakeSinkMigrationV1.java | 14 +++++--
.../migrations/INotificationDataSinkMigrator.java | 18 +++++---
5 files changed, 77 insertions(+), 11 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java
index fbd6f3180d..8b50d99a75 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationExtensionModuleExport.java
@@ -22,6 +22,7 @@ import
org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
+import
org.apache.streampipes.processors.transformation.jvm.migrations.StaticMetadataEnrichmentProcessorMigrationV1;
import
org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.array.split.SplitArrayProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.counter.BooleanCounterProcessor;
@@ -93,6 +94,6 @@ public class TransformationExtensionModuleExport implements
IExtensionModuleExpo
@Override
public List<IModelMigrator<?, ?>> migrators() {
- return Collections.emptyList();
+ return List.of(new StaticMetadataEnrichmentProcessorMigrationV1());
}
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java
new file mode 100644
index 0000000000..98568ae938
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/migrations/StaticMetadataEnrichmentProcessorMigrationV1.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.migrations;
+
+import
org.apache.streampipes.extensions.api.extractor.IDataProcessorParameterExtractor;
+import org.apache.streampipes.extensions.api.migration.IDataProcessorMigrator;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import
org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata.StaticMetaDataEnrichmentProcessor;
+
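+/**
+ * No-op migration for {@link StaticMetaDataEnrichmentProcessor}: {@code migrate} returns the
+ * element unchanged. It exists only to resolve the invalid version resolution in release 0.95.0.
+ */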
+public class StaticMetadataEnrichmentProcessorMigrationV1 implements
IDataProcessorMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ StaticMetaDataEnrichmentProcessor.ID,
+ SpServiceTagPrefix.DATA_PROCESSOR,
+ 0,
+ 1
+ );
+ }
+
+ @Override
+ public MigrationResult<DataProcessorInvocation> migrate(DataProcessorInvocation element,
+ IDataProcessorParameterExtractor extractor)
+ throws RuntimeException {
+ return MigrationResult.success(element);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java
index 863654a601..e60947db72 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/staticmetadata/StaticMetaDataEnrichmentProcessor.java
@@ -55,6 +55,8 @@ public class StaticMetaDataEnrichmentProcessor
implements IStreamPipesDataProcessor,
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
+ public static final String ID =
"org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata";
+
protected static final String STATIC_METADATA_INPUT =
"static-metadata-input";
protected static final String STATIC_METADATA_INPUT_RUNTIME_NAME =
"static-metadata-input-runtime-name";
protected static final String STATIC_METADATA_INPUT_VALUE =
"static-metadata-input-value";
@@ -73,7 +75,7 @@ public class StaticMetaDataEnrichmentProcessor
return DataProcessorConfiguration.create(
StaticMetaDataEnrichmentProcessor::new,
ProcessingElementBuilder.create(
- "org.apache.streampipes.processors.transformation.jvm.processor.staticmetadata",
+ ID,
1
)
.category(
diff --git
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java
index 7fc35ce9d8..f3ba255bcd 100644
---
a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java
+++
b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/migrations/DataLakeSinkMigrationV1.java
@@ -50,14 +50,22 @@ public class DataLakeSinkMigrationV1 implements
IDataSinkMigrator {
DataSinkInvocation element,
IDataSinkParameterExtractor extractor
) throws RuntimeException {
- var oneOfStaticProperty = createDefaultSchemaUpdateStrategy();
+ if (!isSchemaUpdateStrategyPresent(element)) {
+ var oneOfStaticProperty = createDefaultSchemaUpdateStrategy();
- element.getStaticProperties()
- .add(oneOfStaticProperty);
+ element.getStaticProperties()
+ .add(oneOfStaticProperty);
+ }
return MigrationResult.success(element);
}
+ private boolean isSchemaUpdateStrategyPresent(DataSinkInvocation element) {
+ return element.getStaticProperties()
+ .stream()
+ .anyMatch(sp ->
sp.getInternalName().equals(DataLakeSink.SCHEMA_UPDATE_KEY));
+ }
+
private static OneOfStaticProperty createDefaultSchemaUpdateStrategy() {
var label = Labels.from(
DataLakeSink.SCHEMA_UPDATE_KEY,
diff --git
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java
index f795c28460..56d9a89d35 100644
---
a/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java
+++
b/streampipes-extensions/streampipes-sinks-notifications-jvm/src/main/java/org/apache/streampipes/sinks/notifications/jvm/migrations/INotificationDataSinkMigrator.java
@@ -32,13 +32,19 @@ public interface INotificationDataSinkMigrator extends
IDataSinkMigrator {
@Override
default MigrationResult<DataSinkInvocation> migrate(DataSinkInvocation
element,
IDataSinkParameterExtractor extractor) throws RuntimeException {
- var fsp = new FreeTextStaticProperty(KEY_SILENT_PERIOD,
- "Silent Period [min]",
- "The minimum number of minutes between two consecutive notifications that are sent",
- XSD.INTEGER);
- fsp.setValue(String.valueOf(DEFAULT_WAITING_TIME_MINUTES));
- element.getStaticProperties().add(fsp);
+ if (!isSilentPeriodPresent(element)) {
+ var fsp = new FreeTextStaticProperty(KEY_SILENT_PERIOD,
+ "Silent Period [min]",
+ "The minimum number of minutes between two consecutive notifications that are sent",
+ XSD.INTEGER);
+ fsp.setValue(String.valueOf(DEFAULT_WAITING_TIME_MINUTES));
+ element.getStaticProperties().add(fsp);
+ }
return MigrationResult.success(element);
}
+
+ default boolean isSilentPeriodPresent(DataSinkInvocation element) {
+ return element.getStaticProperties().stream().anyMatch(e ->
e.getInternalName().equals(KEY_SILENT_PERIOD));
+ }
}