sfc-gh-pvillard commented on code in PR #10716:
URL: https://github.com/apache/nifi/pull/10716#discussion_r2676849620


##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java:
##########
@@ -98,10 +104,10 @@ public class ConsumeBoxEvents extends AbstractBoxProcessor 
implements Verifiable
 
     private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
 
-    private volatile BoxAPIConnection boxAPIConnection;
-    private volatile EventStream eventStream;
-    protected volatile BlockingQueue<BoxEvent> events;
-    private volatile AtomicLong position = new AtomicLong(0);
+    private volatile BoxClient boxClient;
+    protected volatile BlockingQueue<Event> events;
+    private volatile AtomicReference<String> position = new 
AtomicReference<>("0");

Review Comment:
   done



##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java:
##########
@@ -135,47 +138,71 @@ public void onScheduled(final ProcessContext context) {
             events = new LinkedBlockingQueue<>(queueCapacity);
         } else {
             // create new one with events from the old queue in case capacity 
has changed
-            final BlockingQueue<BoxEvent> newQueue = new 
LinkedBlockingQueue<>(queueCapacity);
+            final BlockingQueue<Event> newQueue = new 
LinkedBlockingQueue<>(queueCapacity);
             newQueue.addAll(events);
             events = newQueue;
         }
 
-        eventStream.addListener(new EventListener() {
+        // Start polling for events in a background thread
+        pollingExecutor = Executors.newSingleThreadScheduledExecutor();
+        pollingExecutor.scheduleWithFixedDelay(() -> {
+            try {
+                pollEvents(context);

Review Comment:
   done



##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java:
##########
@@ -135,47 +138,71 @@ public void onScheduled(final ProcessContext context) {
             events = new LinkedBlockingQueue<>(queueCapacity);
         } else {
             // create new one with events from the old queue in case capacity 
has changed
-            final BlockingQueue<BoxEvent> newQueue = new 
LinkedBlockingQueue<>(queueCapacity);
+            final BlockingQueue<Event> newQueue = new 
LinkedBlockingQueue<>(queueCapacity);
             newQueue.addAll(events);
             events = newQueue;
         }
 
-        eventStream.addListener(new EventListener() {
+        // Start polling for events in a background thread
+        pollingExecutor = Executors.newSingleThreadScheduledExecutor();
+        pollingExecutor.scheduleWithFixedDelay(() -> {
+            try {
+                pollEvents(context);
+            } catch (Exception e) {
+                getLogger().warn("Error polling Box events", e);
+            }
+        }, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+    }
 
-            @Override
-            public void onEvent(BoxEvent event) {
-                try {
-                    events.put(event);
-                } catch (InterruptedException e) {
-                    throw new RuntimeException("Interrupted while trying to 
put the event into the queue", e);
+    private void pollEvents(final ProcessContext context) {
+        try {
+            final GetEventsQueryParams queryParams = new 
GetEventsQueryParams.Builder()
+                    .streamPosition(position.get())
+                    .streamType(GetEventsQueryParamsStreamTypeField.ALL)
+                    .build();
+
+            final Events eventResult = 
boxClient.getEvents().getEvents(queryParams);
+
+            if (eventResult.getEntries() != null) {
+                for (Event event : eventResult.getEntries()) {
+                    events.offer(event);
                 }
             }
 
-            @Override
-            public void onNextPosition(long pos) {
+            final String newPosition = 
extractStreamPosition(eventResult.getNextStreamPosition());
+            if (newPosition != null) {
+                position.set(newPosition);
                 try {
-                    context.getStateManager().setState(Map.of(POSITION_KEY, 
String.valueOf(pos)), Scope.CLUSTER);
-                    position.set(pos);
+                    context.getStateManager().setState(Map.of(POSITION_KEY, 
newPosition), Scope.CLUSTER);
                 } catch (IOException e) {
-                    getLogger().warn("Failed to save position {} in processor 
state", pos, e);
+                    getLogger().warn("Failed to save position {} in processor 
state", newPosition, e);
                 }
             }
+        } catch (Exception e) {
+            getLogger().warn("An error occurred while polling Box events. Last 
tracked position {}", position.get(), e);
+        }
+    }
 
-            @Override
-            public boolean onException(Throwable e) {
-                getLogger().warn("An error has been received from the stream. 
Last tracked position {}", position.get(), e);
-                return true;
-            }
-
-        });
-
-        eventStream.start();
+    /**
+     * Extracts the stream position value from EventsNextStreamPositionField.
+     * The field can contain either a String or Long value.
+     */
+    private String extractStreamPosition(final EventsNextStreamPositionField 
positionField) {
+        if (positionField == null) {
+            return null;
+        }
+        if (positionField.isString()) {
+            return positionField.getString();
+        } else if (positionField.isLongNumber()) {
+            return String.valueOf(positionField.getLongNumber());
+        }
+        return null;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to