This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new a99f0cd44 Migrate StringTimer processing elements (#1453)
a99f0cd44 is described below

commit a99f0cd44da02b1aa08f40dd2934d4defd77f76f
Author: Liu Xiao <[email protected]>
AuthorDate: Mon Mar 27 15:57:11 2023 +0800

    Migrate StringTimer processing elements (#1453)
    
    * Migrate StringTimer processing elements
    
    * add more test
---
 .../transformation/jvm/TransformationJvmInit.java  |   4 +-
 .../stringoperator/timer/StringTimer.java          |  79 -------
 .../timer/StringTimerParameters.java               |  48 ----
 ...erController.java => StringTimerProcessor.java} | 252 ++++++++++++---------
 .../timer/TestStringTimerProcessor.java            | 213 +++++++++++++++++
 5 files changed, 362 insertions(+), 234 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 885379999..14c357eff 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -44,7 +44,7 @@ import 
org.apache.streampipes.processors.transformation.jvm.processor.measuremen
 import 
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateProcessor;
-import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
+import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.timestampextractor.TimestampExtractorController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.transformtoboolean.TransformToBooleanController;
@@ -80,7 +80,7 @@ public class TransformationJvmInit extends 
ExtensionsModelSubmitter {
             new MeasurementUnitConverterProcessor(),
             new TaskDurationController(),
             new TransformToBooleanController(),
-            new StringTimerController(),
+            new StringTimerProcessor(),
             new SignalEdgeFilterController(),
             new BooleanToStateController(),
             new NumberLabelerController(),
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
deleted file mode 100644
index 00896360d..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimer.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-public class StringTimer implements EventProcessor<StringTimerParameters> {
-
-  private static Logger log;
-
-  private String selectedFieldName;
-  private Long timestamp;
-  private double outputDivisor;
-  private String fieldValueOfLastEvent;
-  private boolean useInputFrequencyForOutputFrequency;
-
-  @Override
-  public void onInvocation(StringTimerParameters stringTimerParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) {
-    log = stringTimerParameters.getGraph().getLogger(StringTimer.class);
-    this.selectedFieldName = stringTimerParameters.getSelectedFieldName();
-    this.outputDivisor = stringTimerParameters.getOutputDivisor();
-    this.useInputFrequencyForOutputFrequency = 
stringTimerParameters.isUseInputFrequencyForOutputFrequency();
-
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-
-    String value = 
inputEvent.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
-    Long currentTime = System.currentTimeMillis();
-
-    if (this.fieldValueOfLastEvent == null) {
-      this.timestamp = currentTime;
-    } else {
-      // Define if result event should be emitted or not
-      if (this.useInputFrequencyForOutputFrequency || 
!this.fieldValueOfLastEvent.equals(value)) {
-        Long difference = currentTime - this.timestamp;
-        double result = difference / this.outputDivisor;
-
-        
inputEvent.addField(StringTimerController.MEASURED_TIME_FIELD_RUNTIME_NAME, 
result);
-        inputEvent.addField(StringTimerController.FIELD_VALUE_RUNTIME_NAME, 
this.fieldValueOfLastEvent);
-        out.collect(inputEvent);
-      }
-
-      // if state changes reset timestamp
-      if (!this.fieldValueOfLastEvent.equals(value)) {
-        timestamp = currentTime;
-      }
-
-    }
-    this.fieldValueOfLastEvent = value;
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
deleted file mode 100644
index d9e0cb4f3..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerParameters.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class StringTimerParameters extends EventProcessorBindingParams {
-  private String fieldName;
-  private double outputDivisor;
-  private boolean useInputFrequencyForOutputFrequency;
-
-  public StringTimerParameters(DataProcessorInvocation graph, String 
fieldName, double outputDivisor,
-                               boolean useInputFrequencyForOutputFrequency) {
-    super(graph);
-    this.fieldName = fieldName;
-    this.outputDivisor = outputDivisor;
-    this.useInputFrequencyForOutputFrequency = 
useInputFrequencyForOutputFrequency;
-  }
-
-  public String getSelectedFieldName() {
-    return fieldName;
-  }
-
-  public double getOutputDivisor() {
-    return outputDivisor;
-  }
-
-  public boolean isUseInputFrequencyForOutputFrequency() {
-    return useInputFrequencyForOutputFrequency;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerProcessor.java
similarity index 55%
rename from 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
rename to 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerProcessor.java
index 5012586dd..32fb42bbf 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerController.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/StringTimerProcessor.java
@@ -1,105 +1,147 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class StringTimerController extends 
StandaloneEventProcessingDeclarer<StringTimerParameters> {
-
-  public static final String FIELD_ID = "field";
-  public static final String MEASURED_TIME_ID = "measuredTime";
-  public static final String FIELD_VALUE_ID = "fieldValue";
-
-  public static final String OUTPUT_UNIT_ID = "outputUnit";
-  private static final String MILLISECONDS = "Milliseconds";
-  private static final String SECONDS = "Seconds";
-  private static final String MINUTES = "Minutes";
-
-  public static final String OUTPUT_FREQUENCY = "outputFrequency";
-  private static final String ON_INPUT_EVENT = "On Input Event";
-  private static final String ON_STRING_VALUE_CHANGE = "When String Value Changes";
-
-  public static final String MEASURED_TIME_FIELD_RUNTIME_NAME = 
"measured_time";
-  public static final String FIELD_VALUE_RUNTIME_NAME = "field_value";
-
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.stringoperator.timer")
-        .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
-        .withLocales(Locales.EN)
-        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.stringReq(),
-                Labels.withId(FIELD_ID),
-                PropertyScope.NONE)
-            .build())
-        .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID), 
Options.from(MILLISECONDS, SECONDS, MINUTES))
-        .requiredSingleValueSelection(Labels.withId(OUTPUT_FREQUENCY),
-            Options.from(ON_INPUT_EVENT, ON_STRING_VALUE_CHANGE))
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID), 
MEASURED_TIME_FIELD_RUNTIME_NAME,
-                "http://schema.org/Number"),
-            EpProperties.stringEp(Labels.withId(FIELD_VALUE_ID), FIELD_VALUE_RUNTIME_NAME, "http://schema.org/String")
-        ))
-        .build();
-  }
-
-  @Override
-  public ConfiguredEventProcessor<StringTimerParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                      
ProcessingElementParameterExtractor extractor) {
-
-    String selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
-    String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID, 
String.class);
-    String outputFrequency = extractor.selectedSingleValue(OUTPUT_FREQUENCY, 
String.class);
-
-    double outputDivisor = 1.0;
-    if (outputUnit.equals(SECONDS)) {
-      outputDivisor = 1000.0;
-    } else if (outputUnit.equals(MINUTES)) {
-      outputDivisor = 60000.0;
-    }
-
-    boolean useInputFrequencyForOutputFrequency = false;
-    if (ON_INPUT_EVENT.equals(outputFrequency)) {
-      useInputFrequencyForOutputFrequency = true;
-    }
-
-    StringTimerParameters params =
-        new StringTimerParameters(graph, selectedFieldName, outputDivisor, 
useInputFrequencyForOutputFrequency);
-
-    return new ConfiguredEventProcessor<>(params, StringTimer::new);
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.Objects;
+
+public class StringTimerProcessor extends StreamPipesDataProcessor {
+
+  private static Logger log;
+
+  public static final String FIELD_ID = "field";
+  public static final String MEASURED_TIME_ID = "measuredTime";
+  public static final String FIELD_VALUE_ID = "fieldValue";
+
+  public static final String OUTPUT_UNIT_ID = "outputUnit";
+  private static final String MILLISECONDS = "Milliseconds";
+  private static final String SECONDS = "Seconds";
+  private static final String MINUTES = "Minutes";
+
+  public static final String OUTPUT_FREQUENCY = "outputFrequency";
+  private static final String ON_INPUT_EVENT = "On Input Event";
+  private static final String ON_STRING_VALUE_CHANGE = "When String Value Changes";
+
+  public static final String MEASURED_TIME_FIELD_RUNTIME_NAME = 
"measured_time";
+  public static final String FIELD_VALUE_RUNTIME_NAME = "field_value";
+
+  private String selectedFieldName;
+  private long timestamp;
+
+  private double outputDivisor;
+  private boolean useInputFrequencyForOutputFrequency;
+
+  private String fieldValueOfLastEvent;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.stringoperator.timer")
+        .category(DataProcessorType.STRING_OPERATOR, DataProcessorType.TIME)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder.create()
+            .requiredPropertyWithUnaryMapping(
+                EpRequirements.stringReq(),
+                Labels.withId(FIELD_ID),
+                PropertyScope.NONE)
+            .build())
+        .requiredSingleValueSelection(Labels.withId(OUTPUT_UNIT_ID),
+            Options.from(MILLISECONDS, SECONDS, MINUTES))
+        .requiredSingleValueSelection(Labels.withId(OUTPUT_FREQUENCY),
+            Options.from(ON_INPUT_EVENT, ON_STRING_VALUE_CHANGE))
+        .outputStrategy(OutputStrategies.append(
+            EpProperties.numberEp(Labels.withId(MEASURED_TIME_ID),
+                MEASURED_TIME_FIELD_RUNTIME_NAME, "http://schema.org/Number"),
+            EpProperties.stringEp(Labels.withId(FIELD_VALUE_ID),
+                FIELD_VALUE_RUNTIME_NAME, "http://schema.org/String")
+        ))
+        .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    log = parameters.getGraph().getLogger(StringTimerProcessor.class);
+    ProcessingElementParameterExtractor extractor = parameters.extractor();
+
+    this.selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
+    String outputUnit = extractor.selectedSingleValue(OUTPUT_UNIT_ID, 
String.class);
+    String outputFrequency = extractor.selectedSingleValue(OUTPUT_FREQUENCY, 
String.class);
+
+    this.outputDivisor = 1.0;
+    if (SECONDS.equals(outputUnit)) {
+      this.outputDivisor = 1000.0;
+    } else if (MINUTES.equals(outputUnit)) {
+      this.outputDivisor = 60000.0;
+    }
+    useInputFrequencyForOutputFrequency = 
ON_INPUT_EVENT.equals(outputFrequency);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+    String value = 
event.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
+    long currentTimeStamp = System.currentTimeMillis();
+
+    if (Objects.isNull(this.fieldValueOfLastEvent)) {
+      this.timestamp = currentTimeStamp;
+    } else {
+      // Define if result event should be emitted or not
+      if (this.useInputFrequencyForOutputFrequency || 
!this.fieldValueOfLastEvent.equals(value)) {
+        long diff = currentTimeStamp - this.timestamp;
+        double result = diff / this.outputDivisor;
+
+        event.addField(MEASURED_TIME_FIELD_RUNTIME_NAME, result);
+        event.addField(FIELD_VALUE_RUNTIME_NAME, this.fieldValueOfLastEvent);
+        collector.collect(event);
+      }
+
+      // if state changes reset timestamp
+      if (!this.fieldValueOfLastEvent.equals(value)) {
+        timestamp = currentTimeStamp;
+      }
+    }
+    this.fieldValueOfLastEvent = value;
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/TestStringTimerProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/TestStringTimerProcessor.java
new file mode 100644
index 000000000..7127e2c57
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/timer/TestStringTimerProcessor.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.sdk.helpers.Tuple2;
+import org.apache.streampipes.test.generator.EventStreamGenerator;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestStringTimerProcessor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestStringTimerProcessor.class);
+
+  @org.junit.runners.Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {"select_field", "On Input Event", "Milliseconds", List.of(""), ""},
+        {"select_field", "On Input Event", "Milliseconds", List.of("t1"), ""},
+        {"select_field", "On Input Event", "Milliseconds", List.of("t1", 
"t1"), "t1"},
+        {"select_field", "On Input Event", "Seconds", List.of("t1", "t1", 
"t2"), "t1"},
+        {"select_field", "On Input Event", "Minutes", List.of("t1", "t2", 
"t3"), "t2"},
+        {"select_field", "When String Value Changes", "Milliseconds", 
List.of(""), ""},
+        {"select_field", "When String Value Changes", "Milliseconds", 
List.of("t1"), ""},
+        {"select_field", "When String Value Changes", "Milliseconds", 
List.of("t1", "t2"), "t1"},
+        {"select_field", "When String Value Changes", "Seconds", List.of("t1", 
"t1", "t2"), "t1"},
+        {"select_field", "When String Value Changes", "Minutes", List.of("t1", 
"t2", "t3"), "t2"}
+    });
+  }
+
+  @org.junit.runners.Parameterized.Parameter
+  public String selectedFieldName;
+
+  @org.junit.runners.Parameterized.Parameter(1)
+  public String outputFrequency;
+
+  @org.junit.runners.Parameterized.Parameter(2)
+  public String outputUnit;
+
+  @org.junit.runners.Parameterized.Parameter(3)
+  public List<String> eventStrings;
+
+  @org.junit.runners.Parameterized.Parameter(4)
+  public String expectedValue;
+
+  public static final String DEFAULT_STREAM_PREFIX = "stream";
+
+  @Test
+  public void testStringTimer() {
+    StringTimerProcessor stringTimer = new StringTimerProcessor();
+    DataProcessorDescription originalGraph = stringTimer.declareModel();
+    
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
+
+    DataProcessorInvocation graph = 
InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
+    graph.setInputStreams(Collections.singletonList(
+        
EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("in-stream"))));
+    graph.setOutputStream(
+        
EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("out-stream")));
+    
graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
+        .setActualTopicName("output-topic");
+
+    MappingPropertyUnary mappingPropertyUnary = 
graph.getStaticProperties().stream()
+        .filter(p -> p instanceof MappingPropertyUnary)
+        .map(p -> (MappingPropertyUnary) p)
+        .filter(p -> 
(StringTimerProcessor.FIELD_ID).equals(p.getInternalName()))
+        .findFirst().orElse(null);
+
+    assert mappingPropertyUnary != null;
+    mappingPropertyUnary.setSelectedProperty(DEFAULT_STREAM_PREFIX + "::" + 
selectedFieldName);
+
+    OneOfStaticProperty outputFrequencyStaticProperty = 
graph.getStaticProperties().stream()
+        .filter(p -> p instanceof OneOfStaticProperty)
+        .map(p -> (OneOfStaticProperty) p)
+        .filter(p -> 
(StringTimerProcessor.OUTPUT_FREQUENCY.equals(p.getInternalName())))
+        .findFirst().orElse(null);
+    assert outputFrequencyStaticProperty != null;
+    Option outputFrequencyOption = 
outputFrequencyStaticProperty.getOptions().stream()
+        .filter(item -> item.getName().equals(outputFrequency))
+        .findFirst().orElse(null);
+    assert  outputFrequencyOption != null;
+    outputFrequencyOption.setSelected(true);
+
+    OneOfStaticProperty outputUnitStaticProperty = 
graph.getStaticProperties().stream()
+        .filter(p -> p instanceof OneOfStaticProperty)
+        .map(p -> (OneOfStaticProperty) p)
+        .filter(p -> 
(StringTimerProcessor.OUTPUT_UNIT_ID.equals(p.getInternalName())))
+        .findFirst().orElse(null);
+    assert  outputUnitStaticProperty != null;
+    Option outputUnitOption = outputUnitStaticProperty.getOptions().stream()
+        .filter(item -> item.getName().equals(outputUnit))
+        .findFirst().orElse(null);
+    assert outputUnitOption != null;
+    outputUnitOption.setSelected(true);
+
+    ProcessorParams params = new ProcessorParams(graph);
+
+    SpOutputCollector spOut = new SpOutputCollector() {
+      @Override
+      public void registerConsumer(String routeId, 
InternalEventProcessor<Map<String, Object>> consumer) {
+      }
+
+      @Override
+      public void unregisterConsumer(String routeId) {
+      }
+
+      @Override
+      public void connect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void disconnect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void collect(Event event) {
+      }
+    };
+    stringTimer.onInvocation(params, spOut, null);
+    Tuple2<String, Double> res = sendEvents(stringTimer, spOut);
+    LOG.info("Expected value is {}.", expectedValue);
+    LOG.info("Actual value is {}.", res.k);
+    assertEquals(expectedValue, res.k);
+  }
+
+  private Tuple2<String, Double> sendEvents(StringTimerProcessor stringTimer, 
SpOutputCollector spOut) {
+    String field = "";
+    double timeDiff = 0.0;
+    List<Event> events = makeEvents();
+    for (Event event : events) {
+      LOG.info("Sending event with value "
+          + event.getFieldBySelector(DEFAULT_STREAM_PREFIX + "::" + 
selectedFieldName).getAsPrimitive().getAsString());
+      stringTimer.onEvent(event, spOut);
+      try {
+        TimeUnit.MILLISECONDS.sleep(100);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      try {
+        field = 
event.getFieldBySelector(StringTimerProcessor.FIELD_VALUE_RUNTIME_NAME)
+            .getAsPrimitive()
+            .getAsString();
+        timeDiff = 
event.getFieldBySelector(StringTimerProcessor.MEASURED_TIME_FIELD_RUNTIME_NAME)
+            .getAsPrimitive()
+                .getAsDouble();
+        LOG.info(field + " time: " + timeDiff);
+      } catch (IllegalArgumentException e) {
+
+      }
+    }
+    return new Tuple2<>(field, timeDiff);
+  }
+
+  private List<Event> makeEvents() {
+    List<Event> events = Lists.newArrayList();
+    for (String eventString : eventStrings) {
+      events.add(makeEvent(eventString));
+    }
+    return events;
+  }
+  private Event makeEvent(String value) {
+    Map<String, Object> map = Maps.newHashMap();
+    map.put(selectedFieldName, value);
+    return EventFactory.fromMap(map,
+        new SourceInfo("test-topic", DEFAULT_STREAM_PREFIX),
+        new SchemaInfo(null, Lists.newArrayList())
+    );
+  }
+}

Reply via email to