This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 1608-countarray
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1608-countarray by this push:
     new 959d87090 Migrate count array, split array and signal edge filters 
(#1608)(#1609)(#1610)
959d87090 is described below

commit 959d870900cd313d071bce029495f938f047b8cf
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri May 26 14:40:43 2023 +0200

    Migrate count array, split array and signal edge filters 
(#1608)(#1609)(#1610)
---
 .../transformation/jvm/TransformationJvmInit.java  |   8 +-
 .../jvm/processor/array/count/CountArray.java      |  59 --------
 .../array/count/CountArrayParameters.java          |  37 -----
 ...rayController.java => CountArrayProcessor.java} |  39 +++--
 .../jvm/processor/array/split/SplitArray.java      |  86 -----------
 .../array/split/SplitArrayController.java          |  73 ++++++++--
 .../array/split/SplitArrayParameters.java          |  43 ------
 .../booloperator/edge/SignalEdgeFilter.java        | 120 ---------------
 .../edge/SignalEdgeFilterController.java           |  85 -----------
 .../edge/SignalEdgeFilterParameters.java           |  70 ---------
 .../edge/SignalEdgeFilterProcessor.java            | 161 +++++++++++++++++++++
 11 files changed, 253 insertions(+), 528 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 14c357eff..0d107ea0d 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -27,10 +27,10 @@ import 
org.apache.streampipes.extensions.management.model.SpServiceDefinitionBui
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import 
org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayController;
+import 
org.apache.streampipes.processors.transformation.jvm.processor.array.count.CountArrayProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.array.split.SplitArrayController;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.counter.BooleanCounterProcessor;
-import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge.SignalEdgeFilterController;
+import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge.SignalEdgeFilterProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.inverter.BooleanInverterProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.logical.BooleanOperatorProcessor;
 import 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.state.BooleanToStateController;
@@ -65,7 +65,7 @@ public class TransformationJvmInit extends 
ExtensionsModelSubmitter {
             "",
             8090)
         .registerPipelineElements(
-            new CountArrayController(),
+            new CountArrayProcessor(),
             new SplitArrayController(),
             new CalculateDurationController(),
             new ChangedValueDetectionController(),
@@ -81,7 +81,7 @@ public class TransformationJvmInit extends 
ExtensionsModelSubmitter {
             new TaskDurationController(),
             new TransformToBooleanController(),
             new StringTimerProcessor(),
-            new SignalEdgeFilterController(),
+            new SignalEdgeFilterProcessor(),
             new BooleanToStateController(),
             new NumberLabelerController(),
             new StringToStateProcessor(),
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArray.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArray.java
deleted file mode 100644
index 7252073e5..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArray.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.AbstractField;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-
-public class CountArray implements EventProcessor<CountArrayParameters> {
-
-  private static Logger log;
-
-  private CountArrayParameters splitArrayParameters;
-
-  @Override
-  public void onInvocation(CountArrayParameters params, SpOutputCollector 
spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) {
-    log = params.getGraph().getLogger(CountArray.class);
-
-    this.splitArrayParameters = params;
-  }
-
-  @Override
-  public void onEvent(Event event, SpOutputCollector out) {
-    String arrayField = splitArrayParameters.getArrayField();
-
-    List<AbstractField> allEvents = 
event.getFieldBySelector(arrayField).getAsList().getRawValue();
-
-    event.addField(CountArrayController.COUNT_NAME, allEvents.size());
-
-    out.collect(event);
-  }
-
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayParameters.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayParameters.java
deleted file mode 100644
index c80918f39..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayParameters.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class CountArrayParameters extends EventProcessorBindingParams {
-  private String arrayField;
-
-
-  public CountArrayParameters(DataProcessorInvocation graph, String 
arrayField) {
-    super(graph);
-    this.arrayField = arrayField;
-  }
-
-  public String getArrayField() {
-    return arrayField;
-  }
-
-}
\ No newline at end of file
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayController.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
similarity index 65%
rename from 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayController.java
rename to 
streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
index 1e97d7870..62b2818d4 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayController.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java
@@ -18,13 +18,14 @@
 
 package 
org.apache.streampipes.processors.transformation.jvm.processor.array.count;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.AbstractField;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.EpProperties;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
@@ -32,14 +33,20 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.vocabulary.SO;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
-public class CountArrayController extends 
StandaloneEventProcessingDeclarer<CountArrayParameters> {
+import java.util.List;
+
+public class CountArrayProcessor extends StreamPipesDataProcessor {
 
   public static final String COUNT_NAME = "countValue";
   public static final String ARRAY_FIELD = "array-field";
 
+  private String arrayField;
+
   @Override
   public DataProcessorDescription declareModel() {
     return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.count-array")
@@ -57,12 +64,24 @@ public class CountArrayController extends 
StandaloneEventProcessingDeclarer<Coun
   }
 
   @Override
-  public ConfiguredEventProcessor<CountArrayParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                     
ProcessingElementParameterExtractor extractor) {
-    String arrayField = extractor.mappingPropertyValue(ARRAY_FIELD);
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    this.arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD);
+  }
+
+  @Override
+  public void onEvent(Event event, SpOutputCollector collector) throws 
SpRuntimeException {
 
-    CountArrayParameters params = new CountArrayParameters(graph, arrayField);
-    return new ConfiguredEventProcessor<>(params, CountArray::new);
+    List<AbstractField> allEvents = 
event.getFieldBySelector(arrayField).getAsList().getRawValue();
+
+    event.addField(CountArrayProcessor.COUNT_NAME, allEvents.size());
+
+    collector.collect(event);
   }
 
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArray.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArray.java
deleted file mode 100644
index fc61de3de..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArray.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.array.split;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.runtime.field.AbstractField;
-import org.apache.streampipes.model.runtime.field.ListField;
-import org.apache.streampipes.model.runtime.field.NestedField;
-import org.apache.streampipes.model.runtime.field.PrimitiveField;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.List;
-import java.util.Map;
-
-public class SplitArray implements EventProcessor<SplitArrayParameters> {
-
-  private static Logger log;
-
-  private SplitArrayParameters splitArrayParameters;
-
-
-  @Override
-  public void onInvocation(SplitArrayParameters splitArrayParameters, 
SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) {
-    log = splitArrayParameters.getGraph().getLogger(SplitArray.class);
-    this.splitArrayParameters = splitArrayParameters;
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-    String arrayField = splitArrayParameters.getArrayField();
-    List<String> keepProperties = splitArrayParameters.getKeepProperties();
-
-    List<AbstractField> allEvents = 
inputEvent.getFieldBySelector(arrayField).getAsList()
-        .parseAsCustomType(o -> {
-          if (o instanceof NestedField) {
-            return (NestedField) o;
-          } else if (o instanceof ListField) {
-            return (ListField) o;
-          } else {
-            return (PrimitiveField) o;
-          }
-        });
-
-    for (AbstractField field : allEvents) {
-      Event outEvent = new Event();
-      if (field instanceof NestedField) {
-        for (Map.Entry<String, AbstractField> key : ((NestedField) 
field).getRawValue().entrySet()) {
-          outEvent.addField(key.getValue());
-        }
-      } else {
-        outEvent.addField(SplitArrayController.VALUE, field);
-      }
-
-      for (String propertyName : keepProperties) {
-        outEvent.addField(inputEvent.getFieldBySelector(propertyName));
-      }
-
-      out.collect(outEvent);
-    }
-
-  }
-
-  @Override
-  public void onDetach() {
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
index e94fdf480..5fb138465 100644
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayController.java
@@ -23,6 +23,10 @@ import 
org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOu
 import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.runtime.field.AbstractField;
+import org.apache.streampipes.model.runtime.field.ListField;
+import org.apache.streampipes.model.runtime.field.NestedField;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyList;
 import org.apache.streampipes.model.schema.EventSchema;
@@ -35,19 +39,25 @@ import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
-public class SplitArrayController extends 
StandaloneEventProcessingDeclarer<SplitArrayParameters>
+public class SplitArrayController extends StreamPipesDataProcessor
     implements 
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, 
ProcessingElementParameterExtractor> {
 
   public static final String KEEP_PROPERTIES_ID = "keep";
   public static final String ARRAY_FIELD_ID = "array-field";
   public static final String VALUE = "array_value";
 
+  private String arrayField;
+  private List<String> keepProperties;
+
   @Override
   public DataProcessorDescription declareModel() {
     return 
ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.split-array")
@@ -66,17 +76,6 @@ public class SplitArrayController extends 
StandaloneEventProcessingDeclarer<Spli
         .build();
   }
 
-  @Override
-  public ConfiguredEventProcessor<SplitArrayParameters> 
onInvocation(DataProcessorInvocation graph,
-                                                                     
ProcessingElementParameterExtractor extractor) {
-
-    String arrayField = extractor.mappingPropertyValue(ARRAY_FIELD_ID);
-    List<String> keepProperties = 
extractor.mappingPropertyValues(KEEP_PROPERTIES_ID);
-
-    SplitArrayParameters params = new SplitArrayParameters(graph, arrayField, 
keepProperties);
-    return new ConfiguredEventProcessor<>(params, SplitArray::new);
-  }
-
   @Override
   public EventSchema resolveOutputStrategy(DataProcessorInvocation 
processingElement,
                                            ProcessingElementParameterExtractor 
extractor)
@@ -98,4 +97,50 @@ public class SplitArrayController extends 
StandaloneEventProcessingDeclarer<Spli
 
     return new EventSchema(outProperties);
   }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD_ID);
+    keepProperties = 
parameters.extractor().mappingPropertyValues(KEEP_PROPERTIES_ID);
+  }
+
+  @Override
+  public void onEvent(Event event,
+                      SpOutputCollector collector) throws SpRuntimeException {
+
+    List<AbstractField> allEvents = 
event.getFieldBySelector(arrayField).getAsList()
+        .parseAsCustomType(o -> {
+          if (o instanceof NestedField) {
+            return o;
+          } else if (o instanceof ListField) {
+            return o;
+          } else {
+            return o;
+          }
+        });
+
+    for (AbstractField field : allEvents) {
+      Event outEvent = new Event();
+      if (field instanceof NestedField) {
+        for (Map.Entry<String, AbstractField> key : ((NestedField) 
field).getRawValue().entrySet()) {
+          outEvent.addField(key.getValue());
+        }
+      } else {
+        outEvent.addField(SplitArrayController.VALUE, field);
+      }
+
+      for (String propertyName : keepProperties) {
+        outEvent.addField(event.getFieldBySelector(propertyName));
+      }
+
+      collector.collect(outEvent);
+    }
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayParameters.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayParameters.java
deleted file mode 100644
index 1ddd075e7..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayParameters.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.array.split;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-import java.util.List;
-
-public class SplitArrayParameters extends EventProcessorBindingParams {
-  private String arrayField;
-  private List<String> keepProperties;
-
-  public SplitArrayParameters(DataProcessorInvocation graph, String 
arrayField, List<String> keepProperties) {
-    super(graph);
-    this.arrayField = arrayField;
-    this.keepProperties = keepProperties;
-  }
-
-  public String getArrayField() {
-    return arrayField;
-  }
-
-  public List<String> getKeepProperties() {
-    return keepProperties;
-  }
-}
\ No newline at end of file
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
deleted file mode 100644
index f37131d50..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilter.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
-
-import org.apache.streampipes.logging.api.Logger;
-import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
-import org.apache.streampipes.wrapper.routing.SpOutputCollector;
-import org.apache.streampipes.wrapper.runtime.EventProcessor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class SignalEdgeFilter implements 
EventProcessor<SignalEdgeFilterParameters> {
-
-  private static Logger log;
-
-  private String booleanSignalField;
-  private String flank;
-  private Integer delay;
-  private String eventSelection;
-
-  private boolean lastValue;
-  private int delayCount;
-  private List<Event> resultEvents;
-  private boolean edgeDetected;
-
-  @Override
-  public void onInvocation(SignalEdgeFilterParameters 
booleanInverterParameters,
-                           SpOutputCollector spOutputCollector,
-                           EventProcessorRuntimeContext runtimeContext) {
-    log = 
booleanInverterParameters.getGraph().getLogger(SignalEdgeFilter.class);
-    this.booleanSignalField = 
booleanInverterParameters.getBooleanSignalField();
-    this.flank = booleanInverterParameters.getFlank();
-    this.delay = booleanInverterParameters.getDelay();
-    this.eventSelection = booleanInverterParameters.getEventSelection();
-
-    this.lastValue = false;
-    this.delayCount = 0;
-    this.resultEvents = new ArrayList<>();
-    this.edgeDetected = false;
-  }
-
-  @Override
-  public void onEvent(Event inputEvent, SpOutputCollector out) {
-
-    boolean value = 
inputEvent.getFieldBySelector(this.booleanSignalField).getAsPrimitive().getAsBoolean();
-
-    // Detect edges in signal
-    if (detectEdge(value, lastValue)) {
-      this.edgeDetected = true;
-      this.resultEvents = new ArrayList<>();
-      this.delayCount = 0;
-    }
-
-    if (edgeDetected) {
-      // Buffer event(s) according to user configuration
-      addResultEvent(inputEvent);
-
-      // Detect if the delay has been waited for
-      if (this.delay == delayCount) {
-        for (Event event : this.resultEvents) {
-          out.collect(event);
-        }
-
-        this.edgeDetected = false;
-
-      } else {
-        this.delayCount++;
-      }
-    }
-
-    this.lastValue = value;
-  }
-
-  @Override
-  public void onDetach() {
-  }
-
-  private boolean detectEdge(boolean value, boolean lastValue) {
-    if (this.flank.equals(SignalEdgeFilterController.FLANK_UP)) {
-      return !lastValue && value;
-    } else if (this.flank.equals(SignalEdgeFilterController.FLANK_DOWN)) {
-      return lastValue && !value;
-    } else if (this.flank.equals(SignalEdgeFilterController.BOTH)) {
-      return value != lastValue;
-    }
-
-    return false;
-  }
-
-  private void addResultEvent(Event event) {
-    if (this.eventSelection.equals(SignalEdgeFilterController.OPTION_FIRST)) {
-      if (this.resultEvents.size() == 0) {
-        this.resultEvents.add(event);
-      }
-    } else if 
(this.eventSelection.equals(SignalEdgeFilterController.OPTION_LAST)) {
-      this.resultEvents = new ArrayList<>();
-      this.resultEvents.add(event);
-    } else if 
(this.eventSelection.equals(SignalEdgeFilterController.OPTION_ALL)) {
-      this.resultEvents.add(event);
-    }
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
deleted file mode 100644
index 405d24a6f..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterController.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
-
-import org.apache.streampipes.model.DataProcessorType;
-import org.apache.streampipes.model.graph.DataProcessorDescription;
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import org.apache.streampipes.model.schema.PropertyScope;
-import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
-import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
-import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
-import org.apache.streampipes.sdk.helpers.EpRequirements;
-import org.apache.streampipes.sdk.helpers.Labels;
-import org.apache.streampipes.sdk.helpers.Locales;
-import org.apache.streampipes.sdk.helpers.Options;
-import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
-import 
org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
-
-public class SignalEdgeFilterController extends 
StandaloneEventProcessingDeclarer<SignalEdgeFilterParameters> {
-
-  public static final String BOOLEAN_SIGNAL_FIELD = "boolean_signal_field";
-  public static final String FLANK_ID = "flank";
-  public static final String DELAY_ID = "delay";
-  private static final String EVENT_SELECTION_ID = "event-selection-id";
-
-  public static final String FLANK_UP = "FALSE -> TRUE";
-  public static final String FLANK_DOWN = "TRUE -> FALSE";
-  public static final String BOTH = "BOTH";
-  public static final String OPTION_FIRST = "First";
-  public static final String OPTION_LAST = "Last";
-  public static final String OPTION_ALL = "All";
-
-  @Override
-  public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create(
-            
"org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge")
-        .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.FILTER)
-        .withLocales(Locales.EN)
-        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-        .requiredStream(StreamRequirementsBuilder.create()
-            .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(), 
Labels.withId(BOOLEAN_SIGNAL_FIELD),
-                PropertyScope.NONE)
-            .build())
-        .requiredSingleValueSelection(Labels.withId(FLANK_ID), 
Options.from(BOTH, FLANK_UP, FLANK_DOWN))
-        .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
-        .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
-            Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
-        .outputStrategy(OutputStrategies.keep())
-        .build();
-  }
-
-  @Override
-  public ConfiguredEventProcessor<SignalEdgeFilterParameters> onInvocation(
-      DataProcessorInvocation graph,
-      ProcessingElementParameterExtractor extractor) {
-
-    String booleanSignalField = 
extractor.mappingPropertyValue(BOOLEAN_SIGNAL_FIELD);
-    String flank = extractor.selectedSingleValue(FLANK_ID, String.class);
-    Integer delay = extractor.singleValueParameter(DELAY_ID, Integer.class);
-    String eventSelection = extractor.selectedSingleValue(EVENT_SELECTION_ID, 
String.class);
-
-    SignalEdgeFilterParameters params =
-        new SignalEdgeFilterParameters(graph, booleanSignalField, flank, 
delay, eventSelection);
-
-    return new ConfiguredEventProcessor<>(params, SignalEdgeFilter::new);
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
deleted file mode 100644
index c766ab68a..000000000
--- 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterParameters.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
-
-import org.apache.streampipes.model.graph.DataProcessorInvocation;
-import 
org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
-
-public class SignalEdgeFilterParameters extends EventProcessorBindingParams {
-  private String booleanSignalField;
-  private String flank;
-  private Integer delay;
-  private String eventSelection;
-
-  public SignalEdgeFilterParameters(DataProcessorInvocation graph, String 
booleanSignalField, String flank,
-                                    Integer delay, String eventSelection) {
-    super(graph);
-    this.booleanSignalField = booleanSignalField;
-    this.flank = flank;
-    this.delay = delay;
-    this.eventSelection = eventSelection;
-  }
-
-  public String getBooleanSignalField() {
-    return booleanSignalField;
-  }
-
-  public void setBooleanSignalField(String booleanSignalField) {
-    this.booleanSignalField = booleanSignalField;
-  }
-
-  public String getFlank() {
-    return flank;
-  }
-
-  public void setFlank(String flank) {
-    this.flank = flank;
-  }
-
-  public Integer getDelay() {
-    return delay;
-  }
-
-  public void setDelay(Integer delay) {
-    this.delay = delay;
-  }
-
-  public String getEventSelection() {
-    return eventSelection;
-  }
-
-  public void setEventSelection(String eventSelection) {
-    this.eventSelection = eventSelection;
-  }
-}
diff --git 
a/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
new file mode 100644
index 000000000..3df28ad08
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/booloperator/edge/SignalEdgeFilterProcessor.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package 
org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.Options;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.standalone.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SignalEdgeFilterProcessor extends StreamPipesDataProcessor {
+
+  public static final String BOOLEAN_SIGNAL_FIELD = "boolean_signal_field";
+  public static final String FLANK_ID = "flank";
+  public static final String DELAY_ID = "delay";
+  private static final String EVENT_SELECTION_ID = "event-selection-id";
+
+  public static final String FLANK_UP = "FALSE -> TRUE";
+  public static final String FLANK_DOWN = "TRUE -> FALSE";
+  public static final String BOTH = "BOTH";
+  public static final String OPTION_FIRST = "First";
+  public static final String OPTION_LAST = "Last";
+  public static final String OPTION_ALL = "All";
+
+  private String booleanSignalField;
+  private String flank;
+  private Integer delay;
+  private String eventSelection;
+
+  private boolean lastValue;
+  private int delayCount;
+  private List<Event> resultEvents;
+  private boolean edgeDetected;
+
+  @Override
+  public DataProcessorDescription declareModel() {
+    return ProcessingElementBuilder.create(
+            
"org.apache.streampipes.processors.transformation.jvm.processor.booloperator.edge")
+        .category(DataProcessorType.BOOLEAN_OPERATOR, DataProcessorType.FILTER)
+        .withLocales(Locales.EN)
+        .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+        .requiredStream(StreamRequirementsBuilder.create()
+            .requiredPropertyWithUnaryMapping(EpRequirements.booleanReq(), 
Labels.withId(BOOLEAN_SIGNAL_FIELD),
+                PropertyScope.NONE)
+            .build())
+        .requiredSingleValueSelection(Labels.withId(FLANK_ID), 
Options.from(BOTH, FLANK_UP, FLANK_DOWN))
+        .requiredIntegerParameter(Labels.withId(DELAY_ID), 0)
+        .requiredSingleValueSelection(Labels.withId(EVENT_SELECTION_ID),
+            Options.from(OPTION_FIRST, OPTION_LAST, OPTION_ALL))
+        .outputStrategy(OutputStrategies.keep())
+        .build();
+  }
+
+  @Override
+  public void onInvocation(ProcessorParams parameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws 
SpRuntimeException {
+    booleanSignalField = 
parameters.extractor().mappingPropertyValue(BOOLEAN_SIGNAL_FIELD);
+    flank = parameters.extractor().selectedSingleValue(FLANK_ID, String.class);
+    delay = parameters.extractor().singleValueParameter(DELAY_ID, 
Integer.class);
+    eventSelection = 
parameters.extractor().selectedSingleValue(EVENT_SELECTION_ID, String.class);
+
+    this.lastValue = false;
+    this.delayCount = 0;
+    this.resultEvents = new ArrayList<>();
+    this.edgeDetected = false;
+  }
+
+  @Override
+  public void onEvent(Event inputEvent,
+                      SpOutputCollector collector) throws SpRuntimeException {
+    boolean value = 
inputEvent.getFieldBySelector(this.booleanSignalField).getAsPrimitive().getAsBoolean();
+
+    // Detect edges in signal
+    if (detectEdge(value, lastValue)) {
+      this.edgeDetected = true;
+      this.resultEvents = new ArrayList<>();
+      this.delayCount = 0;
+    }
+
+    if (edgeDetected) {
+      // Buffer event(s) according to user configuration
+      addResultEvent(inputEvent);
+
+      // Detect if the delay has been waited for
+      if (this.delay == delayCount) {
+        for (Event event : this.resultEvents) {
+          collector.collect(event);
+        }
+
+        this.edgeDetected = false;
+
+      } else {
+        this.delayCount++;
+      }
+    }
+
+    this.lastValue = value;
+  }
+
+  @Override
+  public void onDetach() throws SpRuntimeException {
+
+  }
+
+  private boolean detectEdge(boolean value, boolean lastValue) {
+    if (this.flank.equals(SignalEdgeFilterProcessor.FLANK_UP)) {
+      return !lastValue && value;
+    } else if (this.flank.equals(SignalEdgeFilterProcessor.FLANK_DOWN)) {
+      return lastValue && !value;
+    } else if (this.flank.equals(SignalEdgeFilterProcessor.BOTH)) {
+      return value != lastValue;
+    }
+
+    return false;
+  }
+
+  private void addResultEvent(Event event) {
+    if (this.eventSelection.equals(SignalEdgeFilterProcessor.OPTION_FIRST)) {
+      if (this.resultEvents.size() == 0) {
+        this.resultEvents.add(event);
+      }
+    } else if 
(this.eventSelection.equals(SignalEdgeFilterProcessor.OPTION_LAST)) {
+      this.resultEvents = new ArrayList<>();
+      this.resultEvents.add(event);
+    } else if 
(this.eventSelection.equals(SignalEdgeFilterProcessor.OPTION_ALL)) {
+      this.resultEvents.add(event);
+    }
+  }
+}


Reply via email to