heisenbergs-uncertainty opened a new pull request, #3669:
URL: https://github.com/apache/streampipes/pull/3669

   <!--
     ~ Licensed to the Apache Software Foundation (ASF) under one or more
     ~ contributor license agreements.  See the NOTICE file distributed with
     ~ this work for additional information regarding copyright ownership.
     ~ The ASF licenses this file to You under the Apache License, Version 2.0
     ~ (the "License"); you may not use this file except in compliance with
     ~ the License.  You may obtain a copy of the License at
     ~
     ~    http://www.apache.org/licenses/LICENSE-2.0
     ~
     ~ Unless required by applicable law or agreed to in writing, software
     ~ distributed under the License is distributed on an "AS IS" BASIS,
     ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     ~ See the License for the specific language governing permissions and
     ~ limitations under the License.
     ~
     -->
   
     <!--
   Thanks for contributing! Here are some tips you can follow to help us 
incorporate your contribution quickly and easily:
   1. If this is your first time, please read our contributor guidelines:
       - https://streampipes.apache.org/community/get-involved/
       - https://cwiki.apache.org/confluence/display/STREAMPIPES/Getting+Started
   2. Make sure the PR title is formatted like: `[#<GitHub issue id>] PR title 
...`
   3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
`[WIP][#<GitHub issue id>] PR title ...`.
   4. Please write your PR title to summarize what this PR proposes/fixes.
   5. Link the PR to the corresponding GitHub issue (if present) in the 
`Development` section in the right menu bar.
   6. Be sure to keep the PR description updated to reflect all changes.
   7. If possible, provide a concise example to reproduce the issue for a 
faster review.
   8. Make sure tests pass via `mvn clean install`.
   9. (Optional) If the contribution is large, please file an Apache ICLA
       - http://apache.org/licenses/icla.pdf
   -->
   
   ### Purpose
   <!--
   Please clarify what changes you are proposing and describe how those changes 
will address the issue.
   Furthermore, describe potential consequences the changes might have.
   -->
   
   The purpose of this processor is to handle use-cases where you need to 
execute some kind of logic when a condition is met for some amount of time.
   
   My specific use case for this operator stemmed for my teams OEE automation 
efforts. We are tracking downtime on CNC machines but want to get reason codes 
from our operators that tell us "why" the machine is down. The problem in CNC 
downtime is that the machine is often instructed to intentionally drop into a 
"not running" state for short bursts. In order to not spam our operators when 
machines drop out of running state for short periods we wanted to only send 
notifications after x amount of time which would indicate that the machine is 
down for some unknown reason. This is what led to this processor.
   
   The processor is decently configurable and should address many use cases 
outside of our own. 
   
   I have also noticed that Streampipes is currently generally lacking in 
processors that operate on time as a condition which I think is a generally 
useful use-case. With some more effort and refactoring, the logic implemented 
in the processor could possibly serve as a based for processors that depend on 
the timestamp to determine their state.
   
   ### Logic of Processor
   
   
![ConditionTimeProcessor](https://github.com/user-attachments/assets/27d972c7-976e-473f-905a-62f41ca3be88)
   
   ### Examples
   
   | Configuration | Results |
   :-------------------------:|:-------------------------:
   
![image](https://github.com/user-attachments/assets/48403fd1-2d0f-42e1-9509-aa4358be87c0)|![image](https://github.com/user-attachments/assets/2fb7ab6b-cf4b-40b3-8f53-c64fd7c6292f)|
   
![image](https://github.com/user-attachments/assets/53b159fc-3ef1-4ff3-9148-f09ec7589612)|![image](https://github.com/user-attachments/assets/23962b36-b49c-40ac-b7dd-c08435853fea)|
   
![image](https://github.com/user-attachments/assets/b3475d1f-50b4-4c9d-8a68-c47381e33453)|![image](https://github.com/user-attachments/assets/807a0677-7087-42c7-a84f-ba796545e709)
   
   ### Remarks
   <!--
   Is there anything left we need to pay attention on?
   Are there some references that might be important? E.g. links to Confluence, 
or discussions
   on the mailing list or GitHub.
   -->
   
   I would be interested in some feedback on my use of the 
ScheduledExecutorService and whether this has the potential to introduce any 
strange behaviour in the pipeline do to it synthetically generating events. 
   
   I also am again unsure how to unit test this in the repository. I was able 
to introduce unit tests in my own custom processor repo but that is running 
against jdk v23 which allows mocking the ScheduledExecutorService which 17 does 
not. 
   
   It also introduces a new dependency from mockito-junit-jupiter for the 
MockitoExtension.class which is another reason why I didn't want to include the 
unit testing into this original PR until further review.
   
   Below are the tests that I are used in my custom component.
   
   ```java
   package com.spmoilandgas.streampipes.conditionduration;
   
   import 
org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
   import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
   import org.apache.streampipes.model.graph.DataProcessorInvocation;
   import org.apache.streampipes.model.runtime.Event;
   import org.apache.streampipes.model.runtime.EventFactory;
   import org.apache.streampipes.model.runtime.SchemaInfo;
   import org.apache.streampipes.model.runtime.SourceInfo;
   import org.apache.streampipes.model.schema.EventSchema;
   import 
org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
   import org.junit.jupiter.api.Test;
   import org.junit.jupiter.api.extension.ExtendWith;
   import org.mockito.ArgumentCaptor;
   import org.mockito.Mock;
   import org.mockito.junit.jupiter.MockitoExtension;
   
   
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.ScheduledFuture;
   import java.util.concurrent.TimeUnit;
   
   import static org.junit.jupiter.api.Assertions.*;
   import static org.mockito.ArgumentMatchers.any;
   import static org.mockito.ArgumentMatchers.anyLong;
   import static org.mockito.Mockito.*;
   
   @ExtendWith(MockitoExtension.class)
   public class ConditionalTimeFilterProcessorTest {
   
     // region Constants
     private static final String FILTER_FIELD_ID = "filterField";
     private static final String TIMESTAMP_FIELD_ID = "timestampField";
     private static final String DURATION_ID = "duration";
     private static final String DURATION_UNIT_ID = "durationUnit";
     private static final String CONDITION_VALUE_ID = "conditionValue";
     private static final String OUTPUT_MODE_ID = "outputMode";
     private static final String TRIGGER_MODE_ID = "triggerMode";
   
     private static final String TRIGGER_MODE_ON_EVENT = "On Event Arrival";
     private static final String TRIGGER_MODE_ON_TIMER = "On Timer";
   
     private static final String ORIGINAL_TIMESTAMP_FIELD = "originalTimestamp";
     private static final String PROCESSING_TIMESTAMP_FIELD = 
"processingTimestamp";
     private static final String TIME_DIFFERENCE_FIELD = "timeDifference";
     // endregion
   
     @Mock
     private SpOutputCollector collector;
   
     @Mock
     private ScheduledExecutorService executorService;
   
     @Mock
     private ScheduledFuture<?> scheduledFuture;
   
   
     // ###################################
     // ## Tests for "On Event Arrival" ##
     // ###################################
   
     @Test
     public void onEventArrival_fireOnceOnTrue() {
       var processor = new ConditionalTimeFilterProcessor();
       var eventCaptor = ArgumentCaptor.forClass(Event.class);
       configureProcessor(processor, 2, "Seconds", "Fire Once", 
TRIGGER_MODE_ON_EVENT, true);
   
       processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
       processor.onEvent(makeEvent(true, 11000, "id-2"), collector);
       processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
       processor.onEvent(makeEvent(true, 13000, "id-4"), collector);
   
       verify(collector, times(1)).collect(eventCaptor.capture());
       assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 10000L, 12000L, 
"id-3", true);
     }
   
     @Test
     public void onEventArrival_fireOnceOnFalse() {
       var processor = new ConditionalTimeFilterProcessor();
       var eventCaptor = ArgumentCaptor.forClass(Event.class);
       configureProcessor(processor, 2, "Seconds", "Fire Once", 
TRIGGER_MODE_ON_EVENT, false);
   
       processor.onEvent(makeEvent(false, 10000, "id-1"), collector);
       processor.onEvent(makeEvent(true, 11000, "id-2"), collector);
       processor.onEvent(makeEvent(false, 12000, "id-3"), collector);
       processor.onEvent(makeEvent(false, 14000, "id-4"), collector);
   
       verify(collector, times(1)).collect(eventCaptor.capture());
       assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 12000L, 14000L, 
"id-4", false);
     }
   
     @Test
     public void onEventArrival_fireOnceThenResetAndFireAgain() {
       var processor = new ConditionalTimeFilterProcessor();
       var eventCaptor = ArgumentCaptor.forClass(Event.class);
       configureProcessor(processor, 1, "Seconds", "Fire Once", 
TRIGGER_MODE_ON_EVENT, true);
   
       // First sequence
       processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
       processor.onEvent(makeEvent(true, 11000, "id-2"), collector); // Fires
       verify(collector, times(1)).collect(any(Event.class));
   
       // This event should be ignored
       processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
       verify(collector, times(1)).collect(any(Event.class));
   
       // Reset sequence
       processor.onEvent(makeEvent(false, 13000, "id-4"), collector);
   
       // Second sequence
       processor.onEvent(makeEvent(true, 14000, "id-5"), collector); // New 
timer starts
       processor.onEvent(makeEvent(true, 15000, "id-6"), collector); // Fires 
again
   
       verify(collector, times(2)).collect(eventCaptor.capture());
       assertEventIsEnrichedCorrectly(eventCaptor.getAllValues().get(1), 
14000L, 15000L, "id-6", true);
     }
   
     @Test
     public void onEventArrival_fireContinuouslyAfterDelay() {
       var processor = new ConditionalTimeFilterProcessor();
       configureProcessor(processor, 10, "Seconds", "Fire Continuously", 
TRIGGER_MODE_ON_EVENT, true);
   
       // Timer starts, but no event is fired
       processor.onEvent(makeEvent(true, 0, "id-1"), collector);
       verify(collector, never()).collect(any());
   
       // Still not enough time
       processor.onEvent(makeEvent(true, 9000, "id-2"), collector);
       verify(collector, never()).collect(any());
   
       // Duration met, first event is fired
       processor.onEvent(makeEvent(true, 10000, "id-3"), collector);
       verify(collector, times(1)).collect(any(Event.class));
   
       // Subsequent true events should be fired immediately
       processor.onEvent(makeEvent(true, 11000, "id-4"), collector);
       verify(collector, times(2)).collect(any(Event.class));
   
       processor.onEvent(makeEvent(true, 12000, "id-5"), collector);
       verify(collector, times(3)).collect(any(Event.class));
     }
   
   
     // ###########################
     // ## Tests for "On Timer"  ##
     // ###########################
   
     @Test
     public void onTimer_fireWithoutSubsequentEvents() {
       var processor = new ConditionalTimeFilterProcessor();
       var taskCaptor = ArgumentCaptor.forClass(Runnable.class);
       var eventCaptor = ArgumentCaptor.forClass(Event.class);
       configureProcessor(processor, 1, "Seconds", "Fire Once", 
TRIGGER_MODE_ON_TIMER, true);
       processor.executorService = this.executorService;
   
       processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
       verify(executorService).schedule(taskCaptor.capture(), eq(1000L), 
eq(TimeUnit.MILLISECONDS));
   
       taskCaptor.getValue().run();
   
       verify(collector, times(1)).collect(eventCaptor.capture());
       assertEventIsEnrichedCorrectly(eventCaptor.getValue(), 10000L, "id-1", 
true);
     }
   
     @Test
     public void onTimer_fireOnceThenResetAndFireAgain() {
       var processor = new ConditionalTimeFilterProcessor();
       var taskCaptor = ArgumentCaptor.forClass(Runnable.class);
       configureProcessor(processor, 1, "Seconds", "Fire Once", 
TRIGGER_MODE_ON_TIMER, true);
       processor.executorService = this.executorService;
   
       
doReturn(scheduledFuture).when(executorService).schedule(any(Runnable.class), 
anyLong(), any(TimeUnit.class));
   
       // --- First Sequence ---
       processor.onEvent(makeEvent(true, 10000, "id-1"), collector);
       verify(executorService, times(1)).schedule(taskCaptor.capture(), 
eq(1000L), eq(TimeUnit.MILLISECONDS));
       taskCaptor.getValue().run();
       verify(collector, times(1)).collect(any(Event.class));
   
       // --- Reset ---
       processor.onEvent(makeEvent(false, 11000, "id-2"), collector);
       verify(scheduledFuture, times(1)).cancel(true);
   
       // --- Second Sequence ---
       processor.onEvent(makeEvent(true, 12000, "id-3"), collector);
       verify(executorService, times(2)).schedule(taskCaptor.capture(), 
eq(1000L), eq(TimeUnit.MILLISECONDS));
       taskCaptor.getValue().run();
       verify(collector, times(2)).collect(any(Event.class));
     }
   
   
     // region Helper methods
     private void configureProcessor(ConditionalTimeFilterProcessor processor,
                                     int duration,
                                     String timeUnit,
                                     String outputMode,
                                     String triggerMode,
                                     boolean conditionToMeet) {
   
       DataProcessorInvocation graph = mock(DataProcessorInvocation.class);
       IDataProcessorParameters params = mock(IDataProcessorParameters.class);
       ProcessingElementParameterExtractor extractor = 
mock(ProcessingElementParameterExtractor.class);
   
       when(params.extractor()).thenReturn(extractor);
       
when(extractor.mappingPropertyValue(FILTER_FIELD_ID)).thenReturn("s0::condition");
       
when(extractor.mappingPropertyValue(TIMESTAMP_FIELD_ID)).thenReturn("s0::timestamp");
       when(extractor.singleValueParameter(DURATION_ID, 
Integer.class)).thenReturn(duration);
       when(extractor.selectedSingleValue(DURATION_UNIT_ID, 
String.class)).thenReturn(timeUnit);
       when(extractor.selectedSingleValue(OUTPUT_MODE_ID, 
String.class)).thenReturn(outputMode);
       when(extractor.selectedSingleValue(TRIGGER_MODE_ID, 
String.class)).thenReturn(triggerMode);
       
when(extractor.slideToggleValue(CONDITION_VALUE_ID)).thenReturn(conditionToMeet);
   
       processor.onPipelineStarted(params, collector, null);
     }
   
     private Event makeEvent(boolean condition, long timestamp, String eventId) 
{
       Map<String, Object> map = new HashMap<>();
       map.put("condition", condition);
       map.put("timestamp", timestamp);
       map.put("eventId", eventId);
       return EventFactory.fromMap(map,
           new SourceInfo("test-topic", "s0"),
           new SchemaInfo(new EventSchema(), new ArrayList<>()));
     }
   
     private void assertEventIsEnrichedCorrectly(Event event, long 
expectedOriginalTs, long expectedProcessingTs, String expectedOriginalEventId, 
boolean expectedCondition) {
       assertNotNull(event, "Event should not be null");
   
       assertEquals(expectedOriginalEventId, 
event.getFieldByRuntimeName("eventId").getAsPrimitive().getAsString());
       assertEquals(expectedCondition, 
event.getFieldByRuntimeName("condition").getAsPrimitive().getAsBoolean());
   
       assertEquals(expectedOriginalTs, 
event.getFieldByRuntimeName(ORIGINAL_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
       assertEquals(expectedProcessingTs, 
event.getFieldByRuntimeName(PROCESSING_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
       assertEquals(expectedProcessingTs - expectedOriginalTs, 
event.getFieldByRuntimeName(TIME_DIFFERENCE_FIELD).getAsPrimitive().getAsLong());
     }
   
     private void assertEventIsEnrichedCorrectly(Event event, long 
expectedOriginalTs, String expectedOriginalEventId, boolean expectedCondition) {
       assertNotNull(event, "Event should not be null");
   
       assertEquals(expectedOriginalEventId, 
event.getFieldByRuntimeName("eventId").getAsPrimitive().getAsString());
       assertEquals(expectedCondition, 
event.getFieldByRuntimeName("condition").getAsPrimitive().getAsBoolean());
   
       assertEquals(expectedOriginalTs, 
event.getFieldByRuntimeName(ORIGINAL_TIMESTAMP_FIELD).getAsPrimitive().getAsLong());
       
assertTrue(event.getFieldByRuntimeName(PROCESSING_TIMESTAMP_FIELD).getAsPrimitive().getAsLong()
 >= expectedOriginalTs);
       
assertTrue(event.getFieldByRuntimeName(TIME_DIFFERENCE_FIELD).getAsPrimitive().getAsLong()
 >= 0);
     }
     // endregion
   }
   
   ``` 
   
   PR introduces (a) breaking change(s): no
   
   PR introduces (a) deprecation(s): no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@streampipes.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to