This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 83b50d6b0 Migrate StringToState processing elements  (#1444)
83b50d6b0 is described below

commit 83b50d6b0d4acf4a2720c1c8bbac086f59effcd3
Author: Liu Xiao <[email protected]>
AuthorDate: Sun Mar 26 20:27:06 2023 +0800

    Migrate StringToState processing elements  (#1444)
    
    * init string to state
    
    * Migrate StringToState processing elements
---
 .../transformation/jvm/TransformationJvmInit.java  |   4 +-
 .../stringoperator/state/StringToState.java        |  59 ------
 .../state/StringToStateParameters.java             |  42 -----
 ...Controller.java => StringToStateProcessor.java} | 172 +++++++++--------
 .../state/TestStringToStateProcessor.java          | 207 +++++++++++++++++++++
 5 files changed, 307 insertions(+), 177 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 1c432be95..885379999 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -43,7 +43,7 @@ import 
org.apache.streampipes.processors.transformation.jvm.processor.mapper.Fie
 import 
org.apache.streampipes.processors.transformation.jvm.processor.measurementconverter.MeasurementUnitConverterProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterProcessor;
-import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateController;
+import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.timestampextractor.TimestampExtractorController;
@@ -84,7 +84,7 @@ public class TransformationJvmInit extends 
ExtensionsModelSubmitter {
             new SignalEdgeFilterController(),
             new BooleanToStateController(),
             new NumberLabelerController(),
-            new StringToStateController(),
+            new StringToStateProcessor(),
             new StringCounterProcessor(),
             new BooleanOperatorProcessor(),
             new FiledRenameProcessor())
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToState.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToState.java
deleted file mode 100644
index f3f59b1df..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToState.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class StringToState implements EventProcessor<StringToStateParameters> {
-
-  private static Logger log;
-
-  private List<String> stateFields;
-
-  @Override
-  public void onInvocation(StringToStateParameters booleanInverterParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) {
-    log = booleanInverterParameters.getGraph().getLogger(StringToState.class);
-    this.stateFields = booleanInverterParameters.getStateFields();
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-    List<String> states = new ArrayList<>();
-
-    for (String stateField : stateFields) {
-      
states.add(inputEvent.getFieldBySelector(stateField).getAsPrimitive().getAsString());
-    }
-
-    inputEvent.addField(StringToStateController.CURRENT_STATE, 
states.toArray());
-    out.collect(inputEvent);
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateParameters.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateParameters.java
deleted file mode 100644
index 479c28eed..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateParameters.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class StringToStateParameters extends EventProcessorBindingParams {
-  private List<String> stateFields;
-
-  public StringToStateParameters(DataProcessorInvocation graph, List<String> 
stateFields) {
-    super(graph);
-    this.stateFields = stateFields;
-  }
-
-  public List<String> getStateFields() {
-    return stateFields;
-  }
-
-  public void setStateFields(List<String> stateFields) {
-    this.stateFields = stateFields;
-  }
-
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateController.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
similarity index 61%
rename from 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateController.java
rename to 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
index 0bbd78724..39174b553 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateController.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
@@ -1,74 +1,98 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class StringToStateController extends 
StandaloneEventProcessingDeclarer<StringToStateParameters> {
-
-  public static final String STRING_STATE_FIELD = "string_state_field";
-
-  public static final String CURRENT_STATE = "current_state";
-
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create(
-            
"org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state")
-        .category(DataProcessorType.STRING_OPERATOR)
-        .withLocales(Locales.EN)
-        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithNaryMapping(EpRequirements.stringReq(), 
Labels.withId(STRING_STATE_FIELD),
-                PropertyScope.NONE)
-            .build())
-        .outputStrategy(OutputStrategies.append(
-            EpProperties.listStringEp(Labels.withId(CURRENT_STATE), 
CURRENT_STATE, SPSensor.STATE)
-        ))
-        .build();
-  }
-
-  @Override
-  public ConfiguredEventProcessor<StringToStateParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                        
ProcessingElementParameterExtractor extractor) {
-
-    List<String> stateFields = 
extractor.mappingPropertyValues(STRING_STATE_FIELD);
-
-    StringToStateParameters params = new StringToStateParameters(graph, 
stateFields);
-
-    return new ConfiguredEventProcessor<>(params, StringToState::new);
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SPSensor;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class StringToStateProcessor extends StreamPipesDataProcessor {
+
+  private static Logger log;
+
+  public static final String STRING_STATE_FIELD = "string_state_field";
+
+  public static final String CURRENT_STATE = "current_state";
+
+  private List<String> stateFields;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create(
+            
"org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state")
+        .category(DataProcessorType.STRING_OPERATOR)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder.create()
+            .requiredPropertyWithNaryMapping(EpRequirements.stringReq(), 
Labels.withId(STRING_STATE_FIELD),
+                PropertyScope.NONE)
+            .build())
+        .outputStrategy(OutputStrategies.append(
+            EpProperties.listStringEp(Labels.withId(CURRENT_STATE), 
CURRENT_STATE, SPSensor.STATE)
+        ))
+        .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    log = parameters.getGraph().getLogger(StringToStateProcessor.class);
+    ProcessingElementParameterExtractor extractor = parameters.extractor();
+    this.stateFields = extractor.mappingPropertyValues(STRING_STATE_FIELD);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
+    List<String> states = Lists.newArrayList();
+
+    for (String stateField : stateFields) {
+      
states.add(event.getFieldBySelector(stateField).getAsPrimitive().getAsString());
+    }
+
+    event.addField(CURRENT_STATE, states.toArray());
+    collector.collect(event);
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
new file mode 100644
index 000000000..7b4ebfadf
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.model.staticproperty.MappingPropertyNary;
+import org.apache.streampipes.test.generator.EventStreamGenerator;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+
+@RunWith(Parameterized.class)
+public class TestStringToStateProcessor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestStringToStateProcessor.class);
+
+  @org.junit.runners.Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {
+            List.of(),
+            List.of("c1", "c2", "c3"),
+            List.of(Arrays.asList("t1", "t2", "t3")),
+            List.of()
+        },
+        {
+            List.of("c1"),
+            List.of("c1", "c2", "c3"),
+            List.of(Arrays.asList("t1", "t2", "t3")),
+            List.of("t1")
+        },
+        {
+            List.of("c1", "c2"),
+            List.of("c1", "c2", "c3"),
+            List.of(Arrays.asList("t1", "t2", "t3")),
+            Arrays.asList("t1", "t2")
+        },
+        {
+            List.of("c1", "c2"),
+            List.of("c1", "c2", "c3"),
+            Arrays.asList(
+                Arrays.asList("t1-1", "t2-1", "t3-1"),
+                Arrays.asList("t1-2", "t2-2", "t3-2")
+            ),
+            Arrays.asList("t1-2", "t2-2")
+        },
+        {
+            List.of("c1", "c2", "c3"),
+            List.of("c1", "c2", "c3"),
+            Arrays.asList(
+                Arrays.asList("t1-1", "t2-1", "t3-1"),
+                Arrays.asList("t1-2", "t2-2", "t3-2"),
+                Arrays.asList("t1-3", "t2-3", "t3-3")
+            ),
+            Arrays.asList("t1-3", "t2-3", "t3-3")
+        }
+    });
+  }
+
+  @org.junit.runners.Parameterized.Parameter
+  public List<String> selectedFieldNames;
+
+  @org.junit.runners.Parameterized.Parameter(1)
+  public List<String> fieldNames;
+
+  @org.junit.runners.Parameterized.Parameter(2)
+  public List<List<String>> eventStrings;
+
+  @org.junit.runners.Parameterized.Parameter(3)
+  public List<String> expectedValue;
+
+  private static final String DEFAULT_STREAM_NAME = "stream1";
+
+  @Test
+  public void testStringToState() {
+    StringToStateProcessor stringToStateProcessor = new 
StringToStateProcessor();
+    DataProcessorDescription originalGraph = 
stringToStateProcessor.declareModel();
+    
originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
+
+    DataProcessorInvocation graph = 
InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
+    graph.setInputStreams(Collections
+        .singletonList(EventStreamGenerator
+            
.makeStreamWithProperties(Collections.singletonList("stream-in"))));
+    
graph.setOutputStream(EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("stream-out")));
+    
graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
+        .setActualTopicName("output-topic");
+
+    MappingPropertyNary mappingPropertyNary = 
graph.getStaticProperties().stream()
+        .filter(p -> p instanceof MappingPropertyNary)
+        .map(p -> (MappingPropertyNary) p)
+        .filter(p -> 
p.getInternalName().equals(StringToStateProcessor.STRING_STATE_FIELD))
+        .findFirst().orElse(null);
+
+    assert mappingPropertyNary != null;
+    mappingPropertyNary.setSelectedProperties(
+        selectedFieldNames.stream().map(field -> DEFAULT_STREAM_NAME + "::" + 
field).toList());
+
+    ProcessorParams params = new ProcessorParams(graph);
+
+    SpOutputCollector spOutputCollector = new SpOutputCollector() {
+      @Override
+      public void registerConsumer(String routeId, 
InternalEventProcessor<Map<String, Object>> consumer) {
+      }
+
+      @Override
+      public void unregisterConsumer(String routeId) {
+      }
+
+      @Override
+      public void connect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void disconnect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void collect(Event event) {
+      }
+    };
+
+    stringToStateProcessor.onInvocation(params, spOutputCollector, null);
+    Object[] states = sendEvents(stringToStateProcessor, spOutputCollector);
+    LOG.info("Expected states is {}.", expectedValue);
+    LOG.info("Actual states is {}.", Arrays.toString(states));
+    assertArrayEquals(expectedValue.toArray(), states);
+  }
+
+  private Object[] sendEvents(StringToStateProcessor stateProcessor, 
SpOutputCollector spOut) {
+    List<Event> events = makeEvents();
+    Object[] states = null;
+    for (Event event : events) {
+      stateProcessor.onEvent(event, spOut);
+      try {
+        TimeUnit.MILLISECONDS.sleep(100);
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      try {
+        states = (Object[]) 
event.getFieldBySelector(StringToStateProcessor.CURRENT_STATE)
+            .getAsPrimitive().getRawValue();
+        LOG.info("Current states: " + Arrays.toString(states));
+      } catch (IllegalArgumentException e) {
+
+      }
+    }
+    return states;
+  }
+
+  private List<Event> makeEvents() {
+    List<Event> events = Lists.newArrayList();
+    for (List<String> eventSetting : eventStrings) {
+      events.add(makeEvent(eventSetting));
+    }
+    return events;
+  }
+
+  private Event makeEvent(List<String> value) {
+    Map<String, Object> map = Maps.newHashMap();
+    for (int i = 0; i < selectedFieldNames.size(); i++) {
+      map.put(selectedFieldNames.get(i), value.get(i));
+    }
+    return EventFactory.fromMap(map,
+        new SourceInfo("test-topic", DEFAULT_STREAM_NAME),
+        new SchemaInfo(null, Lists.newArrayList()));
+  }
+}

Reply via email to