This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2559fc1e53 NIFI-14467 Stop ConsumeBoxEnterpriseEvents when not
scheduled (#9873)
2559fc1e53 is described below
commit 2559fc1e53242f03f263e5e68003342c8705bb03
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Apr 17 16:53:35 2025 +0200
NIFI-14467 Stop ConsumeBoxEnterpriseEvents when not scheduled (#9873)
Signed-off-by: David Handermann <[email protected]>
---
.../processors/box/ConsumeBoxEnterpriseEvents.java | 9 +-
.../box/ConsumeBoxEnterpriseEventsTest.java | 107 ++++++++++++++++-----
2 files changed, 89 insertions(+), 27 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
index a164c9d6ef..63f37c568e 100644
---
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
@@ -179,13 +179,18 @@ public class ConsumeBoxEnterpriseEvents extends
AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
- while (true) {
+ while (isScheduled()) {
+ getLogger().debug("Consuming Box Events from position: {}",
streamPosition);
+
final EventLog eventLog = getEventLog(streamPosition);
streamPosition = eventLog.getNextStreamPosition();
+
+ getLogger().debug("Consumed {} Box Enterprise Events. New
position: {}", eventLog.getSize(), streamPosition);
+
writeStreamPosition(streamPosition, session);
if (eventLog.getSize() == 0) {
- return;
+ break;
}
writeLogAsRecords(eventLog, session);
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
index 69fc459ad0..d0c07a3a29 100644
---
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
@@ -25,16 +25,26 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunners;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
@@ -42,18 +52,11 @@ import static org.mockito.Mockito.when;
class ConsumeBoxEnterpriseEventsTest extends AbstractBoxFileTest {
- private TestEventStream eventStream;
+ private TestConsumeBoxEnterpriseEvents processor;
@BeforeEach
void setUp() throws Exception {
- eventStream = new TestEventStream();
-
- final ConsumeBoxEnterpriseEvents processor = new
ConsumeBoxEnterpriseEvents() {
- @Override
- EventLog getEventLog(String position) {
- return eventStream.consume(position);
- }
- };
+ processor = new TestConsumeBoxEnterpriseEvents();
testRunner = TestRunners.newTestRunner(processor);
super.setUp();
@@ -71,6 +74,9 @@ class ConsumeBoxEnterpriseEventsTest extends
AbstractBoxFileTest {
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_OFFSET,
startOffset);
}
+ final TestEventStream eventStream = new TestEventStream();
+ processor.overrideGetEventLog(eventStream::consume);
+
eventStream.addEvent(0);
eventStream.addEvent(1);
eventStream.addEvent(2);
@@ -97,6 +103,39 @@ class ConsumeBoxEnterpriseEventsTest extends
AbstractBoxFileTest {
);
}
+ @Test
+ void testGracefulTermination() throws InterruptedException {
+ final CountDownLatch scheduledLatch = new CountDownLatch(1);
+ final AtomicInteger consumedEvents = new AtomicInteger(0);
+
+ // Infinite stream.
+ processor.overrideGetEventLog(__ -> {
+ scheduledLatch.countDown();
+ consumedEvents.incrementAndGet();
+ return createEventLog(List.of(createBoxEvent(1)), "");
+ });
+
+ final ExecutorService runExecutor =
Executors.newSingleThreadExecutor();
+
+ try {
+ // Starting the processor that consumes an infinite stream.
+ final Future<?> runFuture = runExecutor.submit(() ->
testRunner.run(/*iterations=*/ 1, /*stopOnFinish=*/ false));
+
+ assertTrue(scheduledLatch.await(5, TimeUnit.SECONDS), "Processor
did not start");
+
+ // Triggering the processor to stop.
+ testRunner.unSchedule();
+
+ assertDoesNotThrow(() -> runFuture.get(5, TimeUnit.SECONDS),
"Processor did not stop gracefully");
+
+
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
consumedEvents.get());
+ } finally {
+ // We can't use try with resources, as Executors use a shutdown
method
+ // which indefinitely waits for submitted tasks.
+ runExecutor.shutdownNow();
+ }
+ }
+
private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
final JsonValue json = Json.parse(flowFile.getContent());
return json.asArray().values().stream()
@@ -105,6 +144,23 @@ class ConsumeBoxEnterpriseEventsTest extends
AbstractBoxFileTest {
.map(Integer::parseInt);
}
+ /**
+ * This class is used to override external call in {@link
ConsumeBoxEnterpriseEvents#getEventLog(String)}.
+ */
+ private static class TestConsumeBoxEnterpriseEvents extends
ConsumeBoxEnterpriseEvents {
+
+ private volatile Function<String, EventLog> fakeEventLog;
+
+ void overrideGetEventLog(final Function<String, EventLog>
fakeEventLog) {
+ this.fakeEventLog = fakeEventLog;
+ }
+
+ @Override
+ EventLog getEventLog(String position) {
+ return fakeEventLog.apply(position);
+ }
+ }
+
private static class TestEventStream {
private static final String NOW_POSITION = "now";
@@ -112,39 +168,40 @@ class ConsumeBoxEnterpriseEventsTest extends
AbstractBoxFileTest {
private final List<BoxEvent> events = new ArrayList<>();
void addEvent(final int eventId) {
- final BoxEvent boxEvent = new BoxEvent(null, "{\"event_id\":
\"%d\"}".formatted(eventId));
- events.add(boxEvent);
+ events.add(createBoxEvent(eventId));
}
EventLog consume(final String position) {
+ final String nextPosition = String.valueOf(events.size());
+
if (NOW_POSITION.equals(position)) {
- return createEmptyEventLog();
+ return createEventLog(emptyList(), nextPosition);
}
final int streamPosition = Integer.parseInt(position);
if (streamPosition > events.size()) {
// Real Box API returns the latest offset position, even if
streamPosition was greater.
- return createEmptyEventLog();
+ return createEventLog(emptyList(), nextPosition);
}
final List<BoxEvent> consumedEvents =
events.subList(streamPosition, events.size());
- return createEventLog(consumedEvents);
+ return createEventLog(consumedEvents, nextPosition);
}
+ }
- private EventLog createEmptyEventLog() {
- return createEventLog(emptyList());
- }
+ private static BoxEvent createBoxEvent(final int eventId) {
+ return new BoxEvent(null, "{\"event_id\": \"%d\"}".formatted(eventId));
+ }
- private EventLog createEventLog(final List<BoxEvent> consumedEvents) {
- // EventLog is not designed for being extended. Thus, mocking it.
- final EventLog eventLog = mock();
+ private static EventLog createEventLog(final List<BoxEvent>
consumedEvents, final String nextPosition) {
+ // EventLog is not designed for being extended. Thus, mocking it.
+ final EventLog eventLog = mock();
-
when(eventLog.getNextStreamPosition()).thenReturn(String.valueOf(events.size()));
-
lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
-
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
+ when(eventLog.getNextStreamPosition()).thenReturn(nextPosition);
+ lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
+
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
- return eventLog;
- }
+ return eventLog;
}
}