This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 1608-countarray
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1608-countarray by this push:
new 959d87090 Migrate count array, split array and signal edge filters (#1608)(#1609)(#1610)
959d87090 is described below
commit 959d870900cd313d071bce029495f938f047b8cf
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri May 26 14:40:43 2023 +0200
Migrate count array, split array and signal edge filters (#1608)(#1609)(#1610)
---
.../transformation/jvm/TransformationJvmInit.java | 8 +-
.../jvm/processor/array/count/CountArray.java | 59 --------
.../array/count/CountArrayParameters.java | 37 -----
...rayController.java => CountArrayProcessor.java} | 39 +++--
.../jvm/processor/array/split/SplitArray.java | 86 -----------
.../array/split/SplitArrayController.java | 73 ++++++++--
.../array/split/SplitArrayParameters.java | 43 ------
.../booloperator/edge/SignalEdgeFilter.java | 120 ---------------
.../edge/SignalEdgeFilterController.java | 85 -----------
.../edge/SignalEdgeFilterParameters.java | 70 ---------
.../edge/SignalEdgeFilterProcessor.java | 161 +++++++++++++++++++++
11 files changed, 253 insertions(+), 528 deletions(-)
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 14c357eff..0d107ea0d 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -27,10 +27,10 @@ import
org.apache.streampipes.extensions.management.model.SpServiceDefinitionBui
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import
org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayController;
+import
org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.array.split.SplitArrayController;
import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.counter.BooleanCounterProcessor;
-import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge.SignalEdgeFilterController;
+import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge.SignalEdgeFilterProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.inverter.BooleanInverterProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical.BooleanOperatorProcessor;
import
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state.BooleanToStateController;
@@ -65,7 +65,7 @@ public class TransformationJvmInit extends
ExtensionsModelSubmitter {
"",
8090)
.registerPipelineElements(
- new CountArrayController(),
+ new CountArrayProcessor(),
new SplitArrayController(),
new CalculateDurationController(),
new ChangedValueDetectionController(),
@@ -81,7 +81,7 @@ public class TransformationJvmInit extends
ExtensionsModelSubmitter {
new TaskDurationController(),
new TransformToBooleanController(),
new StringTimerProcessor(),
- new SignalEdgeFilterController(),
+ new SignalEdgeFilterProcessor(),
new BooleanToStateController(),
new NumberLabelerController(),
new StringToStateProcessor(),
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArray.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArray.java
deleted file mode 100644
index 7252073e5..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArray.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.AbstractField;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-
-public class CountArray implements EventProcessor<CountArrayParameters> {
-
- private static Logger log;
-
- private CountArrayParameters splitArrayParameters;
-
- @Override
- public void onInvocation(CountArrayParameters params, SpOutputCollector
spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- log = params.getGraph().getLogger(CountArray.class);
-
- this.splitArrayParameters = params;
- }
-
- @Override
- public void onEvent(Event event, SpOutputCollector out) {
- String arrayField = splitArrayParameters.getArrayField();
-
- List<AbstractField> allEvents =
event.getFieldBySelector(arrayField).getAsList().getRawValue();
-
- event.addField(CountArrayController.COUNT_NAME, allEvents.size());
-
- out.collect(event);
- }
-
-
- @Override
- public void onDetach() {
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayParameters.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayParameters.java
deleted file mode 100644
index c80918f39..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayParameters.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class CountArrayParameters extends EventProcessorBindingParams {
- private String arrayField;
-
-
- public CountArrayParameters(DataProcessorInvocation graph, String
arrayField) {
- super(graph);
- this.arrayField = arrayField;
- }
-
- public String getArrayField() {
- return arrayField;
- }
-
-}
\ No newline at end of file
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayController.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
similarity index 65%
rename from
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayController.java
rename to
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
index 1e97d7870..62b2818d4 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayController.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
@@ -18,13 +18,14 @@
package
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.AbstractField;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
@@ -32,14 +33,20 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class CountArrayController extends
StandaloneEventProcessingDeclarer<CountArrayParameters> {
+import java.util.List;
+
+public class CountArrayProcessor extends StreamPipesDataProcessor {
public static final String COUNT_NAME = "countValue";
public static final String ARRAY_FIELD = "array-field";
+ private String arrayField;
+
@Override
public DataProcessorDescription declareModel() {
return
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.count-array")
@@ -57,12 +64,24 @@ public class CountArrayController extends
StandaloneEventProcessingDeclarer<Coun
}
@Override
- public ConfiguredEventProcessor<CountArrayParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
- String arrayField = extractor.mappingPropertyValue(ARRAY_FIELD);
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ this.arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD);
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) throws
SpRuntimeException {
- CountArrayParameters params = new CountArrayParameters(graph, arrayField);
- return new ConfiguredEventProcessor<>(params, CountArray::new);
+ List<AbstractField> allEvents =
event.getFieldBySelector(arrayField).getAsList().getRawValue();
+
+ event.addField(CountArrayProcessor.COUNT_NAME, allEvents.size());
+
+ collector.collect(event);
}
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArray.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArray.java
deleted file mode 100644
index fc61de3de..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArray.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.array.split;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.AbstractField;
-import org.apache.streampipes.model.runtime.field.ListField;
-import org.apache.streampipes.model.runtime.field.NestedField;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-import java.util.Map;
-
-public class SplitArray implements EventProcessor<SplitArrayParameters> {
-
- private static Logger log;
-
- private SplitArrayParameters splitArrayParameters;
-
-
- @Override
- public void onInvocation(SplitArrayParameters splitArrayParameters,
SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- log = splitArrayParameters.getGraph().getLogger(SplitArray.class);
- this.splitArrayParameters = splitArrayParameters;
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
- String arrayField = splitArrayParameters.getArrayField();
- List<String> keepProperties = splitArrayParameters.getKeepProperties();
-
- List<AbstractField> allEvents =
inputEvent.getFieldBySelector(arrayField).getAsList()
- .parseAsCustomType(o -> {
- if (o instanceof NestedField) {
- return (NestedField) o;
- } else if (o instanceof ListField) {
- return (ListField) o;
- } else {
- return (PrimitiveField) o;
- }
- });
-
- for (AbstractField field : allEvents) {
- Event outEvent = new Event();
- if (field instanceof NestedField) {
- for (Map.Entry<String, AbstractField> key : ((NestedField)
field).getRawValue().entrySet()) {
- outEvent.addField(key.getValue());
- }
- } else {
- outEvent.addField(SplitArrayController.VALUE, field);
- }
-
- for (String propertyName : keepProperties) {
- outEvent.addField(inputEvent.getFieldBySelector(propertyName));
- }
-
- out.collect(outEvent);
- }
-
- }
-
- @Override
- public void onDetach() {
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
index e94fdf480..5fb138465 100644
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
@@ -23,6 +23,10 @@ import
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOu
import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.AbstractField;
+import org.apache.streampipes.model.runtime.field.ListField;
+import org.apache.streampipes.model.runtime.field.NestedField;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyList;
import org.apache.streampipes.model.schema.EventSchema;
@@ -35,19 +39,25 @@ import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
-public class SplitArrayController extends
StandaloneEventProcessingDeclarer<SplitArrayParameters>
+public class SplitArrayController extends StreamPipesDataProcessor
implements
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation,
ProcessingElementParameterExtractor> {
public static final String KEEP_PROPERTIES_ID = "keep";
public static final String ARRAY_FIELD_ID = "array-field";
public static final String VALUE = "array_value";
+ private String arrayField;
+ private List<String> keepProperties;
+
@Override
public DataProcessorDescription declareModel() {
return
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.split-array")
@@ -66,17 +76,6 @@ public class SplitArrayController extends
StandaloneEventProcessingDeclarer<Spli
.build();
}
- @Override
- public ConfiguredEventProcessor<SplitArrayParameters>
onInvocation(DataProcessorInvocation graph,
-
ProcessingElementParameterExtractor extractor) {
-
- String arrayField = extractor.mappingPropertyValue(ARRAY_FIELD_ID);
- List<String> keepProperties =
extractor.mappingPropertyValues(KEEP_PROPERTIES_ID);
-
- SplitArrayParameters params = new SplitArrayParameters(graph, arrayField,
keepProperties);
- return new ConfiguredEventProcessor<>(params, SplitArray::new);
- }
-
@Override
public EventSchema resolveOutputStrategy(DataProcessorInvocation
processingElement,
ProcessingElementParameterExtractor
extractor)
@@ -98,4 +97,50 @@ public class SplitArrayController extends
StandaloneEventProcessingDeclarer<Spli
return new EventSchema(outProperties);
}
+
+ @Override
+ public void onInvocation(ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws
SpRuntimeException {
+ arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD_ID);
+ keepProperties =
parameters.extractor().mappingPropertyValues(KEEP_PROPERTIES_ID);
+ }
+
+ @Override
+ public void onEvent(Event event,
+ SpOutputCollector collector) throws SpRuntimeException {
+
+ List<AbstractField> allEvents =
event.getFieldBySelector(arrayField).getAsList()
+ .parseAsCustomType(o -> {
+ if (o instanceof NestedField) {
+ return o;
+ } else if (o instanceof ListField) {
+ return o;
+ } else {
+ return o;
+ }
+ });
+
+ for (AbstractField field : allEvents) {
+ Event outEvent = new Event();
+ if (field instanceof NestedField) {
+ for (Map.Entry<String, AbstractField> key : ((NestedField)
field).getRawValue().entrySet()) {
+ outEvent.addField(key.getValue());
+ }
+ } else {
+ outEvent.addField(SplitArrayController.VALUE, field);
+ }
+
+ for (String propertyName : keepProperties) {
+ outEvent.addField(event.getFieldBySelector(propertyName));
+ }
+
+ collector.collect(outEvent);
+ }
+ }
+
+ @Override
+ public void onDetach() throws SpRuntimeException {
+
+ }
}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayParameters.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayParameters.java
deleted file mode 100644
index 1ddd075e7..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayParameters.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.array.split;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class SplitArrayParameters extends EventProcessorBindingParams {
- private String arrayField;
- private List<String> keepProperties;
-
- public SplitArrayParameters(DataProcessorInvocation graph, String
arrayField, List<String> keepProperties) {
- super(graph);
- this.arrayField = arrayField;
- this.keepProperties = keepProperties;
- }
-
- public String getArrayField() {
- return arrayField;
- }
-
- public List<String> getKeepProperties() {
- return keepProperties;
- }
-}
\ No newline at end of file
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
deleted file mode 100644
index f37131d50..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SignalEdgeFilter implements
EventProcessor<SignalEdgeFilterParameters> {
-
- private static Logger log;
-
- private String booleanSignalField;
- private String flank;
- private Integer delay;
- private String eventSelection;
-
- private boolean lastValue;
- private int delayCount;
- private List<Event> resultEvents;
- private boolean edgeDetected;
-
- @Override
- public void onInvocation(SignalEdgeFilterParameters
booleanInverterParameters,
- SpOutputCollector spOutputCollector,
- EventProcessorRuntimeContext runtimeContext) {
- log =
booleanInverterParameters.getGraph().getLogger(SignalEdgeFilter.class);
- this.booleanSignalField =
booleanInverterParameters.getBooleanSignalField();
- this.flank = booleanInverterParameters.getFlank();
- this.delay = booleanInverterParameters.getDelay();
- this.eventSelection = booleanInverterParameters.getEventSelection();
-
- this.lastValue = false;
- this.delayCount = 0;
- this.resultEvents = new ArrayList<>();
- this.edgeDetected = false;
- }
-
- @Override
- public void onEvent(Event inputEvent, SpOutputCollector out) {
-
- boolean value =
inputEvent.getFieldBySelector(this.booleanSignalField).getAsPrimitive().getAsBoolean();
-
- // Detect edges in signal
- if (detectEdge(value, lastValue)) {
- this.edgeDetected = true;
- this.resultEvents = new ArrayList<>();
- this.delayCount = 0;
- }
-
- if (edgeDetected) {
- // Buffer event(s) according to user configuration
- addResultEvent(inputEvent);
-
- // Detect if the delay has been waited for
- if (this.delay == delayCount) {
- for (Event event : this.resultEvents) {
- out.collect(event);
- }
-
- this.edgeDetected = false;
-
- } else {
- this.delayCount++;
- }
- }
-
- this.lastValue = value;
- }
-
- @Override
- public void onDetach() {
- }
-
- private boolean detectEdge(boolean value, boolean lastValue) {
- if (this.flank.equals(SignalEdgeFilterController.FLANK_UP)) {
- return !lastValue && value;
- } else if (this.flank.equals(SignalEdgeFilterController.FLANK_DOWN)) {
- return lastValue && !value;
- } else if (this.flank.equals(SignalEdgeFilterController.BOTH)) {
- return value != lastValue;
- }
-
- return false;
- }
-
- private void addResultEvent(Event event) {
- if (this.eventSelection.equals(SignalEdgeFilterController.OPTION_FIRST)) {
- if (this.resultEvents.size() == 0) {
- this.resultEvents.add(event);
- }
- } else if
(this.eventSelection.equals(SignalEdgeFilterController.OPTION_LAST)) {
- this.resultEvents = new ArrayList<>();
- this.resultEvents.add(event);
- } else if
(this.eventSelection.equals(SignalEdgeFilterController.OPTION_ALL)) {
- this.resultEvents.add(event);
- }
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
deleted file mode 100644
index 405d24a6f..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class SignalEdgeFilterController extends
StandaloneEventProcessingDeclarer<SignalEdgeFilterParameters> {
-
- public static final String BOOLEAN_SIGNAL_FIELD = "boolean_signal_field";
- public static final String FLANK_ID = "flank";
- public static final String DELAY_ID = "delay";
- private static final String EVENT_SELECTION_ID = "event-selection-id";
-
- public static final String FLANK_UP = "FALSE -> TRUE";
- public static final String FLANK_DOWN = "TRUE -> FALSE";
- public static final String BOTH = "BOTH";
- public static final String OPTION_FIRST = "First";
- public static final String OPTION_LAST = "Last";
- public static final String OPTION_ALL = "All";
-
- @Override
- public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create(
-
"org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge")
- .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.FILTER)
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .requiredStream(StreamRequirementsBuilder.create()
- .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(),
Labels.withId(BOOLEAN_SIGNAL_FIELD),
- PropertyScope.NONE)
- .build())
- .requiredSingleValueSelection(Labels.withId(FLANK_ID),
Options.from(BOTH, FLANK_UP, FLANK_DOWN))
- .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
- .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
- Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
- .outputStrategy(OutputStrategies.keep())
- .build();
- }
-
- @Override
- public ConfiguredEventProcessor<SignalEdgeFilterParameters> onInvocation(
- DataProcessorInvocation graph,
- ProcessingElementParameterExtractor extractor) {
-
- String booleanSignalField =
extractor.mappingPropertyValue(BOOLEAN_SIGNAL_FIELD);
- String flank = extractor.selectedSingleValue(FLANK_ID, String.class);
- Integer delay = extractor.singleValueParameter(DELAY_ID, Integer.class);
- String eventSelection = extractor.selectedSingleValue(EVENT_SELECTION_ID,
String.class);
-
- SignalEdgeFilterParameters params =
- new SignalEdgeFilterParameters(graph, booleanSignalField, flank,
delay, eventSelection);
-
- return new ConfiguredEventProcessor<>(params, SignalEdgeFilter::new);
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
deleted file mode 100644
index c766ab68a..000000000
---
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class SignalEdgeFilterParameters extends EventProcessorBindingParams {
- private String booleanSignalField;
- private String flank;
- private Integer delay;
- private String eventSelection;
-
- public SignalEdgeFilterParameters(DataProcessorInvocation graph, String
booleanSignalField, String flank,
- Integer delay, String eventSelection) {
- super(graph);
- this.booleanSignalField = booleanSignalField;
- this.flank = flank;
- this.delay = delay;
- this.eventSelection = eventSelection;
- }
-
- public String getBooleanSignalField() {
- return booleanSignalField;
- }
-
- public void setBooleanSignalField(String booleanSignalField) {
- this.booleanSignalField = booleanSignalField;
- }
-
- public String getFlank() {
- return flank;
- }
-
- public void setFlank(String flank) {
- this.flank = flank;
- }
-
- public Integer getDelay() {
- return delay;
- }
-
- public void setDelay(Integer delay) {
- this.delay = delay;
- }
-
- public String getEventSelection() {
- return eventSelection;
- }
-
- public void setEventSelection(String eventSelection) {
- this.eventSelection = eventSelection;
- }
-}
diff --git
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
new file mode 100644
index 000000000..3df28ad08
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+ /**
+  * Data processor that watches a boolean field for signal edges and forwards
+  * events once a configurable number of follow-up events has been seen.
+  *
+  * <p>Behaviour, as implemented below:
+  * <ul>
+  *   <li>The previous signal value starts as {@code false}, so a first event
+  *       whose value is {@code true} counts as a rising edge.</li>
+  *   <li>When an edge matching the configured flank is detected, the buffer and
+  *       the delay counter are reset; an edge that occurs while a previous one
+  *       is still waiting therefore restarts the wait.</li>
+  *   <li>While waiting, every incoming event is offered to the buffer according
+  *       to the event selection (first / last / all).</li>
+  *   <li>As soon as the counter has reached the configured delay, all buffered
+  *       events are emitted and the processor waits for the next edge.</li>
+  * </ul>
+  */
+ public class SignalEdgeFilterProcessor extends StreamPipesDataProcessor {
+
+   public static final String BOOLEAN_SIGNAL_FIELD = "boolean_signal_field";
+   public static final String FLANK_ID = "flank";
+   public static final String DELAY_ID = "delay";
+   private static final String EVENT_SELECTION_ID = "event-selection-id";
+
+   public static final String FLANK_UP = "FALSE -> TRUE";
+   public static final String FLANK_DOWN = "TRUE -> FALSE";
+   public static final String BOTH = "BOTH";
+   public static final String OPTION_FIRST = "First";
+   public static final String OPTION_LAST = "Last";
+   public static final String OPTION_ALL = "All";
+
+   // User configuration, read in onInvocation.
+   private String booleanSignalField;
+   private String flank;
+   private Integer delay;
+   private String eventSelection;
+
+   // Runtime state, reset in onInvocation.
+   private boolean lastValue;
+   private int delayCount;
+   private List<Event> resultEvents;
+   private boolean edgeDetected;
+
+   /**
+    * Declares the processor: one boolean input property, the flank selection,
+    * the integer delay (default 0) and the event selection; the output keeps
+    * the input schema.
+    */
+   @Override
+   public DataProcessorDescription declareModel() {
+     return ProcessingElementBuilder.create(
+             "org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge")
+         .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.FILTER)
+         .withLocales(Locales.EN)
+         .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+         .requiredStream(StreamRequirementsBuilder.create()
+             .requiredPropertyWithUnaryMapping(
+                 EpRequirements.booleanReq(),
+                 Labels.withId(BOOLEAN_SIGNAL_FIELD),
+                 PropertyScope.NONE)
+             .build())
+         .requiredSingleValueSelection(
+             Labels.withId(FLANK_ID),
+             Options.from(BOTH, FLANK_UP, FLANK_DOWN))
+         .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
+         .requiredSingleValueSelection(
+             Labels.withId(EVENT_SELECTION_ID),
+             Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
+         .outputStrategy(OutputStrategies.keep())
+         .build();
+   }
+
+   /**
+    * Reads the user configuration from the extractor and resets the runtime
+    * state (no edge pending, empty buffer, previous value {@code false}).
+    */
+   @Override
+   public void onInvocation(ProcessorParams parameters,
+                            SpOutputCollector spOutputCollector,
+                            EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+     this.booleanSignalField = parameters.extractor().mappingPropertyValue(BOOLEAN_SIGNAL_FIELD);
+     this.flank = parameters.extractor().selectedSingleValue(FLANK_ID, String.class);
+     this.delay = parameters.extractor().singleValueParameter(DELAY_ID, Integer.class);
+     this.eventSelection =
+         parameters.extractor().selectedSingleValue(EVENT_SELECTION_ID, String.class);
+
+     this.lastValue = false;
+     this.delayCount = 0;
+     this.resultEvents = new ArrayList<>();
+     this.edgeDetected = false;
+   }
+
+   /**
+    * Processes one event: detects an edge against the previous value, buffers
+    * the event while an edge is pending and emits the buffer once the delay
+    * has elapsed.
+    */
+   @Override
+   public void onEvent(Event inputEvent,
+                       SpOutputCollector collector) throws SpRuntimeException {
+     boolean value = inputEvent
+         .getFieldBySelector(this.booleanSignalField)
+         .getAsPrimitive()
+         .getAsBoolean();
+
+     // A (new) edge restarts buffering and the delay countdown.
+     if (detectEdge(value, this.lastValue)) {
+       this.edgeDetected = true;
+       this.resultEvents = new ArrayList<>();
+       this.delayCount = 0;
+     }
+
+     if (this.edgeDetected) {
+       // Buffer event(s) according to user configuration
+       addResultEvent(inputEvent);
+
+       // ">=" instead of "==": for delay >= 0 the counter reaches the delay
+       // exactly, so nothing changes; a negative delay used to never match,
+       // which suppressed all output and let the buffer grow without bound.
+       if (this.delayCount >= this.delay) {
+         for (Event bufferedEvent : this.resultEvents) {
+           collector.collect(bufferedEvent);
+         }
+
+         // Drop references to emitted events; the buffer is not read again
+         // until the next edge resets it.
+         this.resultEvents.clear();
+         this.edgeDetected = false;
+       } else {
+         this.delayCount++;
+       }
+     }
+
+     this.lastValue = value;
+   }
+
+   /** Nothing to release here. */
+   @Override
+   public void onDetach() throws SpRuntimeException {
+
+   }
+
+   /**
+    * Returns whether the transition from {@code lastValue} to {@code value}
+    * is an edge of the configured flank type; unknown flank settings never
+    * match.
+    */
+   private boolean detectEdge(boolean value, boolean lastValue) {
+     switch (this.flank) {
+       case FLANK_UP:
+         return !lastValue && value;
+       case FLANK_DOWN:
+         return lastValue && !value;
+       case BOTH:
+         return value != lastValue;
+       default:
+         return false;
+     }
+   }
+
+   /**
+    * Adds {@code event} to the buffer according to the event selection:
+    * "First" keeps only the first event after the edge, "Last" keeps only the
+    * most recent one, "All" keeps every event.
+    */
+   private void addResultEvent(Event event) {
+     switch (this.eventSelection) {
+       case OPTION_FIRST:
+         if (this.resultEvents.isEmpty()) {
+           this.resultEvents.add(event);
+         }
+         break;
+       case OPTION_LAST:
+         this.resultEvents.clear();
+         this.resultEvents.add(event);
+         break;
+       case OPTION_ALL:
+         this.resultEvents.add(event);
+         break;
+       default:
+         break;
+     }
+   }
+ }