This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 83b50d6b0 Migrate StringToState processing elements (#1444)
83b50d6b0 is described below
commit 83b50d6b0d4acf4a2720c1c8bbac086f59effcd3
Author: Liu Xiao <[email protected]>
AuthorDate: Sun Mar 26 20:27:06 2023 +0800
Migrate StringToState processing elements (#1444)
* init string to state
* Migrate StringToState processing elements
---
.../transformation/jvm/TransformationJvmInit.java | 4 +-
.../stringoperator/state/StringToState.java | 59 ------
.../state/StringToStateParameters.java | 42 -----
...Controller.java => StringToStateProcessor.java} | 172 +++++++++--------
.../state/TestStringToStateProcessor.java | 207 +++++++++++++++++++++
5 files changed, 307 insertions(+), 177 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 1c432be95..885379999 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -43,7 +43,7 @@ import
org.apache.streampipes.processors.transformation.jvm.processor.mapper.Fie
import
org.apache.streampipes.processors.transformation.jvm.processor.measurementconverter.MeasurementUnitConverterProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterProcessor;
-import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateController;
+import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state.StringToStateProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
import
org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
import
org.apache.streampipes.processors.transformation.jvm.processor.timestampextractor.TimestampExtractorController;
@@ -84,7 +84,7 @@ public class TransformationJvmInit extends
ExtensionsModelSubmitter {
new SignalEdgeFilterController(),
new BooleanToStateController(),
new NumberLabelerController(),
- new StringToStateController(),
+ new StringToStateProcessor(),
new StringCounterProcessor(),
new BooleanOperatorProcessor(),
new FiledRenameProcessor())
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToState.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToState.java
deleted file mode 100644
index f3f59b1df..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToState.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class StringToState implements EventProcessor<StringToStateParameters> {
-
- private static Logger log;
-
- private List<String> stateFields;
-
- @Override
- public void onInvocation(StringToStateParameters booleanInverterParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- log = booleanInverterParameters.getGraph().getLogger(StringToState.class);
- this.stateFields = booleanInverterParameters.getStateFields();
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
- List<String> states = new ArrayList<>();
-
- for (String stateField : stateFields) {
-
states.add(inputEvent.getFieldBySelector(stateField).getAsPrimitive().getAsString());
- }
-
- inputEvent.addField(StringToStateController.CURRENT_STATE,
states.toArray());
- out.collect(inputEvent);
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateParameters.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateParameters.java
deleted file mode 100644
index 479c28eed..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateParameters.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class StringToStateParameters extends EventProcessorBindingParams {
- private List<String> stateFields;
-
- public StringToStateParameters(DataProcessorInvocation graph, List<String>
stateFields) {
- super(graph);
- this.stateFields = stateFields;
- }
-
- public List<String> getStateFields() {
- return stateFields;
- }
-
- public void setStateFields(List<String> stateFields) {
- this.stateFields = stateFields;
- }
-
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateController.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
similarity index 61%
rename from
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateController.java
rename to
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
index 0bbd78724..39174b553 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateController.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/StringToStateProcessor.java
@@ -1,74 +1,98 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpProperties;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.vocabulary.SPSensor;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-import java.util.List;
-
-public class StringToStateController extends
StandaloneEventProcessingDeclarer<StringToStateParameters> {
-
- public static final String STRING_STATE_FIELD = "string_state_field";
-
- public static final String CURRENT_STATE = "current_state";
-
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(
-
"org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state")
- .category(DataProcessorType.STRING_OPERATOR)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithNaryMapping(EpRequirements.stringReq(),
Labels.withId(STRING_STATE_FIELD),
- PropertyScope.NONE)
- .build())
- .outputStrategy(OutputStrategies.append(
- EpProperties.listStringEp(Labels.withId(CURRENT_STATE),
CURRENT_STATE, SPSensor.STATE)
- ))
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<StringToStateParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
-
- List<String> stateFields =
extractor.mappingPropertyValues(STRING_STATE_FIELD);
-
- StringToStateParameters params = new StringToStateParameters(graph,
stateFields);
-
- return new ConfiguredEventProcessor<>(params, StringToState::new);
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.vocabulary.SPSensor;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Data processor that reads the string values of the mapped input fields and appends them to
+ * the event as one array-valued field named {@link #CURRENT_STATE}.
+ */
+public class StringToStateProcessor extends StreamPipesDataProcessor {
+
+  /** Internal name of the n-ary mapping property that selects the input fields. */
+  public static final String STRING_STATE_FIELD = "string_state_field";
+
+  /** Runtime name of the field appended to every processed event. */
+  public static final String CURRENT_STATE = "current_state";
+
+  // The logger is requested from the invocation graph in onInvocation, so it belongs to the
+  // instance; as a static field it would be overwritten by every other invoked instance.
+  private Logger log;
+
+  // Selectors of the mapped fields as returned by the parameter extractor.
+  private List<String> stateFields;
+
+  /**
+   * Describes the processor: one required stream with an n-ary mapping onto string properties
+   * and an append output strategy that adds the {@link #CURRENT_STATE} string list.
+   *
+   * @return the processor description
+   */
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create(
+            "org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state")
+        .category(DataProcessorType.STRING_OPERATOR)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder.create()
+            .requiredPropertyWithNaryMapping(EpRequirements.stringReq(), Labels.withId(STRING_STATE_FIELD),
+                PropertyScope.NONE)
+            .build())
+        .outputStrategy(OutputStrategies.append(
+            EpProperties.listStringEp(Labels.withId(CURRENT_STATE), CURRENT_STATE, SPSensor.STATE)
+        ))
+        .build();
+  }
+
+  /**
+   * Stores the logger of the invocation graph and the selectors mapped to
+   * {@link #STRING_STATE_FIELD}.
+   *
+   * @param parameters        invocation parameters providing the graph and the extractor
+   * @param spOutputCollector not used by this method
+   * @param runtimeContext    not used by this method
+   * @throws SpRuntimeException declared by the overridden method
+   */
+  @Override
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    log = parameters.getGraph().getLogger(StringToStateProcessor.class);
+    ProcessingElementParameterExtractor extractor = parameters.extractor();
+    this.stateFields = extractor.mappingPropertyValues(STRING_STATE_FIELD);
+  }
+
+  /**
+   * Reads every mapped field of the event as a string, adds the values as an array under
+   * {@link #CURRENT_STATE} and forwards the event to the collector.
+   *
+   * @param event     incoming event; it is modified in place
+   * @param collector receives the enriched event
+   * @throws SpRuntimeException declared by the overridden method
+   */
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
+    List<String> states = Lists.newArrayList();
+
+    for (String stateField : stateFields) {
+      states.add(event.getFieldBySelector(stateField).getAsPrimitive().getAsString());
+    }
+
+    event.addField(CURRENT_STATE, states.toArray());
+    collector.collect(event);
+  }
+
+  /** Nothing to release: the processor holds no external resources. */
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
new file mode 100644
index 000000000..7b4ebfadf
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/test/java/org/apache/streampipes/processors/transformation/jvm/processor/stringoperator/state/TestStringToStateProcessor.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.state;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.messaging.InternalEventProcessor;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.EventFactory;
+import org.apache.streampipes.model.runtime.SchemaInfo;
+import org.apache.streampipes.model.runtime.SourceInfo;
+import org.apache.streampipes.model.staticproperty.MappingPropertyNary;
+import org.apache.streampipes.test.generator.EventStreamGenerator;
+import org.apache.streampipes.test.generator.InvocationGraphGenerator;
+import org.apache.streampipes.test.generator.grounding.EventGroundingGenerator;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+
+@RunWith(Parameterized.class)
+public class TestStringToStateProcessor {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestStringToStateProcessor.class);
+
+ @org.junit.runners.Parameterized.Parameters
+ public static Iterable<Object[]> data() { // row layout: selected fields, all field names, event values, expected states
+ return Arrays.asList(new Object[][] {
+ { // no field selected: the expected state array is empty
+ List.of(),
+ List.of("c1", "c2", "c3"),
+ List.of(Arrays.asList("t1", "t2", "t3")),
+ List.of()
+ },
+ { // one selected field, one event
+ List.of("c1"),
+ List.of("c1", "c2", "c3"),
+ List.of(Arrays.asList("t1", "t2", "t3")),
+ List.of("t1")
+ },
+ { // two selected fields, one event
+ List.of("c1", "c2"),
+ List.of("c1", "c2", "c3"),
+ List.of(Arrays.asList("t1", "t2", "t3")),
+ Arrays.asList("t1", "t2")
+ },
+ { // two events: the expectation matches the values of the second event
+ List.of("c1", "c2"),
+ List.of("c1", "c2", "c3"),
+ Arrays.asList(
+ Arrays.asList("t1-1", "t2-1", "t3-1"),
+ Arrays.asList("t1-2", "t2-2", "t3-2")
+ ),
+ Arrays.asList("t1-2", "t2-2")
+ },
+ { // three events, all fields selected: the expectation matches the third event
+ List.of("c1", "c2", "c3"),
+ List.of("c1", "c2", "c3"),
+ Arrays.asList(
+ Arrays.asList("t1-1", "t2-1", "t3-1"),
+ Arrays.asList("t1-2", "t2-2", "t3-2"),
+ Arrays.asList("t1-3", "t2-3", "t3-3")
+ ),
+ Arrays.asList("t1-3", "t2-3", "t3-3")
+ }
+ });
+ }
+
+ @org.junit.runners.Parameterized.Parameter
+ public List<String> selectedFieldNames; // first entry of each data() row: field names selected for the mapping property
+
+ @org.junit.runners.Parameterized.Parameter(1)
+ public List<String> fieldNames; // second entry of each data() row: names for the value positions of an event
+
+ @org.junit.runners.Parameterized.Parameter(2)
+ public List<List<String>> eventStrings; // third entry: one inner list of values per generated event
+
+ @org.junit.runners.Parameterized.Parameter(3)
+ public List<String> expectedValue; // fourth entry: states compared against the array read from the last event
+
+ private static final String DEFAULT_STREAM_NAME = "stream1"; // selector prefix and second SourceInfo argument
+
+  /**
+   * Builds an invocation graph from the processor description, selects the parameterized
+   * fields on the mapping property, sends the generated events and compares the state array
+   * read from the last event with the expected values.
+   */
+  @Test
+  public void testStringToState() {
+    StringToStateProcessor stringToStateProcessor = new StringToStateProcessor();
+    DataProcessorDescription originalGraph = stringToStateProcessor.declareModel();
+    originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
+
+    DataProcessorInvocation graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
+    graph.setInputStreams(Collections
+        .singletonList(EventStreamGenerator
+            .makeStreamWithProperties(Collections.singletonList("stream-in"))));
+    graph.setOutputStream(
+        EventStreamGenerator.makeStreamWithProperties(Collections.singletonList("stream-out")));
+    graph.getOutputStream().getEventGrounding().getTransportProtocol().getTopicDefinition()
+        .setActualTopicName("output-topic");
+
+    // Fail with an explicit message instead of the Java assert keyword, which is skipped
+    // when the JVM runs without -ea and would then end in a NullPointerException.
+    MappingPropertyNary mappingPropertyNary = graph.getStaticProperties().stream()
+        .filter(p -> p instanceof MappingPropertyNary)
+        .map(p -> (MappingPropertyNary) p)
+        .filter(p -> p.getInternalName().equals(StringToStateProcessor.STRING_STATE_FIELD))
+        .findFirst()
+        .orElseThrow(() -> new AssertionError(
+            "Mapping property " + StringToStateProcessor.STRING_STATE_FIELD + " not found"));
+
+    mappingPropertyNary.setSelectedProperties(
+        selectedFieldNames.stream().map(field -> DEFAULT_STREAM_NAME + "::" + field).toList());
+
+    ProcessorParams params = new ProcessorParams(graph);
+
+    // Collector stub: the test reads the result from the event itself, so nothing is recorded.
+    SpOutputCollector spOutputCollector = new SpOutputCollector() {
+      @Override
+      public void registerConsumer(String routeId, InternalEventProcessor<Map<String, Object>> consumer) {
+      }
+
+      @Override
+      public void unregisterConsumer(String routeId) {
+      }
+
+      @Override
+      public void connect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void disconnect() throws SpRuntimeException {
+      }
+
+      @Override
+      public void collect(Event event) {
+      }
+    };
+
+    stringToStateProcessor.onInvocation(params, spOutputCollector, null);
+    Object[] states = sendEvents(stringToStateProcessor, spOutputCollector);
+    LOG.info("Expected states is {}.", expectedValue);
+    LOG.info("Actual states is {}.", Arrays.toString(states));
+    assertArrayEquals(expectedValue.toArray(), states);
+  }
+
+  /**
+   * Passes every generated event to the processor and reads the state array back from the
+   * event after each call.
+   *
+   * @param stateProcessor processor under test
+   * @param spOut          collector handed to {@code onEvent}
+   * @return the state array read from the last event, or {@code null} if no event was sent
+   */
+  private Object[] sendEvents(StringToStateProcessor stateProcessor, SpOutputCollector spOut) {
+    List<Event> events = makeEvents();
+    Object[] states = null;
+    for (Event event : events) {
+      stateProcessor.onEvent(event, spOut);
+      // NOTE(review): onEvent adds the field before it returns, so this pause looks
+      // unnecessary - confirm and drop it together with the TimeUnit import.
+      try {
+        TimeUnit.MILLISECONDS.sleep(100);
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the test runner can still observe the interruption.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      try {
+        states = (Object[]) event.getFieldBySelector(StringToStateProcessor.CURRENT_STATE)
+            .getAsPrimitive().getRawValue();
+        LOG.info("Current states: {}", Arrays.toString(states));
+      } catch (IllegalArgumentException e) {
+        // An unreadable state field is a test failure; surface it with its cause instead of
+        // silently keeping the previous value.
+        throw new AssertionError("Field " + StringToStateProcessor.CURRENT_STATE + " could not be read", e);
+      }
+    }
+    return states;
+  }
+
+  /** Creates one event per entry of {@code eventStrings}, keeping the entry order. */
+  private List<Event> makeEvents() {
+    List<Event> result = Lists.newArrayList();
+    eventStrings.forEach(values -> result.add(makeEvent(values)));
+    return result;
+  }
+
+  /**
+   * Builds an event that carries one value for every name in {@code fieldNames}.
+   *
+   * <p>Values are paired with {@code fieldNames} by position. Pairing them with
+   * {@code selectedFieldNames} is only correct while the selection is a prefix of
+   * {@code fieldNames}, and it leaves the unselected fields out of the event.
+   *
+   * @param value values of a single event, in the order of {@code fieldNames}
+   * @return the event for source {@code DEFAULT_STREAM_NAME}
+   */
+  private Event makeEvent(List<String> value) {
+    Map<String, Object> map = Maps.newHashMap();
+    for (int i = 0; i < fieldNames.size(); i++) {
+      map.put(fieldNames.get(i), value.get(i));
+    }
+    return EventFactory.fromMap(map,
+        new SourceInfo("test-topic", DEFAULT_STREAM_NAME),
+        new SchemaInfo(null, Lists.newArrayList()));
+  }
+}