This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2559fc1e53 NIFI-14467 Stop ConsumeBoxEnterpriseEvents when not scheduled (#9873)
2559fc1e53 is described below

commit 2559fc1e53242f03f263e5e68003342c8705bb03
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Apr 17 16:53:35 2025 +0200

    NIFI-14467 Stop ConsumeBoxEnterpriseEvents when not scheduled (#9873)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/box/ConsumeBoxEnterpriseEvents.java |   9 +-
 .../box/ConsumeBoxEnterpriseEventsTest.java        | 107 ++++++++++++++++-----
 2 files changed, 89 insertions(+), 27 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
index a164c9d6ef..63f37c568e 100644
--- 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
@@ -179,13 +179,18 @@ public class ConsumeBoxEnterpriseEvents extends 
AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        while (true) {
+        while (isScheduled()) {
+            getLogger().debug("Consuming Box Events from position: {}", 
streamPosition);
+
             final EventLog eventLog = getEventLog(streamPosition);
             streamPosition = eventLog.getNextStreamPosition();
+
+            getLogger().debug("Consumed {} Box Enterprise Events. New 
position: {}", eventLog.getSize(), streamPosition);
+
             writeStreamPosition(streamPosition, session);
 
             if (eventLog.getSize() == 0) {
-                return;
+                break;
             }
 
             writeLogAsRecords(eventLog, session);
diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
index 69fc459ad0..d0c07a3a29 100644
--- 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
@@ -25,16 +25,26 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunners;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Stream;
 
 import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -42,18 +52,11 @@ import static org.mockito.Mockito.when;
 
 class ConsumeBoxEnterpriseEventsTest extends AbstractBoxFileTest {
 
-    private TestEventStream eventStream;
+    private TestConsumeBoxEnterpriseEvents processor;
 
     @BeforeEach
     void setUp() throws Exception {
-        eventStream = new TestEventStream();
-
-        final ConsumeBoxEnterpriseEvents processor = new 
ConsumeBoxEnterpriseEvents() {
-            @Override
-            EventLog getEventLog(String position) {
-                return eventStream.consume(position);
-            }
-        };
+        processor = new TestConsumeBoxEnterpriseEvents();
 
         testRunner = TestRunners.newTestRunner(processor);
         super.setUp();
@@ -71,6 +74,9 @@ class ConsumeBoxEnterpriseEventsTest extends 
AbstractBoxFileTest {
             testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_OFFSET, 
startOffset);
         }
 
+        final TestEventStream eventStream = new TestEventStream();
+        processor.overrideGetEventLog(eventStream::consume);
+
         eventStream.addEvent(0);
         eventStream.addEvent(1);
         eventStream.addEvent(2);
@@ -97,6 +103,39 @@ class ConsumeBoxEnterpriseEventsTest extends 
AbstractBoxFileTest {
         );
     }
 
+    @Test
+    void testGracefulTermination() throws InterruptedException {
+        final CountDownLatch scheduledLatch = new CountDownLatch(1);
+        final AtomicInteger consumedEvents = new AtomicInteger(0);
+
+        // Infinite stream.
+        processor.overrideGetEventLog(__ -> {
+            scheduledLatch.countDown();
+            consumedEvents.incrementAndGet();
+            return createEventLog(List.of(createBoxEvent(1)), "");
+        });
+
+        final ExecutorService runExecutor = 
Executors.newSingleThreadExecutor();
+
+        try {
+            // Starting the processor that consumes an infinite stream.
+            final Future<?> runFuture = runExecutor.submit(() -> 
testRunner.run(/*iterations=*/ 1, /*stopOnFinish=*/ false));
+
+            assertTrue(scheduledLatch.await(5, TimeUnit.SECONDS), "Processor 
did not start");
+
+            // Triggering the processor to stop.
+            testRunner.unSchedule();
+
+            assertDoesNotThrow(() -> runFuture.get(5, TimeUnit.SECONDS), 
"Processor did not stop gracefully");
+
+            
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
 consumedEvents.get());
+        } finally {
+            // We can't use try-with-resources here, because ExecutorService.close()
+            // waits indefinitely for submitted tasks to complete.
+            runExecutor.shutdownNow();
+        }
+    }
+
     private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
         final JsonValue json = Json.parse(flowFile.getContent());
         return json.asArray().values().stream()
@@ -105,6 +144,23 @@ class ConsumeBoxEnterpriseEventsTest extends 
AbstractBoxFileTest {
                 .map(Integer::parseInt);
     }
 
+    /**
+     * This class is used to override external call in {@link 
ConsumeBoxEnterpriseEvents#getEventLog(String)}.
+     */
+    private static class TestConsumeBoxEnterpriseEvents extends 
ConsumeBoxEnterpriseEvents {
+
+        private volatile Function<String, EventLog> fakeEventLog;
+
+        void overrideGetEventLog(final Function<String, EventLog> 
fakeEventLog) {
+            this.fakeEventLog = fakeEventLog;
+        }
+
+        @Override
+        EventLog getEventLog(String position) {
+            return fakeEventLog.apply(position);
+        }
+    }
+
     private static class TestEventStream {
 
         private static final String NOW_POSITION = "now";
@@ -112,39 +168,40 @@ class ConsumeBoxEnterpriseEventsTest extends 
AbstractBoxFileTest {
         private final List<BoxEvent> events = new ArrayList<>();
 
         void addEvent(final int eventId) {
-            final BoxEvent boxEvent = new BoxEvent(null, "{\"event_id\": 
\"%d\"}".formatted(eventId));
-            events.add(boxEvent);
+            events.add(createBoxEvent(eventId));
         }
 
         EventLog consume(final String position) {
+            final String nextPosition = String.valueOf(events.size());
+
             if (NOW_POSITION.equals(position)) {
-                return createEmptyEventLog();
+                return createEventLog(emptyList(), nextPosition);
             }
 
             final int streamPosition = Integer.parseInt(position);
             if (streamPosition > events.size()) {
                 // Real Box API returns the latest offset position, even if 
streamPosition was greater.
-                return createEmptyEventLog();
+                return createEventLog(emptyList(), nextPosition);
             }
 
             final List<BoxEvent> consumedEvents = 
events.subList(streamPosition, events.size());
 
-            return createEventLog(consumedEvents);
+            return createEventLog(consumedEvents, nextPosition);
         }
+    }
 
-        private EventLog createEmptyEventLog() {
-            return createEventLog(emptyList());
-        }
+    private static BoxEvent createBoxEvent(final int eventId) {
+        return new BoxEvent(null, "{\"event_id\": \"%d\"}".formatted(eventId));
+    }
 
-        private EventLog createEventLog(final List<BoxEvent> consumedEvents) {
-            // EventLog is not designed for being extended. Thus, mocking it.
-            final EventLog eventLog = mock();
+    private static EventLog createEventLog(final List<BoxEvent> 
consumedEvents, final String nextPosition) {
+        // EventLog is not designed for being extended. Thus, mocking it.
+        final EventLog eventLog = mock();
 
-            
when(eventLog.getNextStreamPosition()).thenReturn(String.valueOf(events.size()));
-            
lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
-            
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
+        when(eventLog.getNextStreamPosition()).thenReturn(nextPosition);
+        lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
+        
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
 
-            return eventLog;
-        }
+        return eventLog;
     }
 }

Reply via email to