This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch 3179-new-processor-sensorlimitalert
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3179-new-processor-sensorlimitalert by this push:
new aa667c45f2 feat(#3179): Add processor sensor limit alert
aa667c45f2 is described below
commit aa667c45f2c2ce5b1f7c4f57252edae0f2c9fb67
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Aug 23 18:08:01 2024 +0200
feat(#3179): Add processor sensor limit alert
---
.../jvm/EnricherExtensionModuleExport.java | 2 +
.../limitsalert/SensorLimitAlertProcessor.java | 168 +++++++++++++++++++++
.../documentation.md | 107 +++++++++++++
.../icon.png | Bin 0 -> 19784 bytes
.../strings.en | 40 +++++
.../limitsalert/SensorLimitAlertProcessorTest.java | 141 +++++++++++++++++
6 files changed, 458 insertions(+)
diff --git
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java
index 7685b986aa..f26795f7ac 100644
---
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java
+++
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherExtensionModuleExport.java
@@ -23,6 +23,7 @@ import
org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
import org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import
org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalProcessor;
+import
org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.SensorLimitAlertProcessor;
import
org.apache.streampipes.processors.enricher.jvm.processor.limitsenrichment.QualityControlLimitsEnrichmentProcessor;
import
org.apache.streampipes.processors.enricher.jvm.processor.math.MathOpProcessor;
import
org.apache.streampipes.processors.enricher.jvm.processor.math.staticmathop.StaticMathOpProcessor;
@@ -43,6 +44,7 @@ public class EnricherExtensionModuleExport implements
IExtensionModuleExport {
return List.of(
new JSEvalProcessor(),
new QualityControlLimitsEnrichmentProcessor(),
+ new SensorLimitAlertProcessor(),
new MathOpProcessor(),
new StaticMathOpProcessor(),
new TrigonometryProcessor(),
diff --git
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsalert/SensorLimitAlertProcessor.java
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsalert/SensorLimitAlertProcessor.java
new file mode 100644
index 0000000000..cb6b16e70a
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsalert/SensorLimitAlertProcessor.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.enricher.jvm.processor.limitsalert;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import
org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
+import org.apache.streampipes.model.DataProcessorType;
+import org.apache.streampipes.model.extensions.ExtensionAssetType;
+import org.apache.streampipes.model.graph.DataProcessorDescription;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
+import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
+import org.apache.streampipes.sdk.helpers.EpProperties;
+import org.apache.streampipes.sdk.helpers.EpRequirements;
+import org.apache.streampipes.sdk.helpers.Labels;
+import org.apache.streampipes.sdk.helpers.Locales;
+import org.apache.streampipes.sdk.helpers.OutputStrategies;
+import org.apache.streampipes.vocabulary.SO;
+import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
+import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
+
+public class SensorLimitAlertProcessor extends StreamPipesDataProcessor {
+
+ protected static final String SENSOR_VALUE_LABEL = "sensorValue";
+ protected static final String UPPER_CONTROL_LIMIT_LABEL =
"upperControlLimit";
+ protected static final String UPPER_WARNING_LIMIT_LABEL =
"upperWarningLimit";
+ protected static final String LOWER_WARNING_LIMIT_LABEL =
"lowerWarningLimit";
+ protected static final String LOWER_CONTROL_LIMIT_LABEL =
"lowerControlLimit";
+
+ // Property names that are appended to the resulting event
+ protected static final String ALERT_STATUS = "alertStatus";
+ protected static final String LIMIT_BREACHED = "limitBreached";
+
+ protected static final String ALERT = "ALERT";
+ protected static final String WARNING = "WARNING";
+ protected static final String UPPER_LIMIT = "UPPER_LIMIT";
+ protected static final String LOWER_LIMIT = "LOWER_LIMIT";
+
+ private String sensorField;
+ private String upperControlLimitField;
+ private String upperWarningLimitField;
+ private String lowerWarningLimitField;
+ private String lowerControlLimitField;
+
+ @Override
+ public DataProcessorDescription declareModel() {
+ return ProcessingElementBuilder
+
.create("org.apache.streampipes.processors.enricher.jvm.processor.limitsalert",
0)
+ .category(DataProcessorType.ENRICH)
+ .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
+ .withLocales(Locales.EN)
+ .requiredStream(StreamRequirementsBuilder
+ .create()
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(SENSOR_VALUE_LABEL),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(UPPER_CONTROL_LIMIT_LABEL),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(UPPER_WARNING_LIMIT_LABEL),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(LOWER_WARNING_LIMIT_LABEL),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .requiredPropertyWithUnaryMapping(
+ EpRequirements.numberReq(),
+ Labels.withId(LOWER_CONTROL_LIMIT_LABEL),
+ PropertyScope.MEASUREMENT_PROPERTY
+ )
+ .build())
+ .outputStrategy(
+ OutputStrategies.append(
+ EpProperties.stringEp(Labels.empty(), ALERT_STATUS, SO.TEXT),
+ EpProperties.stringEp(Labels.empty(), LIMIT_BREACHED, SO.TEXT)
+ ))
+ .build();
+ }
+
+ @Override
+ public void onInvocation(
+ ProcessorParams parameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext
+ ) throws SpRuntimeException {
+ var extractor = parameters.extractor();
+
+ sensorField = extractor.mappingPropertyValue(SENSOR_VALUE_LABEL);
+ upperControlLimitField =
extractor.mappingPropertyValue(UPPER_CONTROL_LIMIT_LABEL);
+ upperWarningLimitField =
extractor.mappingPropertyValue(UPPER_WARNING_LIMIT_LABEL);
+ lowerWarningLimitField =
extractor.mappingPropertyValue(LOWER_WARNING_LIMIT_LABEL);
+ lowerControlLimitField =
extractor.mappingPropertyValue(LOWER_CONTROL_LIMIT_LABEL);
+ }
+
+ @Override
+ public void onDetach() {
+
+ }
+
+ @Override
+ public void onEvent(Event event, SpOutputCollector collector) {
+ var sensorValue = event.getFieldBySelector(sensorField)
+ .getAsPrimitive()
+ .getAsDouble();
+ var upperControlLimit = event.getFieldBySelector(upperControlLimitField)
+ .getAsPrimitive()
+ .getAsDouble();
+ var upperWarningLimit = event.getFieldBySelector(upperWarningLimitField)
+ .getAsPrimitive()
+ .getAsDouble();
+ var lowerWarningLimit = event.getFieldBySelector(lowerWarningLimitField)
+ .getAsPrimitive()
+ .getAsDouble();
+ var lowerControlLimit = event.getFieldBySelector(lowerControlLimitField)
+ .getAsPrimitive()
+ .getAsDouble();
+
+ String alertStatus = null;
+ String limitBreached = null;
+
+ if (sensorValue > upperControlLimit) {
+ alertStatus = ALERT;
+ limitBreached = UPPER_LIMIT;
+ } else if (sensorValue > upperWarningLimit) {
+ alertStatus = WARNING;
+ limitBreached = UPPER_LIMIT;
+ } else if (sensorValue < lowerControlLimit) {
+ alertStatus = ALERT;
+ limitBreached = LOWER_LIMIT;
+ } else if (sensorValue < lowerWarningLimit) {
+ alertStatus = WARNING;
+ limitBreached = LOWER_LIMIT;
+ }
+
+ if (alertStatus != null && limitBreached != null) {
+ event.addField(ALERT_STATUS, alertStatus);
+ event.addField(LIMIT_BREACHED, limitBreached);
+ collector.collect(event);
+ }
+
+ }
+}
diff --git
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/documentation.md
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/documentation.md
new file mode 100644
index 0000000000..2f9d7b9672
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/documentation.md
@@ -0,0 +1,107 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+## Sensor Limit Alert
+
+<p align="center">
+ <img src="icon.png" width="150px;" class="pe-image-documentation"/>
+</p>
+
+***
+
+## Description
+
+The Sensor Limit Alert processor monitors sensor values in real-time and
triggers alerts when these values exceed user-defined control or warning
limits. This processor is useful in scenarios where continuous monitoring of
critical parameters is required, and immediate action is needed when values go
out of acceptable ranges.
+
+***
+
+## Required Input
+
+This processor accepts any event stream containing sensor data. The events
must include fields for sensor values and the corresponding upper and lower
limits.
+
+***
+
+## Configuration
+
+#### Sensor Value
+
+Select the sensor value to be monitored. This is the primary measurement that
will be checked against the defined limits.
+
+#### Upper Control Limit
+
+Specify the upper control limit for the sensor. This value defines the maximum
threshold, beyond which an alert is triggered.
+
+#### Upper Warning Limit
+
+Specify the upper warning limit for the sensor. This value indicates when the
sensor value is approaching the upper control limit, triggering a warning.
+
+#### Lower Warning Limit
+
+Specify the lower warning limit for the sensor. This value indicates when the
sensor value is approaching the lower control limit, triggering a warning.
+
+#### Lower Control Limit
+
+Specify the lower control limit for the sensor. This value defines the minimum
threshold, below which an alert is triggered.
+
+***
+
+## Output
+
+The processor emits events only when the sensor value exceeds the specified
limits. The output event includes the original sensor data along with
additional fields that indicate:
+- **Alert Status**: Whether the sensor value breached a WARNING or control
LIMIT.
+- **Limit Breached**: Which specific limit was breached (e.g.,
"UPPER_CONTROL_LIMIT" or "LOWER_WARNING_LIMIT").
+
+These output events can be used for triggering notifications or other actions
in downstream processing.
+
+***
+
+## Example
+
+### User Configuration
+- Mapping fields for:
+ - **Sensor Value**
+ - **Upper Control Limit**
+ - **Upper Warning Limit**
+ - **Lower Warning Limit**
+ - **Lower Control Limit**
+
+### Input Event
+```
+{
+ "timestamp": 1627891234000,
+ "sensorValue": 105.0,
+ "upperControlLimit": 100.0,
+ "upperWarningLimit": 90.0,
+ "lowerWarningLimit": 10.0,
+ "lowerControlLimit": 0.0
+}
+```
+
+### Output Event
+```
+{
+ "timestamp": 1627891234000,
+ "sensorValue": 105.0,
+ "upperControlLimit": 100.0,
+ "upperWarningLimit": 90.0,
+ "lowerWarningLimit": 10.0,
+ "lowerControlLimit": 0.0,
+ "alertStatus": "ALERT",
+ "limitBreached": "UPPER_CONTROL_LIMIT"
+}
+```
\ No newline at end of file
diff --git
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/icon.png
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/icon.png
new file mode 100644
index 0000000000..c53a42a585
Binary files /dev/null and
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/icon.png
differ
diff --git
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/strings.en
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/strings.en
new file mode 100644
index 0000000000..f7f0c4449e
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.processor.limitsalert/strings.en
@@ -0,0 +1,40 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.title=Sensor
Limit Alert
+org.apache.streampipes.processors.enricher.jvm.processor.limitsalert.description=Monitors
sensor values and triggers alerts when they exceed defined control or warning
limits.
+
+sensorValue.title=Sensor Value
+sensorValue.description=Select the sensor value to be monitored.
+
+upperControlLimit.title=Upper Control Limit
+upperControlLimit.description=Set the upper control limit for the sensor value.
+
+upperWarningLimit.title=Upper Warning Limit
+upperWarningLimit.description=Set the upper warning limit for the sensor value.
+
+lowerWarningLimit.title=Lower Warning Limit
+lowerWarningLimit.description=Set the lower warning limit for the sensor value.
+
+lowerControlLimit.title=Lower Control Limit
+lowerControlLimit.description=Set the lower control limit for the sensor value.
+
+alertStatus.title=Alert Status
+alertStatus.description=Indicates whether the sensor value has breached a
warning or control limit.
+
+limitBreached.title=Limit Breached
+limitBreached.description=Specifies which limit (e.g., Upper Control Limit,
Lower Warning Limit) was breached by the sensor value.
\ No newline at end of file
diff --git
a/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsalert/SensorLimitAlertProcessorTest.java
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsalert/SensorLimitAlertProcessorTest.java
new file mode 100644
index 0000000000..3741a11c3f
--- /dev/null
+++
b/streampipes-extensions/streampipes-processors-enricher-jvm/src/test/java/org/apache/streampipes/processors/enricher/jvm/processor/limitsalert/SensorLimitAlertProcessorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.enricher.jvm.processor.limitsalert;
+
+import org.apache.streampipes.test.executors.ProcessingElementTestExecutor;
+import org.apache.streampipes.test.executors.TestConfiguration;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+class SensorLimitAlertProcessorTest {
+
+ private final String temperature = "temperature";
+ private final String upperControlLimit = "upperControlLimit";
+ private final String upperWarningLimit = "upperWarningLimit";
+ private final String lowerWarningLimit = "lowerWarningLimit";
+ private final String lowerControlLimit = "lowerControlLimit";
+
+ private final Map<String, Object> baseEvent = Map.of(
+ upperControlLimit, 90.0,
+ upperWarningLimit, 80.0,
+ lowerWarningLimit, 20.0,
+ lowerControlLimit, 10.0
+ );
+
+ private SensorLimitAlertProcessor processor;
+
+ @BeforeEach
+ public void setup() {
+ processor = new SensorLimitAlertProcessor();
+ }
+
+ static Stream<Arguments> arguments() {
+ return Stream.of(
+ Arguments.of(
+ 5.0,
+ SensorLimitAlertProcessor.ALERT,
+ SensorLimitAlertProcessor.LOWER_LIMIT
+ ),
+ Arguments.of(
+ 15.0,
+ SensorLimitAlertProcessor.WARNING,
+ SensorLimitAlertProcessor.LOWER_LIMIT
+ ),
+ Arguments.of(
+ 85.0,
+ SensorLimitAlertProcessor.WARNING,
+ SensorLimitAlertProcessor.UPPER_LIMIT
+ ),
+ Arguments.of(
+ 95.0,
+ SensorLimitAlertProcessor.ALERT,
+ SensorLimitAlertProcessor.UPPER_LIMIT
+ )
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("arguments")
+ void onEvent_differentAlertsTest(
+ double sensorValue,
+ String alertValue,
+ String limitBreachedValue
+ ) {
+
+ List<Map<String, Object>> inputEvents = List.of(
+ new HashMap<>(baseEvent) {{
+ put(temperature, sensorValue);
+ }}
+ );
+
+ List<Map<String, Object>> outputEvents = List.of(
+ new HashMap<>(baseEvent) {{
+ put(temperature, sensorValue);
+ put(SensorLimitAlertProcessor.ALERT_STATUS, alertValue);
+ put(SensorLimitAlertProcessor.LIMIT_BREACHED, limitBreachedValue);
+ }}
+ );
+
+ var configuration = getTestConfiguration();
+
+ var testExecutor = new ProcessingElementTestExecutor(processor,
configuration);
+
+ testExecutor.run(inputEvents, outputEvents);
+ }
+
+ @Test
+ void onEvent_noEventIsEmittedIfInValueRangeTest() {
+
+ List<Map<String, Object>> inputEvents = List.of(
+ new HashMap<>(baseEvent) {{
+ put(temperature, 50.0);
+ }}
+ );
+
+ List<Map<String, Object>> outputEvents = List.of();
+
+ var configuration = getTestConfiguration();
+
+ var testExecutor = new ProcessingElementTestExecutor(processor,
configuration);
+
+ testExecutor.run(inputEvents, outputEvents);
+ }
+
+ private TestConfiguration getTestConfiguration() {
+ return TestConfiguration
+ .builder()
+ .configWithDefaultPrefix(SensorLimitAlertProcessor.SENSOR_VALUE_LABEL,
temperature)
+
.configWithDefaultPrefix(SensorLimitAlertProcessor.UPPER_CONTROL_LIMIT_LABEL,
upperControlLimit)
+
.configWithDefaultPrefix(SensorLimitAlertProcessor.UPPER_WARNING_LIMIT_LABEL,
upperWarningLimit)
+
.configWithDefaultPrefix(SensorLimitAlertProcessor.LOWER_WARNING_LIMIT_LABEL,
lowerWarningLimit)
+
.configWithDefaultPrefix(SensorLimitAlertProcessor.LOWER_CONTROL_LIMIT_LABEL,
lowerControlLimit)
+ .build();
+
+ }
+
+}
\ No newline at end of file