This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch SP-1132 in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit ddb280fcfc3fb619801986859c16807ab647cfd4 Author: Philipp Zehnder <[email protected]> AuthorDate: Sat Jan 21 07:21:16 2023 +0100 [#1132] Fix preprocessing adapter rules --- .../TransformValueAdapterPipelineElement.java | 4 +++ .../transform/value/ValueEventTransformer.java | 11 +++++- .../iiot/protocol/stream/FileStreamProtocol.java | 42 +++++++++++++++------- .../fixtures/connect/aggregationRules/expected.csv | 3 +- .../fixtures/connect/aggregationRules/input.csv | 13 ++++--- ui/cypress/fixtures/connect/schemaRules/input.csv | 4 +-- .../fixtures/connect/valueRules/expected.csv | 2 +- ui/cypress/fixtures/connect/valueRules/input.csv | 2 +- ui/cypress/support/utils/connect/ConnectUtils.ts | 6 +++- .../tests/adapter/rules/schemaRules.smoke.spec.ts | 4 +-- ui/cypress/tests/adapter/rules/streamRules.spec.ts | 4 +-- ui/cypress/tests/adapter/rules/valueRules.ts | 9 +++-- 12 files changed, 74 insertions(+), 30 deletions(-) diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java index ba284501f..f40eff1d7 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java @@ -89,4 +89,8 @@ public class TransformValueAdapterPipelineElement implements IAdapterPipelineEle public Map<String, Object> process(Map<String, Object> event) { return eventTransformer.transform(event); } + + public ValueEventTransformer getEventTransformer() { + return eventTransformer; + } } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java index 6516fa1ee..10fcf8f47 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java @@ -27,7 +27,7 @@ import java.util.Map; public class ValueEventTransformer implements ValueTransformationRule { private final List<UnitTransformationRule> unitTransformationRules; - private final List<TimestampTranformationRule> timestampTransformationRules; + private List<TimestampTranformationRule> timestampTransformationRules; private final List<CorrectionValueTransformationRule> correctionValueTransformationRules; private final List<DatatypeTransformationRule> datatypeTransformationRules; @@ -71,4 +71,13 @@ public class ValueEventTransformer implements ValueTransformationRule { return event; } + + public List<TimestampTranformationRule> getTimestampTransformationRules() { + return timestampTransformationRules; + } + + public void setTimestampTransformationRules( + List<TimestampTranformationRule> timestampTransformationRules) { + this.timestampTransformationRules = timestampTransformationRules; + } } diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java index 1c9fd49ff..b726cebff 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/protocol/stream/FileStreamProtocol.java @@ -29,6 +29,7 @@ import org.apache.streampipes.extensions.api.connect.exception.ParseException; import org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser; import org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol; import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.AddTimestampPipelineElement; +import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.elements.TransformValueAdapterPipelineElement; import org.apache.streampipes.extensions.management.connect.adapter.preprocessing.transform.value.TimestampTranformationRule; import org.apache.streampipes.extensions.management.util.EventSchemaUtils; import org.apache.streampipes.model.AdapterType; @@ -83,7 +84,7 @@ public class FileStreamProtocol extends Protocol { private int timeBetweenReplay; private Optional<IAdapterPipelineElement> addTimestampRule; - private Optional<IAdapterPipelineElement> transformationTimestampRule; + private Optional<List<TimestampTranformationRule>> transformationTimestampRule; private ScheduledExecutorService executor; @@ -105,12 +106,11 @@ public class FileStreamProtocol extends Protocol { this.replayOnce = replayOnce; } - private Optional<IAdapterPipelineElement> checkAndRemovePipelineElement( - List<IAdapterPipelineElement> pipelineElements, - Class elementType) { + private Optional<IAdapterPipelineElement> checkAndRemoveAddTimestampPipelineElement( + List<IAdapterPipelineElement> pipelineElements) { var pipelineElement = pipelineElements.stream() - .filter(o -> o.getClass() == elementType) + .filter(o -> o.getClass() == AddTimestampPipelineElement.class) .findFirst(); pipelineElement.ifPresent(pipelineElements::remove); @@ -118,17 +118,33 @@ public class FileStreamProtocol extends Protocol { return pipelineElement; } + private Optional<List<TimestampTranformationRule>> checkAndRemoveChangeTimestampPipelineElement( + List<IAdapterPipelineElement> pipelineElements) { + + var pipelineElement = pipelineElements.stream() + .filter(o -> o.getClass() == TransformValueAdapterPipelineElement.class) + .map(pe -> (TransformValueAdapterPipelineElement) pe) + .findFirst(); + + if (pipelineElement.isPresent()) { + var eventTransformer = pipelineElement.get().getEventTransformer(); + var result = eventTransformer.getTimestampTransformationRules(); + eventTransformer.setTimestampTransformationRules(List.of()); + + return Optional.of(result); + } + return Optional.empty(); + } + @Override public void run(IAdapterPipeline adapterPipeline) throws AdapterException { String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema()); - addTimestampRule = checkAndRemovePipelineElement( - adapterPipeline.getPipelineElements(), - AddTimestampPipelineElement.class); + addTimestampRule = checkAndRemoveAddTimestampPipelineElement( + adapterPipeline.getPipelineElements()); - transformationTimestampRule = checkAndRemovePipelineElement( - adapterPipeline.getPipelineElements(), - TimestampTranformationRule.class); + transformationTimestampRule = checkAndRemoveChangeTimestampPipelineElement( + adapterPipeline.getPipelineElements()); var eventProcessor = new LocalEventProcessor(adapterPipeline, timestampKey); @@ -187,7 +203,9 @@ public class FileStreamProtocol extends Protocol { } if (transformationTimestampRule.isPresent()) { - eventMap = transformationTimestampRule.get().process(eventMap); + for (var rule : transformationTimestampRule.get()) { + rule.transform(eventMap); + } } long actualEventTimestamp = (long) eventMap.get(timestampKey); diff --git a/ui/cypress/fixtures/connect/aggregationRules/expected.csv b/ui/cypress/fixtures/connect/aggregationRules/expected.csv index 0e9ad9e3b..bcb273f68 100644 --- a/ui/cypress/fixtures/connect/aggregationRules/expected.csv +++ b/ui/cypress/fixtures/connect/aggregationRules/expected.csv @@ -1,3 +1,2 @@ timestamp;value -1623871501002;4.0 -1623871503004;2.0 +1623871499000;2.0 diff --git a/ui/cypress/fixtures/connect/aggregationRules/input.csv b/ui/cypress/fixtures/connect/aggregationRules/input.csv index eaa79cf3b..61c507d80 100644 --- a/ui/cypress/fixtures/connect/aggregationRules/input.csv +++ b/ui/cypress/fixtures/connect/aggregationRules/input.csv @@ -1,6 +1,11 @@ timestamp;value 1623871499000;2.0 -1623871500001;3.0 -1623871501002;4.0 -1623871502003;5.0 -1623871503004;2.0 +1623871500000;3.0 +1623871501000;4.0 +1623871502000;5.0 +1623871503000;2.0 +1623871504000;3.0 +1623871505000;4.0 +1623871506000;5.0 +1623871507000;6.0 +1623871508000;7.0 diff --git a/ui/cypress/fixtures/connect/schemaRules/input.csv b/ui/cypress/fixtures/connect/schemaRules/input.csv index 9a34acb46..f92f0b8ce 100644 --- a/ui/cypress/fixtures/connect/schemaRules/input.csv +++ b/ui/cypress/fixtures/connect/schemaRules/input.csv @@ -1,2 +1,2 @@ -timestamp;count;density;temperature -1674159690000;122.0;62.0;11 +count;density;temperature +122.0;62.0;11 diff --git a/ui/cypress/fixtures/connect/valueRules/expected.csv b/ui/cypress/fixtures/connect/valueRules/expected.csv index d2b632c68..3ddfebcdc 100644 --- a/ui/cypress/fixtures/connect/valueRules/expected.csv +++ b/ui/cypress/fixtures/connect/valueRules/expected.csv @@ -1,2 +1,2 @@ timestamp;temperature;value -1623871490900;50.003334045410156;100.0 +1640346912123;50.003334045410156;100.0 diff --git a/ui/cypress/fixtures/connect/valueRules/input.csv b/ui/cypress/fixtures/connect/valueRules/input.csv index da819853b..8f58cc7e8 100644 --- a/ui/cypress/fixtures/connect/valueRules/input.csv +++ b/ui/cypress/fixtures/connect/valueRules/input.csv @@ -1,2 +1,2 @@ timestamp;value;temperature -1623871490900;10.0;10.0 +2021-12-24T12:55:12.123Z+0100;10.0;10.0 diff --git a/ui/cypress/support/utils/connect/ConnectUtils.ts b/ui/cypress/support/utils/connect/ConnectUtils.ts index f80cf3c0f..902ecd1fa 100644 --- a/ui/cypress/support/utils/connect/ConnectUtils.ts +++ b/ui/cypress/support/utils/connect/ConnectUtils.ts @@ -255,7 +255,11 @@ export class ConnectUtils { const adapterConfiguration = GenericAdapterBuilder.create('File_Stream') .setStoreInDataLake() .setTimestampProperty('timestamp') - .addProtocolInput('input', 'speed', '1') + .addProtocolInput( + 'radio', + 'speed', + 'fastest_\\(ignore_original_time\\)', + ) .setName('Adapter to test rules') .setFormat('csv') .addFormatInput('input', 'delimiter', ';') diff --git a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts index 3ad0e3fda..3399ffd41 100644 --- a/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts +++ b/ui/cypress/tests/adapter/rules/schemaRules.smoke.spec.ts @@ -41,8 +41,8 @@ describe('Connect schema rule transformations', () => { 'Integer', ); - // Mark property as timestamp - ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp'); + // Add a timestamp property + ConnectEventSchemaUtils.addTimestampProperty(); ConnectEventSchemaUtils.finishEventSchemaConfiguration(); diff --git a/ui/cypress/tests/adapter/rules/streamRules.spec.ts b/ui/cypress/tests/adapter/rules/streamRules.spec.ts index 8b5d2469a..6c874f594 100644 --- a/ui/cypress/tests/adapter/rules/streamRules.spec.ts +++ b/ui/cypress/tests/adapter/rules/streamRules.spec.ts @@ -38,7 +38,7 @@ describe('Connect aggregation rule transformations', () => { adapterConfiguration, 'cypress/fixtures/connect/aggregationRules/expected.csv', false, - 5000, + 2000, ); }); }); @@ -62,7 +62,7 @@ describe('Remove duplicates rule transformations', () => { adapterConfiguration, 'cypress/fixtures/connect/removeDuplicateRules/expected.csv', false, - 4000, + 2000, ); }); }); diff --git a/ui/cypress/tests/adapter/rules/valueRules.ts b/ui/cypress/tests/adapter/rules/valueRules.ts index 9a5d8b1fe..d53de16de 100644 --- a/ui/cypress/tests/adapter/rules/valueRules.ts +++ b/ui/cypress/tests/adapter/rules/valueRules.ts @@ -29,6 +29,12 @@ describe('Connect value rule transformations', () => { it('Perform Test', () => { const adapterConfiguration = ConnectUtils.setUpPreprocessingRuleTest(); + // Edit timestamp property + ConnectEventSchemaUtils.editTimestampProperty( + 'timestamp', + "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", + ); + // Number transformation ConnectEventSchemaUtils.numberTransformation('value', '10'); @@ -39,14 +45,13 @@ describe('Connect value rule transformations', () => { 'Degree Fahrenheit', ); - ConnectEventSchemaUtils.markPropertyAsTimestamp('timestamp'); - ConnectEventSchemaUtils.finishEventSchemaConfiguration(); ConnectUtils.tearDownPreprocessingRuleTest( adapterConfiguration, 'cypress/fixtures/connect/valueRules/expected.csv', false, + 2000, ); }); });
