sfc-gh-pvillard commented on code in PR #10716:
URL: https://github.com/apache/nifi/pull/10716#discussion_r2676848999


##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-services/src/main/java/org/apache/nifi/box/controllerservices/JsonConfigBasedBoxClientService.java:
##########
@@ -257,5 +233,8 @@ public void migrateProperties(PropertyConfiguration config) {
         config.renameProperty("app-config-file", APP_CONFIG_FILE.getName());
         config.renameProperty("app-config-json", APP_CONFIG_JSON.getName());
         ProxyServiceMigration.renameProxyConfigurationServiceProperty(config);
+        // Remove timeout properties that are no longer supported in Box SDK 10.x
+        config.removeProperty("Connect Timeout");
+        config.removeProperty("Read Timeout");

Review Comment:
   done



##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java:
##########
@@ -174,55 +176,80 @@ private String initializeStartEventPosition(final ProcessContext context) {
     }
 
     private String retrieveLatestStreamPosition() {
-        final EventLog eventLog = getEventLog(LATEST_POSITION);
-        return eventLog.getNextStreamPosition();
+        final Events events = getEvents(LATEST_POSITION);
+        return extractStreamPosition(events.getNextStreamPosition());
+    }
+
+    /**
+     * Extracts the stream position value from EventsNextStreamPositionField.
+     * The field can contain either a String or Long value.
+     */
+    private String extractStreamPosition(final EventsNextStreamPositionField 
positionField) {
+        if (positionField == null) {
+            return null;
+        }
+        if (positionField.isString()) {
+            return positionField.getString();
+        } else if (positionField.isLongNumber()) {
+            return String.valueOf(positionField.getLongNumber());
+        }
+        return null;

Review Comment:
   done



##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java:
##########
@@ -174,55 +176,80 @@ private String initializeStartEventPosition(final ProcessContext context) {
     }
 
     private String retrieveLatestStreamPosition() {
-        final EventLog eventLog = getEventLog(LATEST_POSITION);
-        return eventLog.getNextStreamPosition();
+        final Events events = getEvents(LATEST_POSITION);
+        return extractStreamPosition(events.getNextStreamPosition());
+    }
+
+    /**
+     * Extracts the stream position value from EventsNextStreamPositionField.
+     * The field can contain either a String or Long value.
+     */
+    private String extractStreamPosition(final EventsNextStreamPositionField 
positionField) {
+        if (positionField == null) {
+            return null;
+        }
+        if (positionField.isString()) {
+            return positionField.getString();
+        } else if (positionField.isLongNumber()) {
+            return String.valueOf(positionField.getLongNumber());
+        }
+        return null;
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         while (isScheduled()) {
             getLogger().debug("Consuming Box Events from position: {}", 
streamPosition);
 
-            final EventLog eventLog = getEventLog(streamPosition);
-            streamPosition = eventLog.getNextStreamPosition();
+            final Events events = getEvents(streamPosition);
+            final String newPosition = 
extractStreamPosition(events.getNextStreamPosition());
+            streamPosition = newPosition != null ? newPosition : 
streamPosition;
 
-            getLogger().debug("Consumed {} Box Enterprise Events. New 
position: {}", eventLog.getSize(), streamPosition);
+            final int eventCount = events.getEntries() != null ? 
events.getEntries().size() : 0;
+            getLogger().debug("Consumed {} Box Enterprise Events. New 
position: {}", eventCount, streamPosition);
 
             writeStreamPosition(streamPosition, session);
 
-            if (eventLog.getSize() == 0) {
+            if (eventCount == 0) {
                 break;
             }
 
-            writeLogAsRecords(eventLog, session);
+            writeEventsAsRecords(events, session);
         }
     }
 
     // Package-private for testing.
-    EventLog getEventLog(final String position) {
-        final EnterpriseEventsStreamRequest request = new 
EnterpriseEventsStreamRequest()
+    Events getEvents(final String position) {
+        final GetEventsQueryParams.Builder queryParamsBuilder = new 
GetEventsQueryParams.Builder()
                 .limit(LIMIT)
-                .position(position)
-                .typeNames(eventTypes);
+                .streamPosition(position)
+                
.streamType(GetEventsQueryParamsStreamTypeField.ADMIN_LOGS_STREAMING);
+
+        // Note: Event type filtering has been removed in SDK v10 migration
+        // The eventType filter now requires GetEventsQueryParamsEventTypeField enum values
+        // TODO: Implement event type filtering with proper enum conversion if needed

Review Comment:
   done



##########
nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java:
##########
@@ -16,196 +16,115 @@
  */
 package org.apache.nifi.processors.box;
 
-import com.box.sdk.BoxEvent;
-import com.box.sdk.EventLog;
-import com.eclipsesource.json.Json;
-import com.eclipsesource.json.JsonValue;
+import com.box.sdkgen.schemas.event.Event;
+import com.box.sdkgen.schemas.event.EventEventTypeField;
+import com.box.sdkgen.schemas.events.Events;
+import com.box.sdkgen.schemas.events.EventsNextStreamPositionField;
+import com.box.sdkgen.serialization.json.EnumWrapper;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents.StartEventPosition;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunners;
-import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Stream;
 
-import static java.util.Collections.emptyList;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.params.provider.Arguments.arguments;
-import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 class ConsumeBoxEnterpriseEventsTest extends AbstractBoxFileTest {
 
-    private TestConsumeBoxEnterpriseEvents processor;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    private Events mockEvents;
+    private Events emptyEvents;
 
     @Override
     @BeforeEach
     void setUp() throws Exception {
-        processor = new TestConsumeBoxEnterpriseEvents();
+        // Create the empty events mock (returned after first call to break the loop)
+        emptyEvents = mock(Events.class);
+        when(emptyEvents.getEntries()).thenReturn(List.of());
+        EventsNextStreamPositionField nextStreamPositionEmpty = 
mock(EventsNextStreamPositionField.class);
+        when(nextStreamPositionEmpty.isString()).thenReturn(true);
+        when(nextStreamPositionEmpty.getString()).thenReturn("end");
+        
when(emptyEvents.getNextStreamPosition()).thenReturn(nextStreamPositionEmpty);
+
+        // Create a test subclass that overrides getEvents to use our mock data
+        final AtomicInteger callCount = new AtomicInteger(0);
+        final ConsumeBoxEnterpriseEventsTest testReference = this;
+
+        final ConsumeBoxEnterpriseEvents testSubject = new 
ConsumeBoxEnterpriseEvents() {
+            @Override
+            Events getEvents(String position) {
+                // Return events on first call, empty on subsequent calls to break the loop
+                if (callCount.getAndIncrement() == 0) {
+                    return testReference.mockEvents != null ? 
testReference.mockEvents : emptyEvents;
+                }
+                return emptyEvents;
+            }
+        };
 
-        testRunner = TestRunners.newTestRunner(processor);
+        testRunner = TestRunners.newTestRunner(testSubject);
         super.setUp();
     }
 
-    @ParameterizedTest
-    @MethodSource("dataFor_testConsumeEvents")
-    void testConsumeEvents(
-            final StartEventPosition startEventPosition,
-            final @Nullable String startOffset,
-            final int expectedFlowFiles,
-            final List<Integer> expectedEventIds) {
-        
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION, 
startEventPosition);
-        if (startOffset != null) {
-            testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_OFFSET, 
startOffset);
-        }
-
-        final TestEventStream eventStream = new TestEventStream();
-        processor.overrideGetEventLog(eventStream::consume);
-
-        eventStream.addEvent(0);
-        eventStream.addEvent(1);
-        eventStream.addEvent(2);
-        testRunner.run();
+    @Test
+    void testConsumeEventsFromEarliest() throws Exception {
+        
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION, 
StartEventPosition.EARLIEST);
+
+        // Create mock events
+        List<Event> events = new ArrayList<>();
+        events.add(createMockEvent("1", EventEventTypeField.ITEM_CREATE));
+        events.add(createMockEvent("2", EventEventTypeField.ITEM_TRASH));
+        events.add(createMockEvent("3", EventEventTypeField.ITEM_UPLOAD));
+
+        mockEvents = mock(Events.class);
+        when(mockEvents.getEntries()).thenReturn(events);
+        EventsNextStreamPositionField nextStreamPosition3 = 
mock(EventsNextStreamPositionField.class);
+        when(nextStreamPosition3.isString()).thenReturn(true);
+        when(nextStreamPosition3.getString()).thenReturn("3");
+        
when(mockEvents.getNextStreamPosition()).thenReturn(nextStreamPosition3);
 
-        eventStream.addEvent(3);
         testRunner.run();
 
-        
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
 expectedFlowFiles);
-
-        final List<Integer> eventIds = 
testRunner.getFlowFilesForRelationship(ConsumeBoxEnterpriseEvents.REL_SUCCESS).stream()
-                .flatMap(this::extractEventIds)
-                .toList();
+        
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
 1);
+        final MockFlowFile flowFile = 
testRunner.getFlowFilesForRelationship(ConsumeBoxEnterpriseEvents.REL_SUCCESS).getFirst();
 
-        assertEquals(expectedEventIds, eventIds);
+        // Parse and verify the content
+        final String content = flowFile.getContent();
+        final JsonNode jsonArray = OBJECT_MAPPER.readTree(content);
+        assertEquals(3, jsonArray.size());
 
-        assertEquals(eventIds.size(), 
testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
-    }
-
-    static List<Arguments> dataFor_testConsumeEvents() {
-        return List.of(
-                arguments(StartEventPosition.EARLIEST, null, 2, List.of(0, 1, 
2, 3)),
-                arguments(StartEventPosition.OFFSET, "1", 2, List.of(1, 2, 3)),
-                arguments(StartEventPosition.OFFSET, "12345", 1, List.of(3)),
-                arguments(StartEventPosition.LATEST, null, 1, List.of(3))
-        );
+        assertEquals(3, 
testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
     }
 
     @Test
-    void testGracefulTermination() throws InterruptedException {
-        final CountDownLatch scheduledLatch = new CountDownLatch(1);
-        final AtomicInteger consumedEvents = new AtomicInteger(0);
-
-        // Infinite stream.
-        processor.overrideGetEventLog(__ -> {
-            scheduledLatch.countDown();
-            consumedEvents.incrementAndGet();
-            return createEventLog(List.of(createBoxEvent(1)), "");
-        });
-
-        final ExecutorService runExecutor = 
Executors.newSingleThreadExecutor();
-
-        try {
-            // Starting the processor that consumes an infinite stream.
-            final Future<?> runFuture = runExecutor.submit(() -> 
testRunner.run(/*iterations=*/ 1, /*stopOnFinish=*/ false));
-
-            assertTrue(scheduledLatch.await(5, TimeUnit.SECONDS), "Processor 
did not start");
-
-            // Triggering the processor to stop.
-            testRunner.unSchedule();
-
-            assertDoesNotThrow(() -> runFuture.get(5, TimeUnit.SECONDS), 
"Processor did not stop gracefully");
-
-            
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
 consumedEvents.get());
-            assertEquals(consumedEvents.get(), 
testRunner.getCounterValue(ConsumeBoxEnterpriseEvents.COUNTER_RECORDS_PROCESSED));
-        } finally {
-            // We can't use try with resources, as Executors use a shutdown method
-            // which indefinitely waits for submitted tasks.
-            runExecutor.shutdownNow();
-        }
-    }
-
-    private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
-        final JsonValue json = Json.parse(flowFile.getContent());
-        return json.asArray().values().stream()
-                .map(JsonValue::asObject)
-                .map(jsonObject -> jsonObject.get("id").asString())
-                .map(Integer::parseInt);
-    }
-
-    /**
-     * This class is used to override external call in {@link 
ConsumeBoxEnterpriseEvents#getEventLog(String)}.
-     */
-    private static class TestConsumeBoxEnterpriseEvents extends 
ConsumeBoxEnterpriseEvents {
+    void testNoEventsReturned() {
+        
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION, 
StartEventPosition.EARLIEST);
 
-        private volatile Function<String, EventLog> fakeEventLog;
+        // Set mockEvents to null so it returns emptyEvents
+        mockEvents = null;
 
-        void overrideGetEventLog(final Function<String, EventLog> 
fakeEventLog) {
-            this.fakeEventLog = fakeEventLog;
-        }
-
-        @Override
-        EventLog getEventLog(String position) {
-            return fakeEventLog.apply(position);
-        }
-    }
-
-    private static class TestEventStream {
-
-        private static final String NOW_POSITION = "now";
-
-        private final List<BoxEvent> events = new ArrayList<>();
-
-        void addEvent(final int eventId) {
-            events.add(createBoxEvent(eventId));
-        }
-
-        EventLog consume(final String position) {
-            final String nextPosition = String.valueOf(events.size());
-
-            if (NOW_POSITION.equals(position)) {
-                return createEventLog(emptyList(), nextPosition);
-            }
-
-            final int streamPosition = Integer.parseInt(position);
-            if (streamPosition > events.size()) {
-                // Real Box API returns the latest offset position, even if 
streamPosition was greater.
-                return createEventLog(emptyList(), nextPosition);
-            }
-
-            final List<BoxEvent> consumedEvents = 
events.subList(streamPosition, events.size());
-
-            return createEventLog(consumedEvents, nextPosition);
-        }
-    }
+        testRunner.run();
 
-    private static BoxEvent createBoxEvent(final int eventId) {
-        return new BoxEvent(null, "{\"event_id\": \"%d\"}".formatted(eventId));
+        testRunner.assertTransferCount(ConsumeBoxEnterpriseEvents.REL_SUCCESS, 
0);
     }
 
-    private static EventLog createEventLog(final List<BoxEvent> 
consumedEvents, final String nextPosition) {
-        // EventLog is not designed for being extended. Thus, mocking it.
-        final EventLog eventLog = mock();
-
-        when(eventLog.getNextStreamPosition()).thenReturn(nextPosition);
-        lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
-        
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
-
-        return eventLog;
+    private Event createMockEvent(String eventId, EventEventTypeField 
eventType) {
+        Event event = mock(Event.class);
+        when(event.getEventId()).thenReturn(eventId);
+        when(event.getEventType()).thenReturn(new EnumWrapper<>(eventType));
+        when(event.getCreatedAt()).thenReturn(null);
+        when(event.getSessionId()).thenReturn(null);
+        when(event.getCreatedBy()).thenReturn(null);
+        when(event.getSource()).thenReturn(null);
+        when(event.getAdditionalDetails()).thenReturn(null);
+        return event;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to