heisenbergs-uncertainty opened a new pull request, #3669: URL: https://github.com/apache/streampipes/pull/3669
### Purpose

The purpose of this processor is to handle use cases where you need to execute some kind of logic when a condition is met for some amount of time. My specific use case for this operator stemmed from my team's OEE automation efforts. We are tracking downtime on CNC machines but want to get reason codes from our operators that tell us "why" the machine is down. The problem in CNC downtime is that the machine is often instructed to intentionally drop into a "not running" state for short bursts. In order to not spam our operators when machines drop out of the running state for short periods, we wanted to only send notifications after x amount of time, which would indicate that the machine is down for some unknown reason. This is what led to this processor.

The processor is decently configurable and should address many use cases outside of our own. I have also noticed that StreamPipes is currently lacking in processors that operate on time as a condition, which I think is a generally useful use case. With some more effort and refactoring, the logic implemented in the processor could possibly serve as a base for processors that depend on the timestamp to determine their state.

### Logic of Processor

### Examples

| Configuration | Results |
| :---: | :---: |
| | |

### Remarks

I would be interested in some feedback on my use of the ScheduledExecutorService and whether this has the potential to introduce any strange behaviour in the pipeline due to it synthetically generating events. I am also again unsure how to unit test this in the repository. I was able to introduce unit tests in my own custom processor repo, but that is running against JDK 23, which allows mocking the ScheduledExecutorService, which JDK 17 does not.
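
One possible way to test timer-driven behaviour without mocking `ScheduledExecutorService` at all is to let the timer logic depend on a small scheduling interface and drive it in tests with a hand-written fake that has a virtual clock. The sketch below only illustrates that idea: `TimerScheduler`, `ManualScheduler`, and `ConditionTimer` are hypothetical names that are not part of StreamPipes or of this PR, and the real processor may be structured differently.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ManualSchedulerSketch {

    /** Hypothetical minimal timer abstraction (an assumption, not a StreamPipes API). */
    interface TimerScheduler {
        /** Schedules a task after delayMs; running the returned handle cancels it. */
        Runnable schedule(Runnable task, long delayMs);
    }

    /** Test double with a virtual clock that only moves when advance() is called. */
    static final class ManualScheduler implements TimerScheduler {
        private static final class Entry {
            final long dueAt;
            final Runnable task;
            boolean cancelled;

            Entry(long dueAt, Runnable task) {
                this.dueAt = dueAt;
                this.task = task;
            }
        }

        private final List<Entry> pending = new ArrayList<>();
        private long now;

        @Override
        public Runnable schedule(Runnable task, long delayMs) {
            Entry entry = new Entry(now + delayMs, task);
            pending.add(entry);
            return () -> {
                entry.cancelled = true;
            };
        }

        /** Moves the virtual clock forward and runs every non-cancelled task that is due. */
        void advance(long ms) {
            now += ms;
            List<Entry> due = new ArrayList<>();
            for (Entry entry : pending) {
                if (entry.dueAt <= now) {
                    due.add(entry);
                }
            }
            pending.removeAll(due);
            due.sort(Comparator.comparingLong((Entry entry) -> entry.dueAt));
            for (Entry entry : due) {
                if (!entry.cancelled) {
                    entry.task.run();
                }
            }
        }
    }

    /** Fires once when the condition has held for durationMs; a false value resets it. */
    static final class ConditionTimer {
        private final TimerScheduler scheduler;
        private final long durationMs;
        private final Runnable onFire;
        private Runnable cancelHandle; // non-null while a sequence is active

        ConditionTimer(TimerScheduler scheduler, long durationMs, Runnable onFire) {
            this.scheduler = scheduler;
            this.durationMs = durationMs;
            this.onFire = onFire;
        }

        void onCondition(boolean met) {
            if (met && cancelHandle == null) {
                // First matching event of a sequence starts the timer.
                cancelHandle = scheduler.schedule(onFire, durationMs);
            } else if (!met && cancelHandle != null) {
                // Condition broken: cancel a pending timer and allow a new sequence.
                cancelHandle.run();
                cancelHandle = null;
            }
        }
    }

    public static void main(String[] args) {
        ManualScheduler scheduler = new ManualScheduler();
        int[] fired = {0};
        ConditionTimer timer = new ConditionTimer(scheduler, 1000, () -> fired[0]++);

        timer.onCondition(true);
        scheduler.advance(999);   // one millisecond short, nothing fires
        scheduler.advance(1);     // duration reached, the task runs
        System.out.println("fired " + fired[0] + " time(s)");
    }
}
```

Because the fake clock only moves when `advance` is called, such a test needs neither real sleeps nor a mocked executor. In production code the same interface could be backed by a thin wrapper around a real `ScheduledExecutorService`. The Mockito-based test class shown further down takes the mocking route instead.
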
Those tests also introduce a new dependency on mockito-junit-jupiter for `MockitoExtension.class`, which is another reason why I didn't want to include the unit tests in this original PR until further review. Below are the tests that are used in my custom component.

```java
package com.spmoilandgas.streampipes.conditionduration;

import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public class ConditionalTimeFilterProcessorTest {

  // region Constants
  private static final String FILTER_FIELD_ID = "filterField";
  private static final String TIMESTAMP_FIELD_ID = "timestampField";
  private static final String DURATION_ID = "duration";
  private static final String DURATION_UNIT_ID = "durationUnit";
  private static final String CONDITION_VALUE_ID = "conditionValue";
  private static final String OUTPUT_MODE_ID = "outputMode";
  private static final String TRIGGER_MODE_ID = "triggerMode";
  private static final String TRIGGER_MODE_ON_EVENT = "On Event Arrival";
  private static final String TRIGGER_MODE_ON_TIMER = "On Timer";
  private static final String ORIGINAL_TIMESTAMP_FIELD = "originalTimestamp";
  private static final String PROCESSING_TIMESTAMP_FIELD = "processingTimestamp";
  private static final String TIME_DIFFERENCE_FIELD = "timeDifference";
  // endregion

  @Mock
  private SpOutputCollector collector;

  @Mock
  private ScheduledExecutorService executorService;

  @Mock
  private ScheduledFuture<?> scheduledFuture;

  // ###################################
  // ## Tests for "On Event Arrival"  ##
  // ###################################

  @Test
  public void onEventArrival_fireOnceOnTrue() {
    var processor = new ConditionalTimeFilterProcessor();
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 2, "Seconds", "Fire Once", TRIGGER_MODE_ON_EVENT, true);

    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
    processor.onEvent(makeEvent(true, 11000, "id-2"), collector);
    processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
    processor.onEvent(makeEvent(true, 13000, "id-4"), collector);

    verify(collector, times(1)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 10000L, 12000L, "id-3", true);
  }

  @Test
  public void onEventArrival_fireOnceOnFalse() {
    var processor = new ConditionalTimeFilterProcessor();
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 2, "Seconds", "Fire Once", TRIGGER_MODE_ON_EVENT, false);

    processor.onEvent(makeEvent(false, 10000, "id-1"), collector);
    processor.onEvent(makeEvent(true, 11000, "id-2"), collector);
    processor.onEvent(makeEvent(false, 12000, "id-3"), collector);
    processor.onEvent(makeEvent(false, 14000, "id-4"), collector);

    verify(collector, times(1)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 12000L, 14000L, "id-4", false);
  }

  @Test
  public void onEventArrival_fireOnceThenResetAndFireAgain() {
    var processor = new ConditionalTimeFilterProcessor();
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 1, "Seconds", "Fire Once", TRIGGER_MODE_ON_EVENT, true);

    // First sequence
    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
    processor.onEvent(makeEvent(true, 11000, "id-2"), collector); // Fires
    verify(collector, times(1)).collect(any(Event.class));

    // This event should be ignored
    processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
    verify(collector, times(1)).collect(any(Event.class));

    // Reset sequence
    processor.onEvent(makeEvent(false, 13000, "id-4"), collector);

    // Second sequence
    processor.onEvent(makeEvent(true, 14000, "id-5"), collector); // New timer starts
    processor.onEvent(makeEvent(true, 15000, "id-6"), collector); // Fires again

    verify(collector, times(2)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getAllValues().get(1), 14000L, 15000L, "id-6", true);
  }

  @Test
  public void onEventArrival_fireContinuouslyAfterDelay() {
    var processor = new ConditionalTimeFilterProcessor();
    configureProcessor(processor, 10, "Seconds", "Fire Continuously", TRIGGER_MODE_ON_EVENT, true);

    // Timer starts, but no event is fired
    processor.onEvent(makeEvent(true, 0, "id-1"), collector);
    verify(collector, never()).collect(any());

    // Still not enough time
    processor.onEvent(makeEvent(true, 9000, "id-2"), collector);
    verify(collector, never()).collect(any());

    // Duration met, first event is fired
    processor.onEvent(makeEvent(true, 10000, "id-3"), collector);
    verify(collector, times(1)).collect(any(Event.class));

    // Subsequent true events should be fired immediately
    processor.onEvent(makeEvent(true, 11000, "id-4"), collector);
    verify(collector, times(2)).collect(any(Event.class));
    processor.onEvent(makeEvent(true, 12000, "id-5"), collector);
    verify(collector, times(3)).collect(any(Event.class));
  }

  // ###########################
  // ## Tests for "On Timer"  ##
  // ###########################

  @Test
  public void onTimer_fireWithoutSubsequentEvents() {
    var processor = new ConditionalTimeFilterProcessor();
    var taskCaptor = ArgumentCaptor.forClass(Runnable.class);
    var eventCaptor = ArgumentCaptor.forClass(Event.class);
    configureProcessor(processor, 1, "Seconds", "Fire Once", TRIGGER_MODE_ON_TIMER, true);
    processor.executorService = this.executorService;

    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);

    // Capture the scheduled task and run it by hand instead of waiting for the timer
    verify(executorService).schedule(taskCaptor.capture(), eq(1000L), eq(TimeUnit.MILLISECONDS));
    taskCaptor.getValue().run();

    verify(collector, times(1)).collect(eventCaptor.capture());
    assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 10000L, "id-1", true);
  }

  @Test
  public void onTimer_fireOnceThenResetAndFireAgain() {
    var processor = new ConditionalTimeFilterProcessor();
    var taskCaptor = ArgumentCaptor.forClass(Runnable.class);
    configureProcessor(processor, 1, "Seconds", "Fire Once", TRIGGER_MODE_ON_TIMER, true);
    processor.executorService = this.executorService;
    doReturn(scheduledFuture).when(executorService)
        .schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));

    // --- First Sequence ---
    processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
    verify(executorService, times(1))
        .schedule(taskCaptor.capture(), eq(1000L), eq(TimeUnit.MILLISECONDS));
    taskCaptor.getValue().run();
    verify(collector, times(1)).collect(any(Event.class));

    // --- Reset ---
    processor.onEvent(makeEvent(false, 11000, "id-2"), collector);
    verify(scheduledFuture, times(1)).cancel(true);

    // --- Second Sequence ---
    processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
    verify(executorService, times(2))
        .schedule(taskCaptor.capture(), eq(1000L), eq(TimeUnit.MILLISECONDS));
    taskCaptor.getValue().run();
    verify(collector, times(2)).collect(any(Event.class));
  }

  // region Helper methods
  private void configureProcessor(ConditionalTimeFilterProcessor processor,
                                  int duration,
                                  String timeUnit,
                                  String outputMode,
                                  String triggerMode,
                                  boolean conditionToMeet) {
    DataProcessorInvocation graph = mock(DataProcessorInvocation.class);
    IDataProcessorParameters params = mock(IDataProcessorParameters.class);
    ProcessingElementParameterExtractor extractor = mock(ProcessingElementParameterExtractor.class);

    when(params.extractor()).thenReturn(extractor);
    when(extractor.mappingPropertyValue(FILTER_FIELD_ID)).thenReturn("s0::condition");
    when(extractor.mappingPropertyValue(TIMESTAMP_FIELD_ID)).thenReturn("s0::timestamp");
    when(extractor.singleValueParameter(DURATION_ID, Integer.class)).thenReturn(duration);
    when(extractor.selectedSingleValue(DURATION_UNIT_ID, String.class)).thenReturn(timeUnit);
    when(extractor.selectedSingleValue(OUTPUT_MODE_ID, String.class)).thenReturn(outputMode);
    when(extractor.selectedSingleValue(TRIGGER_MODE_ID, String.class)).thenReturn(triggerMode);
    when(extractor.slideToggleValue(CONDITION_VALUE_ID)).thenReturn(conditionToMeet);

    processor.onPipelineStarted(params, collector, null);
  }

  private Event makeEvent(boolean condition, long timestamp, String eventId) {
    Map<String, Object> map = new HashMap<>();
    map.put("condition", condition);
    map.put("timestamp", timestamp);
    map.put("eventId", eventId);
    return EventFactory.fromMap(
        map,
        new SourceInfo("test-topic", "s0"),
        new SchemaInfo(new EventSchema(), new ArrayList<>()));
  }

  // Variant for "On Event Arrival": the processing timestamp is known exactly
  private void assertEventIsEnrichedCorrectly(Event event,
                                              long expectedOriginalTs,
                                              long expectedProcessingTs,
                                              String expectedOriginalEventId,
                                              boolean expectedCondition) {
    assertNotNull(event, "Event should not be null");
    assertEquals(expectedOriginalEventId,
        event.getFieldByRuntimeName("eventId").getAsPrimitive().getAsString());
    assertEquals(expectedCondition,
        event.getFieldByRuntimeName("condition").getAsPrimitive().getAsBoolean());
    assertEquals(expectedOriginalTs,
        event.getFieldByRuntimeName(ORIGINAL_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
    assertEquals(expectedProcessingTs,
        event.getFieldByRuntimeName(PROCESSING_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
    assertEquals(expectedProcessingTs - expectedOriginalTs,
        event.getFieldByRuntimeName(TIME_DIFFERENCE_FIELD).getAsPrimitive().getAsLong());
  }

  // Variant for "On Timer": the processing timestamp is only checked against lower bounds
  private void assertEventIsEnrichedCorrectly(Event event,
                                              long expectedOriginalTs,
                                              String expectedOriginalEventId,
                                              boolean expectedCondition) {
    assertNotNull(event, "Event should not be null");
    assertEquals(expectedOriginalEventId,
        event.getFieldByRuntimeName("eventId").getAsPrimitive().getAsString());
    assertEquals(expectedCondition,
        event.getFieldByRuntimeName("condition").getAsPrimitive().getAsBoolean());
    assertEquals(expectedOriginalTs,
        event.getFieldByRuntimeName(ORIGINAL_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
    assertTrue(
        event.getFieldByRuntimeName(PROCESSING_TIMESTAMP_FIELD).getAsPrimitive().getAsLong()
            >= expectedOriginalTs);
    assertTrue(
        event.getFieldByRuntimeName(TIME_DIFFERENCE_FIELD).getAsPrimitive().getAsLong() >= 0);
  }
  // endregion
}
```

PR introduces (a) breaking change(s): no

PR introduces (a) deprecation(s): no

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@streampipes.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org