This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new c7c1a301b Migrate StringCounter processing elements (#1427)
c7c1a301b is described below
commit c7c1a301b2611d9e6bbdd18f1f448d575276e6d5
Author: Liu Xiao <[email protected]>
AuthorDate: Tue Mar 21 02:27:51 2023 +0800
Migrate StringCounter processing elements (#1427)
---
.../transformation/jvm/TransformationJvmInit.java | 4 +-
.../stringoperator/counter/StringCounter.java | 79 --------
.../counter/StringCounterParameters.java | 36 ----
...Controller.java => StringCounterProcessor.java} | 201 ++++++++++++---------
.../counter/TestStringCounterProcessor.java | 182 +++++++++++++++++++
5 files changed, 304 insertions(+), 198 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index b060d0858..1c432be95 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -42,7 +42,7 @@ import
org.apache.streampipes.processors.transformation.jvm.processor.hasher.Fie
import
org.apache.streampipes.processors.transformation.jvm.processor.mapper.FieldMapperProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.measurementconverter.MeasurementUnitConverterProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
-import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
+import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateController;
import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
import
org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -85,7 +85,7 @@ public class TransformationJvmInit extends
ExtensionsModelSubmitter {
new BooleanToStateController(),
new NumberLabelerController(),
new StringToStateController(),
- new StringCounterController(),
+ new StringCounterProcessor(),
new BooleanOperatorProcessor(),
new FiledRenameProcessor())
.registerMessagingFormats(
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
deleted file mode 100644
index d6b3ad8d4..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.HashMap;
-
-public class StringCounter implements EventProcessor<StringCounterParameters> {
-
- private static Logger log;
-
- private String selectedFieldName;
- private String fieldValueOfLastEvent;
-
- private HashMap<String, Integer> changeCounter;
-
- @Override
- public void onInvocation(StringCounterParameters
stringCounterParametersParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- log =
stringCounterParametersParameters.getGraph().getLogger(StringCounter.class);
- this.selectedFieldName =
stringCounterParametersParameters.getSelectedFieldName();
- this.fieldValueOfLastEvent = "";
- this.changeCounter = new HashMap<>();
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
-
- String value =
inputEvent.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
- String key = this.fieldValueOfLastEvent + ">" + value;
- boolean updateCounter = false;
-
- if (!this.fieldValueOfLastEvent.equals(value) &&
!this.fieldValueOfLastEvent.isEmpty()) {
- updateCounter = true;
-
- if (changeCounter.containsKey(key)) {
- changeCounter.put(key, changeCounter.get(key) + 1);
- } else {
- changeCounter.put(key, 1);
- }
- }
-
- if (updateCounter) {
-
inputEvent.addField(StringCounterController.CHANGE_FROM_FIELD_RUNTIME_NAME,
this.fieldValueOfLastEvent);
-
inputEvent.addField(StringCounterController.CHANGE_TO_FIELD_RUNTIME_NAME,
value);
- inputEvent.addField(StringCounterController.COUNT_FIELD_RUNTIME_NAME,
changeCounter.get(key));
- out.collect(inputEvent);
- }
-
- this.fieldValueOfLastEvent = value;
- }
-
- @Override
- public void onDetach() {
- }
-
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
deleted file mode 100644
index 8ffa9f169..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterParameters.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class StringCounterParameters extends EventProcessorBindingParams {
- private String selectedFieldName;
-
- public StringCounterParameters(DataProcessorInvocation graph, String
selectedFieldName) {
- super(graph);
- this.selectedFieldName = selectedFieldName;
- }
-
- public String getSelectedFieldName() {
- return selectedFieldName;
- }
-
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterProcessor.java
similarity index 57%
rename from
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
rename to
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterProcessor.java
index cdc21a676..c8bb27c14 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterController.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/StringCounterProcessor.java
@@ -1,81 +1,120 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class StringCounterController extends
StandaloneEventProcessingDeclarer<StringCounterParameters> {
-
- private static final String FIELD_ID = "field";
- private static final String COUNT_FIELD_ID = "countField";
- private static final String CHANGE_FROM_FIELD_ID = "changeFromField";
- private static final String CHANGE_TO_FIELD_ID = "changeToField";
-
- public static final String COUNT_FIELD_RUNTIME_NAME = "counter";
- public static final String CHANGE_FROM_FIELD_RUNTIME_NAME = "change_from";
- public static final String CHANGE_TO_FIELD_RUNTIME_NAME = "change_to";
-
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(
-
"org.apache.streampipes.processors.transformation.jvm.stringoperator.counter")
- .category(DataProcessorType.STRING_OPERATOR,
DataProcessorType.COUNT_OPERATOR)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(
- EpRequirements.stringReq(),
- Labels.withId(FIELD_ID),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.stringEp(Labels.withId(CHANGE_FROM_FIELD_ID),
CHANGE_FROM_FIELD_RUNTIME_NAME,
- "http://schema.org/String"),
- EpProperties.stringEp(Labels.withId(CHANGE_TO_FIELD_ID),
CHANGE_TO_FIELD_RUNTIME_NAME,
- "http://schema.org/String"),
- EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID),
COUNT_FIELD_RUNTIME_NAME, "http://schema.org/Number")
- ))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<StringCounterParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
-
- String selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
-
- StringCounterParameters params = new StringCounterParameters(graph,
selectedFieldName);
- return new ConfiguredEventProcessor<>(params, StringCounter::new);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.HashMap;
+
+public class StringCounterProcessor extends StreamPipesDataProcessor {
+
+ private static Logger log;
+
+ protected static final String FIELD_ID = "field";
+ private static final String COUNT_FIELD_ID = "countField";
+ private static final String CHANGE_FROM_FIELD_ID = "changeFromField";
+ private static final String CHANGE_TO_FIELD_ID = "changeToField";
+
+ public static final String COUNT_FIELD_RUNTIME_NAME = "counter";
+ public static final String CHANGE_FROM_FIELD_RUNTIME_NAME = "change_from";
+ public static final String CHANGE_TO_FIELD_RUNTIME_NAME = "change_to";
+
+ public String selectedFieldName;
+ private String fieldValueOfLastEvent;
+
+ private HashMap<String, Integer> changeCounter;
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder.create(
+
"org.apache.streampipes.processors.transformation.jvm.stringoperator.counter")
+ .category(DataProcessorType.STRING_OPERATOR,
DataProcessorType.COUNT_OPERATOR)
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .requiredStream(StreamRequirementsBuilder.create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.stringReq(),
+ Labels.withId(FIELD_ID),
+ PropertyScope.NONE)
+ .build())
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.stringEp(Labels.withId(CHANGE_FROM_FIELD_ID),
CHANGE_FROM_FIELD_RUNTIME_NAME,
+ "http://schema.org/String"),
+ EpProperties.stringEp(Labels.withId(CHANGE_TO_FIELD_ID),
CHANGE_TO_FIELD_RUNTIME_NAME,
+ "http://schema.org/String"),
+ EpProperties.numberEp(Labels.withId(COUNT_FIELD_ID),
COUNT_FIELD_RUNTIME_NAME, "http://schema.org/Number")
+ ))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ log = parameters.getGraph().getLogger(StringCounterProcessor.class);
+ ProcessingElementParameterExtractor extractor = parameters.extractor();
+ this.selectedFieldName = extractor.mappingPropertyValue(FIELD_ID);
+ this.fieldValueOfLastEvent = "";
+ this.changeCounter = new HashMap<>();
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
+ String value =
event.getFieldBySelector(selectedFieldName).getAsPrimitive().getAsString();
+ String key = this.fieldValueOfLastEvent + ">" + value;
+ boolean updateCounter = false;
+
+ if (!this.fieldValueOfLastEvent.equals(value) &&
!this.fieldValueOfLastEvent.isEmpty()) {
+ updateCounter = true;
+ changeCounter.put(key, changeCounter.getOrDefault(key, 0) + 1);
+ }
+
+ if (updateCounter) {
+ event.addField(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME,
this.fieldValueOfLastEvent);
+ event.addField(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME,
value);
+ event.addField(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME,
changeCounter.get(key));
+ collector.collect(event);
+ }
+
+ this.fieldValueOfLastEvent = value;
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
+}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
new file mode 100644
index 000000000..3835c0ae8
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/counter/TestStringCounterProcessor.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.sdk.helpers.Tuple3;
+import org.apache.streampipes.test.generator.EventStreamGenerator;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+
+@RunWith(Parameterized.class)
+public class TestStringCounterProcessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestStringCounterProcessor.class);
+
+ @org.junit.runners.Parameterized.Parameters
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"Test", List.of("t1"), new Tuple3<>("", "", 0)},
+ {"Test", Arrays.asList("t1", "t2"), new Tuple3<>("t1", "t2", 1)},
+ {"Test", Arrays.asList("t1", "t2", "t1", "t2"), new Tuple3<>("t1",
"t2", 2)},
+ {"Test", Arrays.asList("t1", "t2", "t1", "t3"), new Tuple3<>("t1",
"t3", 1)}
+ });
+ }
+
+ @org.junit.runners.Parameterized.Parameter
+ public String selectedFieldName;
+
+ @org.junit.runners.Parameterized.Parameter(1)
+ public List<String> eventStrings;
+
+ @org.junit.runners.Parameterized.Parameter(2)
+ public Tuple3<String, String, Integer> expectedValue;
+
+ @Test
+ public void testStringCounter() {
+ StringCounterProcessor stringCounter = new StringCounterProcessor();
+ DataProcessorDescription originalGraph = stringCounter.declareModel();
+
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
+
+ DataProcessorInvocation graph =
InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
+ graph.setInputStreams(Collections
+ .singletonList(EventStreamGenerator
+
.makeStreamWithProperties(Collections.singletonList(selectedFieldName))));
+
graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList(selectedFieldName)));
+
graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
+ .setActualTopicName("output-topic");
+
+ MappingPropertyUnary mappingPropertyUnary =
graph.getStaticProperties().stream()
+ .filter(p -> p instanceof MappingPropertyUnary)
+ .map((p -> (MappingPropertyUnary) p))
+ .filter(p ->
p.getInternalName().equals(StringCounterProcessor.FIELD_ID))
+ .findFirst().orElse(null);
+ assert mappingPropertyUnary != null;
+ mappingPropertyUnary.setSelectedProperty("s0::" + selectedFieldName);
+ ProcessorParams params = new ProcessorParams(graph);
+
+ SpOutputCollector spOut = new SpOutputCollector() {
+ @Override
+ public void registerConsumer(String routeId,
InternalEventProcessor<Map<String, Object>> consumer) {
+ }
+
+ @Override
+ public void unregisterConsumer(String routeId) {
+ }
+
+ @Override
+ public void connect() throws SpRuntimeException {
+ }
+
+ @Override
+ public void disconnect() throws SpRuntimeException {
+ }
+
+ @Override
+ public void collect(Event event) {
+ }
+ };
+
+ stringCounter.onInvocation(params, spOut, null);
+ Tuple3<String, String, Integer> tuple = sendEvents(stringCounter, spOut);
+ LOG.info("Expected match count is {}.", expectedValue.x);
+ LOG.info("Actual match count is {}.", tuple.x);
+ assertEquals(expectedValue.x, tuple.x);
+ LOG.info("Expected change from is {}.", expectedValue.k);
+ LOG.info("Actual change from is {}.", tuple.k);
+ assertEquals(expectedValue.k, tuple.k);
+ LOG.info("Expected change to is {}.", expectedValue.k);
+ LOG.info("Actual change to is {}.", tuple.k);
+ assertEquals(expectedValue.v, tuple.v);
+ }
+
+ private Tuple3<String, String, Integer> sendEvents(StringCounterProcessor
stringCounter, SpOutputCollector spOut) {
+ int counter = 0;
+ String changeFrom = "", changeTo = "";
+ List<Event> events = makeEvents();
+ for (Event event : events) {
+ LOG.info("Sending event with value "
+ + event.getFieldBySelector("s0::" +
selectedFieldName).getAsPrimitive().getAsString());
+ stringCounter.onEvent(event, spOut);
+ try {
+ TimeUnit.MILLISECONDS.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ counter =
event.getFieldBySelector(StringCounterProcessor.COUNT_FIELD_RUNTIME_NAME)
+ .getAsPrimitive()
+ .getAsInt();
+ changeFrom =
event.getFieldBySelector(StringCounterProcessor.CHANGE_FROM_FIELD_RUNTIME_NAME)
+ .getAsPrimitive()
+ .getAsString();
+ changeTo =
event.getFieldBySelector(StringCounterProcessor.CHANGE_TO_FIELD_RUNTIME_NAME)
+ .getAsPrimitive()
+ .getAsString();
+ LOG.info(changeFrom + " change to " + changeTo + ", value = " +
counter);
+ } catch (IllegalArgumentException e) {
+
+ }
+ }
+ return new Tuple3<>(changeFrom, changeTo, counter);
+ }
+
+
+ private List<Event> makeEvents() {
+ List<Event> events = Lists.newArrayList();
+ for (String eventSetting : eventStrings) {
+ events.add(makeEvent(eventSetting));
+ }
+ return events;
+ }
+
+ private Event makeEvent(String value) {
+ Map<String, Object> map = Maps.newHashMap();
+ map.put(selectedFieldName, value);
+ return EventFactory.fromMap(map,
+ new SourceInfo("test" + "-topic", "s0"),
+ new SchemaInfo(null, Lists.newArrayList()));
+ }
+}
\ No newline at end of file