This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch SP-1132
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit ddb280fcfc3fb619801986859c16807ab647cfd4
Author: Philipp Zehnder <[email protected]>
AuthorDate: Sat Jan 21 07:21:16 2023 +0100

    [#1132] Fix preprocessing adapter rules
---
 .../TransformValueAdapterPipelineElement.java      |  4 +++
 .../transform/value/ValueEventTransformer.java     | 11 +++++-
 .../iiot/protocol/stream/FileStreamProtocol.java   | 42 +++++++++++++++-------
 .../fixtures/connect/aggregationRules/expected.csv |  3 +-
 .../fixtures/connect/aggregationRules/input.csv    | 13 ++++---
 ui/cypress/fixtures/connect/schemaRules/input.csv  |  4 +--
 .../fixtures/connect/valueRules/expected.csv       |  2 +-
 ui/cypress/fixtures/connect/valueRules/input.csv   |  2 +-
 ui/cypress/support/utils/connect/ConnectUtils.ts   |  6 +++-
 .../tests/adapter/rules/schemaRules.smoke.spec.ts  |  4 +--
 ui/cypress/tests/adapter/rules/streamRules.spec.ts |  4 +--
 ui/cypress/tests/adapter/rules/valueRules.ts       |  9 +++--
 12 files changed, 74 insertions(+), 30 deletions(-)

diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
index ba284501f..f40eff1d7 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
@@ -89,4 +89,8 @@ public class TransformValueAdapterPipelineElement implements 
IAdapterPipelineEle
   public Map<String, Object> process(Map<String, Object> event) {
     return eventTransformer.transform(event);
   }
+
+  public ValueEventTransformer getEventTransformer() {
+    return eventTransformer;
+  }
 }
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
index 6516fa1ee..10fcf8f47 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
@@ -27,7 +27,7 @@ import java.util.Map;
 public class ValueEventTransformer implements ValueTransformationRule {
 
   private final List<UnitTransformationRule> unitTransformationRules;
-  private final List<TimestampTranformationRule> timestampTransformationRules;
+  private List<TimestampTranformationRule> timestampTransformationRules;
   private final List<CorrectionValueTransformationRule> 
correctionValueTransformationRules;
   private final List<DatatypeTransformationRule> datatypeTransformationRules;
 
@@ -71,4 +71,13 @@ public class ValueEventTransformer implements 
ValueTransformationRule {
 
     return event;
   }
+
+  public List<TimestampTranformationRule> getTimestampTransformationRules() {
+    return timestampTransformationRules;
+  }
+
+  public void setTimestampTransformationRules(
+      List<TimestampTranformationRule> timestampTransformationRules) {
+    this.timestampTransformationRules = timestampTransformationRules;
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
index 1c9fd49ff..b726cebff 100644
--- 
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
+++ 
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java
@@ -29,6 +29,7 @@ import 
org.apache.streampipes.extensions.api.connect.exception.ParseException;
 import 
org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser;
 import 
org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol;
 import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.AddTimestampPipelineElement;
+import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.TransformValueAdapterPipelineElement;
 import 
org.apache.streampipes.extensions.management.connect.adapter.preprocessing.transform.value.TimestampTranformationRule;
 import org.apache.streampipes.extensions.management.util.EventSchemaUtils;
 import org.apache.streampipes.model.AdapterType;
@@ -83,7 +84,7 @@ public class FileStreamProtocol extends Protocol {
   private int timeBetweenReplay;
 
   private Optional<IAdapterPipelineElement> addTimestampRule;
-  private Optional<IAdapterPipelineElement> transformationTimestampRule;
+  private Optional<List<TimestampTranformationRule>> 
transformationTimestampRule;
 
   private ScheduledExecutorService executor;
 
@@ -105,12 +106,11 @@ public class FileStreamProtocol extends Protocol {
     this.replayOnce = replayOnce;
   }
 
-  private Optional<IAdapterPipelineElement> checkAndRemovePipelineElement(
-      List<IAdapterPipelineElement> pipelineElements,
-      Class elementType) {
+  private Optional<IAdapterPipelineElement> 
checkAndRemoveAddTimestampPipelineElement(
+      List<IAdapterPipelineElement> pipelineElements) {
 
     var pipelineElement = pipelineElements.stream()
-        .filter(o -> o.getClass() == elementType)
+        .filter(o -> o.getClass() == AddTimestampPipelineElement.class)
         .findFirst();
 
     pipelineElement.ifPresent(pipelineElements::remove);
@@ -118,17 +118,33 @@ public class FileStreamProtocol extends Protocol {
     return pipelineElement;
   }
 
+  private Optional<List<TimestampTranformationRule>> 
checkAndRemoveChangeTimestampPipelineElement(
+      List<IAdapterPipelineElement> pipelineElements) {
+
+    var pipelineElement = pipelineElements.stream()
+        .filter(o -> o.getClass() == 
TransformValueAdapterPipelineElement.class)
+        .map(pe -> (TransformValueAdapterPipelineElement) pe)
+        .findFirst();
+
+    if (pipelineElement.isPresent()) {
+      var eventTransformer = pipelineElement.get().getEventTransformer();
+      var result = eventTransformer.getTimestampTransformationRules();
+      eventTransformer.setTimestampTransformationRules(List.of());
+
+      return Optional.of(result);
+    }
+    return Optional.empty();
+  }
+
   @Override
   public void run(IAdapterPipeline adapterPipeline) throws AdapterException {
     String timestampKey = 
getTimestampKey(adapterPipeline.getResultingEventSchema());
 
-    addTimestampRule = checkAndRemovePipelineElement(
-        adapterPipeline.getPipelineElements(),
-        AddTimestampPipelineElement.class);
+    addTimestampRule = checkAndRemoveAddTimestampPipelineElement(
+        adapterPipeline.getPipelineElements());
 
-    transformationTimestampRule = checkAndRemovePipelineElement(
-        adapterPipeline.getPipelineElements(),
-        TimestampTranformationRule.class);
+    transformationTimestampRule = checkAndRemoveChangeTimestampPipelineElement(
+        adapterPipeline.getPipelineElements());
 
 
     var eventProcessor = new LocalEventProcessor(adapterPipeline, 
timestampKey);
@@ -187,7 +203,9 @@ public class FileStreamProtocol extends Protocol {
         }
 
         if (transformationTimestampRule.isPresent()) {
-          eventMap = transformationTimestampRule.get().process(eventMap);
+          for (var rule : transformationTimestampRule.get()) {
+            rule.transform(eventMap);
+          }
         }
 
         long actualEventTimestamp = (long) eventMap.get(timestampKey);
diff --git a/ui/cypress/fixtures/connect/aggregationRules/expected.csv 
b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
index 0e9ad9e3b..bcb273f68 100644
--- a/ui/cypress/fixtures/connect/aggregationRules/expected.csv
+++ b/ui/cypress/fixtures/connect/aggregationRules/expected.csv
@@ -1,3 +1,2 @@
 timestamp;value
-1623871501002;4.0
-1623871503004;2.0
+1623871499000;2.0
diff --git a/ui/cypress/fixtures/connect/aggregationRules/input.csv 
b/ui/cypress/fixtures/connect/aggregationRules/input.csv
index eaa79cf3b..61c507d80 100644
--- a/ui/cypress/fixtures/connect/aggregationRules/input.csv
+++ b/ui/cypress/fixtures/connect/aggregationRules/input.csv
@@ -1,6 +1,11 @@
 timestamp;value
 1623871499000;2.0
-1623871500001;3.0
-1623871501002;4.0
-1623871502003;5.0
-1623871503004;2.0
+1623871500000;3.0
+1623871501000;4.0
+1623871502000;5.0
+1623871503000;2.0
+1623871504000;3.0
+1623871505000;4.0
+1623871506000;5.0
+1623871507000;6.0
+1623871508000;7.0
diff --git a/ui/cypress/fixtures/connect/schemaRules/input.csv 
b/ui/cypress/fixtures/connect/schemaRules/input.csv
index 9a34acb46..f92f0b8ce 100644
--- a/ui/cypress/fixtures/connect/schemaRules/input.csv
+++ b/ui/cypress/fixtures/connect/schemaRules/input.csv
@@ -1,2 +1,2 @@
-timestamp;count;density;temperature
-1674159690000;122.0;62.0;11
+count;density;temperature
+122.0;62.0;11
diff --git a/ui/cypress/fixtures/connect/valueRules/expected.csv 
b/ui/cypress/fixtures/connect/valueRules/expected.csv
index d2b632c68..3ddfebcdc 100644
--- a/ui/cypress/fixtures/connect/valueRules/expected.csv
+++ b/ui/cypress/fixtures/connect/valueRules/expected.csv
@@ -1,2 +1,2 @@
 timestamp;temperature;value
-1623871490900;50.003334045410156;100.0
+1640346912123;50.003334045410156;100.0
diff --git a/ui/cypress/fixtures/connect/valueRules/input.csv 
b/ui/cypress/fixtures/connect/valueRules/input.csv
index da819853b..8f58cc7e8 100644
--- a/ui/cypress/fixtures/connect/valueRules/input.csv
+++ b/ui/cypress/fixtures/connect/valueRules/input.csv
@@ -1,2 +1,2 @@
 timestamp;value;temperature
-1623871490900;10.0;10.0
+2021-12-24T12:55:12.123Z+0100;10.0;10.0
diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts 
b/ui/cypress/support/utils/connect/ConnectUtils.ts
index f80cf3c0f..902ecd1fa 100644
--- a/ui/cypress/support/utils/connect/ConnectUtils.ts
+++ b/ui/cypress/support/utils/connect/ConnectUtils.ts
@@ -255,7 +255,11 @@ export class ConnectUtils {
         const adapterConfiguration = 
GenericAdapterBuilder.create('File_Stream')
             .setStoreInDataLake()
             .setTimestampProperty('timestamp')
-            .addProtocolInput('input', 'speed', '1')
+            .addProtocolInput(
+                'radio',
+                'speed',
+                'fastest_\\(ignore_original_time\\)',
+            )
             .setName('Adapter to test rules')
             .setFormat('csv')
             .addFormatInput('input', 'delimiter', ';')
diff --git a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts 
b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts
index 3ad0e3fda..3399ffd41 100644
--- a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts
+++ b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts
@@ -41,8 +41,8 @@ describe('Connect schema rule transformations', () => {
             'Integer',
         );
 
-        // Mark property as timestamp
-        ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
+        // Add a timestamp property
+        ConnectEventSchemaUtils.addTimestampProperty();
 
         ConnectEventSchemaUtils.finishEventSchemaConfiguration();
 
diff --git a/ui/cypress/tests/adapter/rules/streamRules.spec.ts 
b/ui/cypress/tests/adapter/rules/streamRules.spec.ts
index 8b5d2469a..6c874f594 100644
--- a/ui/cypress/tests/adapter/rules/streamRules.spec.ts
+++ b/ui/cypress/tests/adapter/rules/streamRules.spec.ts
@@ -38,7 +38,7 @@ describe('Connect aggregation rule transformations', () => {
             adapterConfiguration,
             'cypress/fixtures/connect/aggregationRules/expected.csv',
             false,
-            5000,
+            2000,
         );
     });
 });
@@ -62,7 +62,7 @@ describe('Remove duplicates rule transformations', () => {
             adapterConfiguration,
             'cypress/fixtures/connect/removeDuplicateRules/expected.csv',
             false,
-            4000,
+            2000,
         );
     });
 });
diff --git a/ui/cypress/tests/adapter/rules/valueRules.ts 
b/ui/cypress/tests/adapter/rules/valueRules.ts
index 9a5d8b1fe..d53de16de 100644
--- a/ui/cypress/tests/adapter/rules/valueRules.ts
+++ b/ui/cypress/tests/adapter/rules/valueRules.ts
@@ -29,6 +29,12 @@ describe('Connect value rule transformations', () => {
     it('Perform Test', () => {
         const adapterConfiguration = ConnectUtils.setUpPreprocessingRuleTest();
 
+        // Edit timestamp property
+        ConnectEventSchemaUtils.editTimestampProperty(
+            'timestamp',
+            "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'",
+        );
+
         // Number transformation
         ConnectEventSchemaUtils.numberTransformation('value', '10');
 
@@ -39,14 +45,13 @@ describe('Connect value rule transformations', () => {
             'Degree Fahrenheit',
         );
 
-        ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp');
-
         ConnectEventSchemaUtils.finishEventSchemaConfiguration();
 
         ConnectUtils.tearDownPreprocessingRuleTest(
             adapterConfiguration,
             'cypress/fixtures/connect/valueRules/expected.csv',
             false,
+            2000,
         );
     });
 });

Reply via email to