This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new a99f0cd44 Migrate StringTimer processing elements (#1453)
a99f0cd44 is described below
commit a99f0cd44da02b1aa08f40dd2934d4defd77f76f
Author: Liu Xiao <[email protected]>
AuthorDate: Mon Mar 27 15:57:11 2023 +0800
Migrate StringTimer processing elements (#1453)
* Migrate StringTimer processing elements
* add more test
---
.../transformation/jvm/TransformationJvmInit.java | 4 +-
.../stringoperator/timer/StringTimer.java | 79 -------
.../timer/StringTimerParameters.java | 48 ----
...erController.java => StringTimerProcessor.java} | 252 ++++++++++++---------
.../timer/TestStringTimerProcessor.java | 213 +++++++++++++++++
5 files changed, 362 insertions(+), 234 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 885379999..14c357eff 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -44,7 +44,7 @@ import
org.apache.streampipes.processors.transformation.jvm.processor.measuremen
import
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateProcessor;
-import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
+import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
import
org.apache.streampipes.processors.transformation.jvm.processor.timestampextractor.TimestampExtractorController;
import
org.apache.streampipes.processors.transformation.jvm.processor.transformtoboolean.TransformToBooleanController;
@@ -80,7 +80,7 @@ public class TransformationJvmInit extends
ExtensionsModelSubmitter {
new MeasurementUnitConverterProcessor(),
new TaskDurationController(),
new TransformToBooleanController(),
- new StringTimerController(),
+ new StringTimerProcessor(),
new SignalEdgeFilterController(),
new BooleanToStateController(),
new NumberLabelerController(),
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
deleted file mode 100644
index 00896360d..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class StringTimer implements EventProcessor<StringTimerParameters> {
-
- private static Logger log;
-
- private String selectedFieldName;
- private Long timestamp;
- private double outputDivisor;
- private String fieldValueOfLastEvent;
- private boolean useInputFrequencyForOutputFrequency;
-
- @Override
- public void onInvocation(StringTimerParameters stringTimerParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- log = stringTimerParameters.getGraph().getLogger(StringTimer.class);
- this.selectedFieldName = stringTimerParameters.getSelectedFieldName();
- this.outputDivisor = stringTimerParameters.getOutputDivisor();
- this.useInputFrequencyForOutputFrequency =
stringTimerParameters.isUseInputFrequencyForOutputFrequency();
-
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
-
- String value =
inputEvent.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
- Long currentTime = System.currentTimeMillis();
-
- if (this.fieldValueOfLastEvent == null) {
- this.timestamp = currentTime;
- } else {
- // Define if result event should be emitted or not
- if (this.useInputFrequencyForOutputFrequency ||
!this.fieldValueOfLastEvent.equals(value)) {
- Long difference = currentTime - this.timestamp;
- double result = difference / this.outputDivisor;
-
-
inputEvent.addField(StringTimerController.MEASURED_TIME_FIELD_RUNTIME_NAME,
result);
- inputEvent.addField(StringTimerController.FIELD_VALUE_RUNTIME_NAME,
this.fieldValueOfLastEvent);
- out.collect(inputEvent);
- }
-
- // if state changes reset timestamp
- if (!this.fieldValueOfLastEvent.equals(value)) {
- timestamp = currentTime;
- }
-
- }
- this.fieldValueOfLastEvent = value;
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
deleted file mode 100644
index d9e0cb4f3..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class StringTimerParameters extends EventProcessorBindingParams {
- private String fieldName;
- private double outputDivisor;
- private boolean useInputFrequencyForOutputFrequency;
-
- public StringTimerParameters(DataProcessorInvocation graph, String
fieldName, double outputDivisor,
- boolean useInputFrequencyForOutputFrequency) {
- super(graph);
- this.fieldName = fieldName;
- this.outputDivisor = outputDivisor;
- this.useInputFrequencyForOutputFrequency =
useInputFrequencyForOutputFrequency;
- }
-
- public String getSelectedFieldName() {
- return fieldName;
- }
-
- public double getOutputDivisor() {
- return outputDivisor;
- }
-
- public boolean isUseInputFrequencyForOutputFrequency() {
- return useInputFrequencyForOutputFrequency;
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerProcessor.java
similarity index 55%
rename from
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
rename to
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerProcessor.java
index 5012586dd..32fb42bbf 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerProcessor.java
@@ -1,105 +1,147 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class StringTimerController extends
StandaloneEventProcessingDeclarer<StringTimerParameters> {
-
- public static final String FIELD_ID = "field";
- public static final String MEASURED_TIME_ID = "measuredTime";
- public static final String FIELD_VALUE_ID = "fieldValue";
-
- public static final String OUTPUT_UNIT_ID = "outputUnit";
- private static final String MILLISECONDS = "Milliseconds";
- private static final String SECONDS = "Seconds";
- private static final String MINUTES = "Minutes";
-
- public static final String OUTPUT_FREQUENCY = "outputFrequency";
- private static final String ON_INPUT_EVENT = "On Input Event";
- private static final String ON_STRING_VALUE_CHANGE = "When String Value
Changes";
-
- public static final String MEASURED_TIME_FIELD_RUNTIME_NAME =
"measured_time";
- public static final String FIELD_VALUE_RUNTIME_NAME = "field_value";
-
-
- @Override
- public DataProcessorDescription declareModel() {
- return
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.stringoperator.timer")
- .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.stringReq(),
- Labels.withId(FIELD_ID),
- PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
Options.from(MILLISECONDS, SECONDS, MINUTES))
- .requiredSingleValueSelection(Labels.withId(OUTPUT_FREQUENCY),
- Options.from(ON_INPUT_EVENT, ON_STRING_VALUE_CHANGE))
- .outputStrategy(OutputStrategies.append(
- EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID),
MEASURED_TIME_FIELD_RUNTIME_NAME,
- "http://schema.org/Number"),
- EpProperties.stringEp(Labels.withId(FIELD_VALUE_ID),
FIELD_VALUE_RUNTIME_NAME, "http://schema.org/String")
- ))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<StringTimerParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
-
- String selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
- String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID,
String.class);
- String outputFrequency = extractor.selectedSingleValue(OUTPUT_FREQUENCY,
String.class);
-
- double outputDivisor = 1.0;
- if (outputUnit.equals(SECONDS)) {
- outputDivisor = 1000.0;
- } else if (outputUnit.equals(MINUTES)) {
- outputDivisor = 60000.0;
- }
-
- boolean useInputFrequencyForOutputFrequency = false;
- if (ON_INPUT_EVENT.equals(outputFrequency)) {
- useInputFrequencyForOutputFrequency = true;
- }
-
- StringTimerParameters params =
- new StringTimerParameters(graph, selectedFieldName, outputDivisor,
useInputFrequencyForOutputFrequency);
-
- return new ConfiguredEventProcessor<>(params, StringTimer::new);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.Objects;
+
+public class StringTimerProcessor extends StreamPipesDataProcessor {
+
+ private static Logger log;
+
+ public static final String FIELD_ID = "field";
+ public static final String MEASURED_TIME_ID = "measuredTime";
+ public static final String FIELD_VALUE_ID = "fieldValue";
+
+ public static final String OUTPUT_UNIT_ID = "outputUnit";
+ private static final String MILLISECONDS = "Milliseconds";
+ private static final String SECONDS = "Seconds";
+ private static final String MINUTES = "Minutes";
+
+ public static final String OUTPUT_FREQUENCY = "outputFrequency";
+ private static final String ON_INPUT_EVENT = "On Input Event";
+ private static final String ON_STRING_VALUE_CHANGE = "When String Value
Changes";
+
+ public static final String MEASURED_TIME_FIELD_RUNTIME_NAME =
"measured_time";
+ public static final String FIELD_VALUE_RUNTIME_NAME = "field_value";
+
+ private String selectedFieldName;
+ private long timestamp;
+
+ private double outputDivisor;
+ private boolean useInputFrequencyForOutputFrequency;
+
+ private String fieldValueOfLastEvent;
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.stringoperator.timer")
+ .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.stringReq(),
+ Labels.withId(FIELD_ID),
+ PropertyScope.NONE)
+ .build())
+ .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
+ Options.from(MILLISECONDS, SECONDS, MINUTES))
+ .requiredSingleValueSelection(Labels.withId(OUTPUT_FREQUENCY),
+ Options.from(ON_INPUT_EVENT, ON_STRING_VALUE_CHANGE))
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID),
+ MEASURED_TIME_FIELD_RUNTIME_NAME, "http://schema.org/Number"),
+ EpProperties.stringEp(Labels.withId(FIELD_VALUE_ID),
+ FIELD_VALUE_RUNTIME_NAME, "http://schema.org/String")
+ ))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ log = parameters.getGraph().getLogger(StringTimerProcessor.class);
+ ProcessingElementParameterExtractor extractor = parameters.extractor();
+
+ this.selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
+ String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID,
String.class);
+ String outputFrequency = extractor.selectedSingleValue(OUTPUT_FREQUENCY,
String.class);
+
+ this.outputDivisor = 1.0;
+ if (SECONDS.equals(outputUnit)) {
+ this.outputDivisor = 1000.0;
+ } else if (MINUTES.equals(outputUnit)) {
+ this.outputDivisor = 60000.0;
+ }
+ useInputFrequencyForOutputFrequency =
ON_INPUT_EVENT.equals(outputFrequency);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ String value =
event.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
+ long currentTimeStamp = System.currentTimeMillis();
+
+ if (Objects.isNull(this.fieldValueOfLastEvent)) {
+ this.timestamp = currentTimeStamp;
+ } else {
+ // Define if result event should be emitted or not
+ if (this.useInputFrequencyForOutputFrequency ||
!this.fieldValueOfLastEvent.equals(value)) {
+ long diff = currentTimeStamp - this.timestamp;
+ double result = diff / this.outputDivisor;
+
+ event.addField(MEASURED_TIME_FIELD_RUNTIME_NAME, result);
+ event.addField(FIELD_VALUE_RUNTIME_NAME, this.fieldValueOfLastEvent);
+ collector.collect(event);
+ }
+
+ // if state changes reset timestamp
+ if (!this.fieldValueOfLastEvent.equals(value)) {
+ timestamp = currentTimeStamp;
+ }
+ }
+ this.fieldValueOfLastEvent = value;
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
+}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/TestStringTimerProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/TestStringTimerProcessor.java
new file mode 100644
index 000000000..7127e2c57
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/TestStringTimerProcessor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.test.generator.EventStreamGenerator;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestStringTimerProcessor {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestStringTimerProcessor.class);
+
+ @org.junit.runners.Parameterized.Parameters
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"select_field", "On Input Event", "Milliseconds", List.of(""), ""},
+ {"select_field", "On Input Event", "Milliseconds", List.of("t1"), ""},
+ {"select_field", "On Input Event", "Milliseconds", List.of("t1",
"t1"), "t1"},
+ {"select_field", "On Input Event", "Seconds", List.of("t1", "t1",
"t2"), "t1"},
+ {"select_field", "On Input Event", "Minutes", List.of("t1", "t2",
"t3"), "t2"},
+ {"select_field", "When String Value Changes", "Milliseconds",
List.of(""), ""},
+ {"select_field", "When String Value Changes", "Milliseconds",
List.of("t1"), ""},
+ {"select_field", "When String Value Changes", "Milliseconds",
List.of("t1", "t2"), "t1"},
+ {"select_field", "When String Value Changes", "Seconds", List.of("t1",
"t1", "t2"), "t1"},
+ {"select_field", "When String Value Changes", "Minutes", List.of("t1",
"t2", "t3"), "t2"}
+ });
+ }
+
+ @org.junit.runners.Parameterized.Parameter
+ public String selectedFieldName;
+
+ @org.junit.runners.Parameterized.Parameter(1)
+ public String outputFrequency;
+
+ @org.junit.runners.Parameterized.Parameter(2)
+ public String outputUnit;
+
+ @org.junit.runners.Parameterized.Parameter(3)
+ public List<String> eventStrings;
+
+ @org.junit.runners.Parameterized.Parameter(4)
+ public String expectedValue;
+
+ public static final String DEFAULT_STREAM_PREFIX = "stream";
+
+ @Test
+ public void testStringTimer() {
+ StringTimerProcessor stringTimer = new StringTimerProcessor();
+ DataProcessorDescription originalGraph = stringTimer.declareModel();
+
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
+
+ DataProcessorInvocation graph =
InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
+ graph.setInputStreams(Collections.singletonList(
+
EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream"))));
+ graph.setOutputStream(
+
EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")));
+
graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
+ .setActualTopicName("output-topic");
+
+ MappingPropertyUnary mappingPropertyUnary =
graph.getStaticProperties().stream()
+ .filter(p -> p instanceof MappingPropertyUnary)
+ .map(p -> (MappingPropertyUnary) p)
+ .filter(p ->
(StringTimerProcessor.FIELD_ID).equals(p.getInternalName()))
+ .findFirst().orElse(null);
+
+ assert mappingPropertyUnary != null;
+ mappingPropertyUnary.setSelectedProperty(DEFAULT_STREAM_PREFIX + "::" +
selectedFieldName);
+
+ OneOfStaticProperty outputFrequencyStaticProperty =
graph.getStaticProperties().stream()
+ .filter(p -> p instanceof OneOfStaticProperty)
+ .map(p -> (OneOfStaticProperty) p)
+ .filter(p ->
(StringTimerProcessor.OUTPUT_FREQUENCY.equals(p.getInternalName())))
+ .findFirst().orElse(null);
+ assert outputFrequencyStaticProperty != null;
+ Option outputFrequencyOption =
outputFrequencyStaticProperty.getOptions().stream()
+ .filter(item -> item.getName().equals(outputFrequency))
+ .findFirst().orElse(null);
+ assert outputFrequencyOption != null;
+ outputFrequencyOption.setSelected(true);
+
+ OneOfStaticProperty outputUnitStaticProperty =
graph.getStaticProperties().stream()
+ .filter(p -> p instanceof OneOfStaticProperty)
+ .map(p -> (OneOfStaticProperty) p)
+ .filter(p ->
(StringTimerProcessor.OUTPUT_UNIT_ID.equals(p.getInternalName())))
+ .findFirst().orElse(null);
+ assert outputUnitStaticProperty != null;
+ Option outputUnitOption = outputUnitStaticProperty.getOptions().stream()
+ .filter(item -> item.getName().equals(outputUnit))
+ .findFirst().orElse(null);
+ assert outputUnitOption != null;
+ outputUnitOption.setSelected(true);
+
+ ProcessorParams params = new ProcessorParams(graph);
+
+ SpOutputCollector spOut = new SpOutputCollector() {
+ @Override
+ public void registerConsumer(String routeId,
InternalEventProcessor<Map<String, Object>> consumer) {
+ }
+
+ @Override
+ public void unregisterConsumer(String routeId) {
+ }
+
+ @Override
+ public void connect() throws SpRuntimeException {
+ }
+
+ @Override
+ public void disconnect() throws SpRuntimeException {
+ }
+
+ @Override
+ public void collect(Event event) {
+ }
+ };
+ stringTimer.onInvocation(params, spOut, null);
+ Tuple2<String, Double> res = sendEvents(stringTimer, spOut);
+ LOG.info("Expected value is {}.", expectedValue);
+ LOG.info("Actual value is {}.", res.k);
+ assertEquals(expectedValue, res.k);
+ }
+
+ private Tuple2<String, Double> sendEvents(StringTimerProcessor stringTimer,
SpOutputCollector spOut) {
+ String field = "";
+ double timeDiff = 0.0;
+ List<Event> events = makeEvents();
+ for (Event event : events) {
+ LOG.info("Sending event with value "
+ + event.getFieldBySelector(DEFAULT_STREAM_PREFIX + "::" +
selectedFieldName).getAsPrimitive().getAsString());
+ stringTimer.onEvent(event, spOut);
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ field =
event.getFieldBySelector(StringTimerProcessor.FIELD_VALUE_RUNTIME_NAME)
+ .getAsPrimitive()
+ .getAsString();
+ timeDiff =
event.getFieldBySelector(StringTimerProcessor.MEASURED_TIME_FIELD_RUNTIME_NAME)
+ .getAsPrimitive()
+ .getAsDouble();
+ LOG.info(field + " time: " + timeDiff);
+ } catch (IllegalArgumentException e) {
+
+ }
+ }
+ return new Tuple2<>(field, timeDiff);
+ }
+
+ private List<Event> makeEvents() {
+ List<Event> events = Lists.newArrayList();
+ for (String eventString : eventStrings) {
+ events.add(makeEvent(eventString));
+ }
+ return events;
+ }
+ private Event makeEvent(String value) {
+ Map<String, Object> map = Maps.newHashMap();
+ map.put(selectedFieldName, value);
+ return EventFactory.fromMap(map,
+ new SourceInfo("test-topic", DEFAULT_STREAM_PREFIX),
+ new SchemaInfo(null, Lists.newArrayList())
+ );
+ }
+}