This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new c7c1a301b Migrate StringCounter processing elements (#1427)
c7c1a301b is described below

commit c7c1a301b2611d9e6bbdd18f1f448d575276e6d5
Author: Liu Xiao <[email protected]>
AuthorDate: Tue Mar 21 02:27:51 2023 +0800

    Migrate StringCounter processing elements (#1427)
---
 .../transformation/jvm/TransformationJvmInit.java  |   4 +-
 .../stringoperator/counter/StringCounter.java      |  79 --------
 .../counter/StringCounterParameters.java           |  36 ----
 ...Controller.java => StringCounterProcessor.java} | 201 ++++++++++++---------
 .../counter/TestStringCounterProcessor.java        | 182 +++++++++++++++++++
 5 files changed, 304 insertions(+), 198 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index b060d0858..1c432be95 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -42,7 +42,7 @@ import 
org.apache.streampipes.processors.transformation.jvm.processor.hasher.Fie
 import 
org.apache.streampipes.processors.transformation.jvm.processor.mapper.FieldMapperProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.measurementconverter.MeasurementUnitConverterProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
-import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
+import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -85,7 +85,7 @@ public class TransformationJvmInit extends 
ExtensionsModelSubmitter {
             new BooleanToStateController(),
             new NumberLabelerController(),
             new StringToStateController(),
-            new StringCounterController(),
+            new StringCounterProcessor(),
             new BooleanOperatorProcessor(),
             new FiledRenameProcessor())
         .registerMessagingFormats(
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
deleted file mode 100644
index d6b3ad8d4..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.HashMap;
-
-public class StringCounter implements EventProcessor<StringCounterParameters> {
-
-  private static Logger log;
-
-  private String selectedFieldName;
-  private String fieldValueOfLastEvent;
-
-  private HashMap<String, Integer> changeCounter;
-
-  @Override
-  public void onInvocation(StringCounterParameters 
stringCounterParametersParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) {
-    log = 
stringCounterParametersParameters.getGraph().getLogger(StringCounter.class);
-    this.selectedFieldName = 
stringCounterParametersParameters.getSelectedFieldName();
-    this.fieldValueOfLastEvent = "";
-    this.changeCounter = new HashMap<>();
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-
-    String value = 
inputEvent.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
-    String key = this.fieldValueOfLastEvent + ">" + value;
-    boolean updateCounter = false;
-
-    if (!this.fieldValueOfLastEvent.equals(value) && 
!this.fieldValueOfLastEvent.isEmpty()) {
-      updateCounter = true;
-
-      if (changeCounter.containsKey(key)) {
-        changeCounter.put(key, changeCounter.get(key) + 1);
-      } else {
-        changeCounter.put(key, 1);
-      }
-    }
-
-    if (updateCounter) {
-      
inputEvent.addField(StringCounterController.CHANGE_FROM_FIELD_RUNTIME_NAME, 
this.fieldValueOfLastEvent);
-      
inputEvent.addField(StringCounterController.CHANGE_TO_FIELD_RUNTIME_NAME, 
value);
-      inputEvent.addField(StringCounterController.COUNT_FIELD_RUNTIME_NAME, 
changeCounter.get(key));
-      out.collect(inputEvent);
-    }
-
-    this.fieldValueOfLastEvent = value;
-  }
-
-  @Override
-  public void onDetach() {
-  }
-
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
deleted file mode 100644
index 8ffa9f169..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class StringCounterParameters extends EventProcessorBindingParams {
-  private String selectedFieldName;
-
-  public StringCounterParameters(DataProcessorInvocation graph, String 
selectedFieldName) {
-    super(graph);
-    this.selectedFieldName = selectedFieldName;
-  }
-
-  public String getSelectedFieldName() {
-    return selectedFieldName;
-  }
-
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterProcessor.java
similarity index 57%
rename from 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
rename to 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterProcessor.java
index cdc21a676..c8bb27c14 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterProcessor.java
@@ -1,81 +1,120 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class StringCounterController extends 
StandaloneEventProcessingDeclarer<StringCounterParameters> {
-
-  private static final String FIELD_ID = "field";
-  private static final String COUNT_FIELD_ID = "countField";
-  private static final String CHANGE_FROM_FIELD_ID = "changeFromField";
-  private static final String CHANGE_TO_FIELD_ID = "changeToField";
-
-  public static final String COUNT_FIELD_RUNTIME_NAME = "counter";
-  public static final String CHANGE_FROM_FIELD_RUNTIME_NAME = "change_from";
-  public static final String CHANGE_TO_FIELD_RUNTIME_NAME = "change_to";
-
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create(
-            
"org.apache.streampipes.processors.transformation.jvm.stringoperator.counter")
-        .category(DataProcessorType.STRING_OPERATOR, 
DataProcessorType.COUNT_OPERATOR)
-        .withLocales(Locales.EN)
-        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(
-                EpRequirements.stringReq(),
-                Labels.withId(FIELD_ID),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.stringEp(Labels.withId(CHANGE_FROM_FIELD_ID), 
CHANGE_FROM_FIELD_RUNTIME_NAME,
-                "http://schema.org/String";),
-            EpProperties.stringEp(Labels.withId(CHANGE_TO_FIELD_ID), 
CHANGE_TO_FIELD_RUNTIME_NAME,
-                "http://schema.org/String";),
-            EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID), 
COUNT_FIELD_RUNTIME_NAME, "http://schema.org/Number";)
-        ))
-        .build();
-  }
-
-  @Override
-  public ConfiguredEventProcessor<StringCounterParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                        
ProcessingElementParameterExtractor extractor) {
-
-    String selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
-
-    StringCounterParameters params = new StringCounterParameters(graph, 
selectedFieldName);
-    return new ConfiguredEventProcessor<>(params, StringCounter::new);
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.HashMap;
+
+public class StringCounterProcessor extends StreamPipesDataProcessor {
+
+  private static Logger log;
+
+  protected static final String FIELD_ID = "field";
+  private static final String COUNT_FIELD_ID = "countField";
+  private static final String CHANGE_FROM_FIELD_ID = "changeFromField";
+  private static final String CHANGE_TO_FIELD_ID = "changeToField";
+
+  public static final String COUNT_FIELD_RUNTIME_NAME = "counter";
+  public static final String CHANGE_FROM_FIELD_RUNTIME_NAME = "change_from";
+  public static final String CHANGE_TO_FIELD_RUNTIME_NAME = "change_to";
+
+  public String selectedFieldName;
+  private String fieldValueOfLastEvent;
+
+  private HashMap<String, Integer> changeCounter;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create(
+            
"org.apache.streampipes.processors.transformation.jvm.stringoperator.counter")
+        .category(DataProcessorType.STRING_OPERATOR, 
DataProcessorType.COUNT_OPERATOR)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder.create()
+            .requiredPropertyWithUnaryMapping(
+                EpRequirements.stringReq(),
+                Labels.withId(FIELD_ID),
+                PropertyScope.NONE)
+            .build())
+        .outputStrategy(OutputStrategies.append(
+            EpProperties.stringEp(Labels.withId(CHANGE_FROM_FIELD_ID), 
CHANGE_FROM_FIELD_RUNTIME_NAME,
+                "http://schema.org/String";),
+            EpProperties.stringEp(Labels.withId(CHANGE_TO_FIELD_ID), 
CHANGE_TO_FIELD_RUNTIME_NAME,
+                "http://schema.org/String";),
+            EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID), 
COUNT_FIELD_RUNTIME_NAME, "http://schema.org/Number";)
+        ))
+        .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    log = parameters.getGraph().getLogger(StringCounterProcessor.class);
+    ProcessingElementParameterExtractor extractor = parameters.extractor();
+    this.selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
+    this.fieldValueOfLastEvent = "";
+    this.changeCounter = new HashMap<>();
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+    String value = 
event.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
+    String key = this.fieldValueOfLastEvent + ">" + value;
+    boolean updateCounter = false;
+
+    if (!this.fieldValueOfLastEvent.equals(value) && 
!this.fieldValueOfLastEvent.isEmpty()) {
+      updateCounter = true;
+      changeCounter.put(key, changeCounter.getOrDefault(key, 0) + 1);
+    }
+
+    if (updateCounter) {
+      event.addField(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME, 
this.fieldValueOfLastEvent);
+      event.addField(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME, 
value);
+      event.addField(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME, 
changeCounter.get(key));
+      collector.collect(event);
+    }
+
+    this.fieldValueOfLastEvent = value;
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
new file mode 100644
index 000000000..3835c0ae8
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.sdk.helpers.Tuple3;
+import org.apache.streampipes.test.generator.EventStreamGenerator;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+
+@RunWith(Parameterized.class)
+public class TestStringCounterProcessor {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestStringCounterProcessor.class);
+
+  @org.junit.runners.Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {"Test", List.of("t1"), new Tuple3<>("", "", 0)},
+        {"Test", Arrays.asList("t1", "t2"), new Tuple3<>("t1", "t2", 1)},
+        {"Test", Arrays.asList("t1", "t2", "t1", "t2"), new Tuple3<>("t1", 
"t2", 2)},
+        {"Test", Arrays.asList("t1", "t2", "t1", "t3"), new Tuple3<>("t1", 
"t3", 1)}
+    });
+  }
+
+  @org.junit.runners.Parameterized.Parameter
+  public String selectedFieldName;
+
+  @org.junit.runners.Parameterized.Parameter(1)
+  public List<String> eventStrings;
+
+  @org.junit.runners.Parameterized.Parameter(2)
+  public Tuple3<String, String, Integer> expectedValue;
+
+  @Test
+  public void testStringCounter() {
+    StringCounterProcessor stringCounter = new StringCounterProcessor();
+    DataProcessorDescription originalGraph = stringCounter.declareModel();
+    
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
+
+    DataProcessorInvocation graph = 
InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
+    graph.setInputStreams(Collections
+        .singletonList(EventStreamGenerator
+            
.makeStreamWithProperties(Collections.singletonList(selectedFieldName))));
+    
graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList(selectedFieldName)));
+    
graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
+        .setActualTopicName("output-topic");
+
+    MappingPropertyUnary mappingPropertyUnary = 
graph.getStaticProperties().stream()
+        .filter(p -> p instanceof MappingPropertyUnary)
+        .map((p -> (MappingPropertyUnary) p))
+        .filter(p -> 
p.getInternalName().equals(StringCounterProcessor.FIELD_ID))
+        .findFirst().orElse(null);
+    assert mappingPropertyUnary != null;
+    mappingPropertyUnary.setSelectedProperty("s0::" + selectedFieldName);
+    ProcessorParams params = new ProcessorParams(graph);
+
+    SpOutputCollector spOut = new SpOutputCollector() {
+      @Override
+      public void registerConsumer(String routeId, 
InternalEventProcessor<Map<String, Object>> consumer) {
+      }
+
+      @Override
+      public void unregisterConsumer(String routeId) {
+      }
+
+      @Override
+      public void connect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void disconnect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void collect(Event event) {
+      }
+    };
+
+    stringCounter.onInvocation(params, spOut, null);
+    Tuple3<String, String, Integer> tuple = sendEvents(stringCounter, spOut);
+    LOG.info("Expected match count is {}.", expectedValue.x);
+    LOG.info("Actual match count is {}.", tuple.x);
+    assertEquals(expectedValue.x, tuple.x);
+    LOG.info("Expected change from is {}.", expectedValue.k);
+    LOG.info("Actual change from is {}.", tuple.k);
+    assertEquals(expectedValue.k, tuple.k);
+    LOG.info("Expected change to is {}.", expectedValue.k);
+    LOG.info("Actual change to is {}.", tuple.k);
+    assertEquals(expectedValue.v, tuple.v);
+  }
+
+  private Tuple3<String, String, Integer> sendEvents(StringCounterProcessor 
stringCounter, SpOutputCollector spOut) {
+    int counter = 0;
+    String changeFrom = "", changeTo = "";
+    List<Event> events = makeEvents();
+    for (Event event : events) {
+      LOG.info("Sending event with value "
+          + event.getFieldBySelector("s0::" + 
selectedFieldName).getAsPrimitive().getAsString());
+      stringCounter.onEvent(event, spOut);
+      try {
+        TimeUnit.MILLISECONDS.sleep(100);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      try {
+        counter = 
event.getFieldBySelector(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME)
+            .getAsPrimitive()
+            .getAsInt();
+        changeFrom = 
event.getFieldBySelector(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME)
+            .getAsPrimitive()
+            .getAsString();
+        changeTo = 
event.getFieldBySelector(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME)
+            .getAsPrimitive()
+            .getAsString();
+        LOG.info(changeFrom + " change to " + changeTo + ", value = " + 
counter);
+      } catch (IllegalArgumentException e) {
+
+      }
+    }
+    return new Tuple3<>(changeFrom, changeTo, counter);
+  }
+
+
+  private List<Event> makeEvents() {
+    List<Event> events = Lists.newArrayList();
+    for (String eventSetting : eventStrings) {
+      events.add(makeEvent(eventSetting));
+    }
+    return events;
+  }
+
+  private Event makeEvent(String value) {
+    Map<String, Object> map = Maps.newHashMap();
+    map.put(selectedFieldName, value);
+    return EventFactory.fromMap(map,
+        new SourceInfo("test" + "-topic", "s0"),
+        new SchemaInfo(null, Lists.newArrayList()));
+  }
+}
\ No newline at end of file

Reply via email to