This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b529622a0 NIFI-14325 Create ConsumeBoxEnterpriseEvents processor
8b529622a0 is described below

commit 8b529622a0ca35abb36d22c8ea0725439dd0627f
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Mar 6 11:38:24 2025 +0100

    NIFI-14325 Create ConsumeBoxEnterpriseEvents processor
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #9777.
---
 .../processors/box/BoxEventJsonArrayWriter.java    | 116 ++++++++++
 .../processors/box/ConsumeBoxEnterpriseEvents.java | 251 +++++++++++++++++++++
 .../nifi/processors/box/ConsumeBoxEvents.java      |  43 +---
 .../services/org.apache.nifi.processor.Processor   |   6 +-
 .../additionalDetails.md                           |  20 ++
 .../box/BoxEventJsonArrayWriterTest.java           | 160 +++++++++++++
 .../box/ConsumeBoxEnterpriseEventsTest.java        | 150 ++++++++++++
 7 files changed, 705 insertions(+), 41 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriter.java
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriter.java
new file mode 100644
index 0000000000..52cf29c1d3
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxEvent;
+import com.eclipsesource.json.Json;
+import com.eclipsesource.json.JsonObject;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Objects;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * Streams {@link BoxEvent} objects to an {@link OutputStream} as a single UTF-8 encoded JSON array.
+ * <p>
+ * The opening bracket is written lazily on the first {@link #write(BoxEvent)} call and the closing
+ * bracket is written by {@link #close()}; closing a writer that never received an event therefore
+ * still emits an empty array ({@code []}).
+ * <p>
+ * Not thread-safe.
+ */
+final class BoxEventJsonArrayWriter implements Closeable {
+
+    private final Writer writer;
+    private boolean hasBegun;
+    private boolean hasEntries;
+    private boolean closed;
+
+    private BoxEventJsonArrayWriter(final Writer writer) {
+        this.writer = writer;
+        this.hasBegun = false;
+        this.hasEntries = false;
+        this.closed = false;
+    }
+
+    /**
+     * Creates a writer that encodes its output as UTF-8 onto the given stream.
+     *
+     * @param outputStream the stream that receives the JSON array; it is closed when this writer is closed
+     * @return a new writer wrapping {@code outputStream}
+     * @throws IOException declared for callers; the current implementation does not perform I/O here
+     */
+    static BoxEventJsonArrayWriter create(final OutputStream outputStream) throws IOException {
+        final Writer writer = new OutputStreamWriter(outputStream, UTF_8);
+        return new BoxEventJsonArrayWriter(writer);
+    }
+
+    /**
+     * Appends one event to the array, emitting the opening bracket and element separators as needed.
+     *
+     * @param event the event to serialize
+     * @throws IOException if this writer has already been closed or the underlying writer fails
+     */
+    void write(final BoxEvent event) throws IOException {
+        if (closed) {
+            throw new IOException("The Writer is closed");
+        }
+
+        if (!hasBegun) {
+            beginArray();
+            hasBegun = true;
+        }
+
+        // Every element after the first is preceded by a comma.
+        if (hasEntries) {
+            writer.write(',');
+        }
+
+        final JsonObject json = toRecord(event);
+        json.writeTo(writer);
+
+        hasEntries = true;
+    }
+
+    /**
+     * Converts an event into a JSON object with a fixed set of keys.
+     * Absent values are written as explicit JSON {@code null}s so that every record has the same fields.
+     */
+    private JsonObject toRecord(final BoxEvent event) {
+        final JsonObject json = Json.object();
+
+        json.add("accessibleBy", event.getAccessibleBy() == null ? Json.NULL : Json.parse(event.getAccessibleBy().getJson()));
+        json.add("actionBy", event.getActionBy() == null ? Json.NULL : Json.parse(event.getActionBy().getJson()));
+        json.add("additionalDetails", Objects.requireNonNullElse(event.getAdditionalDetails(), Json.NULL));
+        json.add("createdAt", event.getCreatedAt() == null ? Json.NULL : Json.value(event.getCreatedAt().toString()));
+        json.add("createdBy", event.getCreatedBy() == null ? Json.NULL : Json.parse(event.getCreatedBy().getJson()));
+        json.add("eventType", event.getEventType() == null ? Json.NULL : Json.value(event.getEventType().name()));
+        json.add("id", Objects.requireNonNullElse(Json.value(event.getID()), Json.NULL));
+        json.add("ipAddress", Objects.requireNonNullElse(Json.value(event.getIPAddress()), Json.NULL));
+        json.add("sessionID", Objects.requireNonNullElse(Json.value(event.getSessionID()), Json.NULL));
+        json.add("source", Objects.requireNonNullElse(event.getSourceJSON(), Json.NULL));
+        json.add("typeName", Objects.requireNonNullElse(Json.value(event.getTypeName()), Json.NULL));
+
+        return json;
+    }
+
+    private void beginArray() throws IOException {
+        writer.write('[');
+    }
+
+    private void endArray() throws IOException {
+        writer.write(']');
+    }
+
+    /**
+     * Terminates the JSON array and closes the underlying writer. Subsequent calls are no-ops.
+     *
+     * @throws IOException if writing the closing bracket or closing the underlying writer fails
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed) {
+            return;
+        }
+
+        closed = true;
+
+        // try-with-resources guarantees the underlying writer is closed even when
+        // emitting the brackets fails, instead of leaking it on the exception path.
+        try (writer) {
+            if (!hasBegun) {
+                beginArray();
+            }
+            endArray();
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
new file mode 100644
index 0000000000..a164c9d6ef
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxAPIConnection;
+import com.box.sdk.BoxEvent;
+import com.box.sdk.EnterpriseEventsStreamRequest;
+import com.box.sdk.EventLog;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.box.controllerservices.BoxClientService;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.nifi.annotation.behavior.InputRequirement.Requirement.INPUT_FORBIDDEN;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@Tags({"box", "storage"})
+@CapabilityDescription("""
+        Consumes Enterprise Events from Box admin_logs_streaming Stream Type.
+        The content of the events is sent to the 'success' relationship as a JSON array.
+        The last known position of the Box stream is stored in the processor state and is used to
+        resume the stream from the last known position when the processor is restarted.
+        """)
+@SeeAlso({ ConsumeBoxEvents.class, FetchBoxFile.class, ListBoxFile.class })
+@InputRequirement(INPUT_FORBIDDEN)
+@Stateful(description = """
+        The last known position of the Box Event stream is stored in the processor state and is used to
+        resume the stream from the last known position when the processor is restarted.
+        """, scopes = { Scope.CLUSTER })
+public class ConsumeBoxEnterpriseEvents extends AbstractProcessor {
+
+    // Key under which the stream position is kept in the cluster-scoped processor state.
+    private static final String POSITION_KEY = "position";
+    private static final String EARLIEST_POSITION = "0";
+    private static final String LATEST_POSITION = "now";
+
+    // Limit passed to every enterprise events stream request.
+    private static final int LIMIT = 500;
+
+    static final PropertyDescriptor EVENT_TYPES = new PropertyDescriptor.Builder()
+            .name("Event Types")
+            .description("A comma separated list of Enterprise Events to consume. If not set, all Events are consumed. " +
+                    "See Additional Details for more information.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor START_EVENT_POSITION = new PropertyDescriptor.Builder()
+            .name("Start Event Position")
+            .description("What position to consume the Events from.")
+            .required(true)
+            .allowableValues(StartEventPosition.class)
+            .defaultValue(StartEventPosition.EARLIEST)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    static final PropertyDescriptor START_OFFSET = new PropertyDescriptor.Builder()
+            .name("Start Offset")
+            .description("The offset to start consuming the Events from.")
+            .required(true)
+            .dependsOn(START_EVENT_POSITION, StartEventPosition.OFFSET)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .build();
+
+    private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+            BoxClientService.BOX_CLIENT_SERVICE,
+            EVENT_TYPES,
+            START_EVENT_POSITION,
+            START_OFFSET
+    );
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Events received successfully will be sent out this relationship.")
+            .build();
+
+    private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    private volatile BoxAPIConnection boxAPIConnection;
+    private volatile String[] eventTypes;
+    private volatile String streamPosition;
+
+    /**
+     * Invoked when the processor is scheduled: resolves the Box connection, parses the configured
+     * event types and determines the position to start consuming from.
+     *
+     * @param context the process context providing property values and the state manager
+     */
+    @OnScheduled
+    public void onEnabled(final ProcessContext context) {
+        final BoxClientService boxClientService = context.getProperty(BoxClientService.BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class);
+        boxAPIConnection = boxClientService.getBoxApiConnection();
+
+        eventTypes = context.getProperty(EVENT_TYPES).isSet()
+                ? parseEventTypes(context.getProperty(EVENT_TYPES).getValue())
+                : new String[0];
+
+        streamPosition = calculateStreamPosition(context);
+    }
+
+    /**
+     * Splits a comma separated list into event type names, discarding whitespace around each entry
+     * so that values such as {@code "A, B"} do not yield names with leading spaces.
+     */
+    private static String[] parseEventTypes(final String value) {
+        final String stripped = value.strip();
+        return stripped.isEmpty() ? new String[0] : stripped.split("\\s*,\\s*");
+    }
+
+    /**
+     * Returns the position saved in state, falling back to the configured start position
+     * when no position has been saved yet.
+     */
+    private String calculateStreamPosition(final ProcessContext context) {
+        return readStreamPosition(context)
+                .orElseGet(() -> initializeStartEventPosition(context));
+    }
+
+    private Optional<String> readStreamPosition(final ProcessContext context) {
+        try {
+            final String position = context.getStateManager().getState(Scope.CLUSTER).get(POSITION_KEY);
+            return Optional.ofNullable(position);
+        } catch (final IOException e) {
+            throw new ProcessException("Could not retrieve saved event position", e);
+        }
+    }
+
+    private void writeStreamPosition(final String position, final ProcessSession session) {
+        try {
+            final Map<String, String> stateMap = Map.of(POSITION_KEY, position);
+            session.setState(stateMap, Scope.CLUSTER);
+        } catch (final IOException e) {
+            throw new ProcessException("Could not save event position", e);
+        }
+    }
+
+    private String initializeStartEventPosition(final ProcessContext context) {
+        final StartEventPosition startEventPosition = context.getProperty(START_EVENT_POSITION).asAllowableValue(StartEventPosition.class);
+        return switch (startEventPosition) {
+            case EARLIEST -> EARLIEST_POSITION;
+            case LATEST -> retrieveLatestStreamPosition();
+            case OFFSET -> context.getProperty(START_OFFSET).getValue();
+        };
+    }
+
+    // Asks Box for the "now" position and uses the next stream position it reports as the starting point.
+    private String retrieveLatestStreamPosition() {
+        final EventLog eventLog = getEventLog(LATEST_POSITION);
+        return eventLog.getNextStreamPosition();
+    }
+
+    /**
+     * Fetches event pages from the current position until an empty page is returned, writing each
+     * non-empty page to its own FlowFile and recording the next position in the session state.
+     * <p>
+     * The in-memory position is only advanced once every page of this invocation has been written
+     * successfully. If writing fails, the exception propagates with the field untouched, so the next
+     * invocation re-reads from the same position instead of silently skipping events.
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        String position = streamPosition;
+
+        while (true) {
+            final EventLog eventLog = getEventLog(position);
+            final boolean empty = eventLog.getSize() == 0;
+
+            if (!empty) {
+                writeLogAsRecords(eventLog, session);
+            }
+
+            position = eventLog.getNextStreamPosition();
+            writeStreamPosition(position, session);
+
+            if (empty) {
+                break;
+            }
+        }
+
+        streamPosition = position;
+    }
+
+    // Package-private for testing.
+    EventLog getEventLog(final String position) {
+        final EnterpriseEventsStreamRequest request = new EnterpriseEventsStreamRequest()
+                .limit(LIMIT)
+                .position(position)
+                .typeNames(eventTypes);
+
+        return EventLog.getEnterpriseEventsStream(boxAPIConnection, request);
+    }
+
+    /**
+     * Writes all events of the given log into a new FlowFile as a JSON array and routes it to success.
+     */
+    private void writeLogAsRecords(final EventLog eventLog, final ProcessSession session) {
+        final FlowFile flowFile = session.create();
+
+        try (final OutputStream out = session.write(flowFile);
+             final BoxEventJsonArrayWriter writer = BoxEventJsonArrayWriter.create(out)) {
+
+            for (final BoxEvent event : eventLog) {
+                writer.write(event);
+            }
+        } catch (final IOException e) {
+            throw new ProcessException("Failed to write Box Event into a FlowFile", e);
+        }
+
+        // Transfer the FlowFile returned by the session rather than the reference that predates the attribute update.
+        final FlowFile updated = session.putAllAttributes(flowFile, Map.of(
+                "record.count", String.valueOf(eventLog.getSize()),
+                CoreAttributes.MIME_TYPE.key(), "application/json"
+        ));
+        session.transfer(updated, REL_SUCCESS);
+    }
+
+    /**
+     * Allowable values of the Start Event Position property.
+     */
+    public enum StartEventPosition implements DescribedValue {
+        EARLIEST("earliest", "Start consuming events from the earliest available Event."),
+        LATEST("latest", "Start consuming events from the latest Event."),
+        OFFSET("offset", "Start consuming events from the specified offset.");
+
+        private final String value;
+        private final String description;
+
+        StartEventPosition(final String value, final String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String getValue() {
+            return value;
+        }
+
+        @Override
+        public String getDisplayName() {
+            return value;
+        }
+
+        @Override
+        public String getDescription() {
+            return description;
+        }
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
index 679941159c..21f674c40d 100644
--- 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
@@ -20,8 +20,6 @@ import com.box.sdk.BoxAPIConnection;
 import com.box.sdk.BoxEvent;
 import com.box.sdk.EventListener;
 import com.box.sdk.EventStream;
-import com.eclipsesource.json.Json;
-import com.eclipsesource.json.JsonObject;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -50,14 +48,9 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -223,20 +216,11 @@ public class ConsumeBoxEvents extends AbstractProcessor 
implements VerifiablePro
         final List<BoxEvent> boxEvents = new ArrayList<>();
         final int recordCount = events.drainTo(boxEvents);
 
-        try (final OutputStream out = session.write(flowFile)) {
-            final Writer writer = new OutputStreamWriter(out, 
StandardCharsets.UTF_8);
-            writer.write("[");
-            final Iterator<BoxEvent> iterator = boxEvents.iterator();
-            while (iterator.hasNext()) {
-                BoxEvent event = iterator.next();
-                JsonObject jsonEvent = toRecord(event);
-                jsonEvent.writeTo(writer);
-                if (iterator.hasNext()) {
-                    writer.write(",");
-                }
+        try (final OutputStream out = session.write(flowFile);
+             final BoxEventJsonArrayWriter writer = 
BoxEventJsonArrayWriter.create(out)) {
+            for (BoxEvent event : boxEvents) {
+                writer.write(event);
             }
-            writer.write("]");
-            writer.flush();
         } catch (Exception e) {
             getLogger().error("Failed to write events to FlowFile; will 
re-queue events and try again", e);
             boxEvents.forEach(events::offer);
@@ -249,23 +233,4 @@ public class ConsumeBoxEvents extends AbstractProcessor 
implements VerifiablePro
         session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), 
"application/json");
         session.transfer(flowFile, REL_SUCCESS);
     }
-
-    private JsonObject toRecord(BoxEvent event) {
-        JsonObject json = Json.object();
-
-        json.add("accessibleBy", event.getAccessibleBy() == null ? Json.NULL : 
Json.parse(event.getAccessibleBy().getJson()));
-        json.add("actionBy", event.getActionBy() == null ? Json.NULL : 
Json.parse(event.getActionBy().getJson()));
-        json.add("additionalDetails", 
Objects.requireNonNullElse(event.getAdditionalDetails(), Json.NULL));
-        json.add("createdAt", event.getCreatedAt() == null ? Json.NULL : 
Json.value(event.getCreatedAt().toString()));
-        json.add("createdBy", event.getCreatedBy() == null ? Json.NULL : 
Json.parse(event.getCreatedBy().getJson()));
-        json.add("eventType", event.getEventType() == null ? Json.NULL : 
Json.value(event.getEventType().name()));
-        json.add("id", Objects.requireNonNullElse(Json.value(event.getID()), 
Json.NULL));
-        json.add("ipAddress", 
Objects.requireNonNullElse(Json.value(event.getIPAddress()), Json.NULL));
-        json.add("sessionID", 
Objects.requireNonNullElse(Json.value(event.getSessionID()), Json.NULL));
-        json.add("source", Objects.requireNonNullElse(event.getSourceJSON(), 
Json.NULL));
-        json.add("typeName", 
Objects.requireNonNullElse(Json.value(event.getTypeName()), Json.NULL));
-
-        return json;
-    }
-
 }
diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index bf46003c94..b4dc7216ad 100644
--- 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,10 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents
+org.apache.nifi.processors.box.ConsumeBoxEvents
 org.apache.nifi.processors.box.GetBoxFileCollaborators
-org.apache.nifi.processors.box.ListBoxFile
 org.apache.nifi.processors.box.FetchBoxFile
 org.apache.nifi.processors.box.FetchBoxFileInfo
 org.apache.nifi.processors.box.FetchBoxFileRepresentation
+org.apache.nifi.processors.box.ListBoxFile
 org.apache.nifi.processors.box.PutBoxFile
-org.apache.nifi.processors.box.ConsumeBoxEvents
diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/docs/org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents/additionalDetails.md
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/docs/org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents/additionalDetails.md
new file mode 100644
index 0000000000..8f89c6189c
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/docs/org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents/additionalDetails.md
@@ -0,0 +1,20 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# ConsumeBoxEnterpriseEvents
+
+## Box Documentation
+
+[Event Types 
List](https://developer.box.com/guides/events/enterprise-events/for-enterprise/#event-types).
diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriterTest.java
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriterTest.java
new file mode 100644
index 0000000000..f971adcf78
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxEvent;
+import com.eclipsesource.json.Json;
+import com.eclipsesource.json.JsonObject;
+import com.eclipsesource.json.JsonValue;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Verifies the JSON array produced by {@link BoxEventJsonArrayWriter} for zero, one and several events.
+ */
+class BoxEventJsonArrayWriterTest {
+
+    private ByteArrayOutputStream out;
+    private BoxEventJsonArrayWriter writer;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        out = new ByteArrayOutputStream();
+        writer = BoxEventJsonArrayWriter.create(out);
+    }
+
+    @Test
+    void writeNoEvents() throws IOException {
+        writer.close();
+
+        assertEquals(Json.array(), actualJson());
+    }
+
+    @Test
+    void writeSingleEvent() throws IOException {
+        final BoxEvent event = new BoxEvent(null, """
+                {
+                    "event_id": "1",
+                    "event_type": "ITEM_CREATE"
+                }
+                """);
+
+        writer.write(event);
+        writer.close();
+
+        final JsonValue expected = Json.array().add(
+                createEventJson()
+                        .set("id", event.getID())
+                        .set("eventType", event.getEventType().name())
+                        .set("typeName", event.getTypeName())
+        );
+
+        assertEquals(expected, actualJson());
+    }
+
+    @Test
+    void writeMultipleEvents() throws IOException {
+        final BoxEvent event1 = new BoxEvent(null, """
+                {
+                    "event_id": "1",
+                    "event_type": "ITEM_CREATE",
+                    "source": {
+                        "item_type": "file",
+                        "item_id": "123"
+                    }
+                }
+                """);
+        final BoxEvent event2 = new BoxEvent(null, """
+                {
+                    "event_id": "2",
+                    "event_type": "GROUP_ADD_USER",
+                    "source": {
+                        "type": "group",
+                        "id": "123"
+                    }
+                }
+                """);
+        final BoxEvent event3 = new BoxEvent(null, """
+                {
+                    "event_id": "3",
+                    "event_type": "COLLABORATION_ACCEPT",
+                    "source": {
+                        "item_type": "file",
+                        "item_id": "123"
+                    }
+                }
+                """);
+
+        writer.write(event1);
+        writer.write(event2);
+        writer.write(event3);
+        writer.close();
+
+        final JsonValue expected = Json.array()
+                .add(createEventJson()
+                        .set("id", event1.getID())
+                        .set("eventType", event1.getEventType().name())
+                        .set("typeName", event1.getTypeName())
+                        .set("source", Json.object()
+                                .add("item_type", event1.getSourceJSON().get("item_type"))
+                                .add("item_id", event1.getSourceJSON().get("item_id"))
+                        )
+                )
+                .add(createEventJson()
+                        .set("id", event2.getID())
+                        .set("eventType", event2.getEventType().name())
+                        .set("typeName", event2.getTypeName())
+                        .set("source", Json.object()
+                                .add("type", event2.getSourceJSON().get("type"))
+                                .add("id", event2.getSourceInfo().getID())
+                        )
+                )
+                .add(createEventJson()
+                        .set("id", event3.getID())
+                        .set("eventType", event3.getEventType().name())
+                        .set("typeName", event3.getTypeName())
+                        .set("source", Json.object()
+                                .add("item_type", event3.getSourceJSON().get("item_type"))
+                                .add("item_id", event3.getSourceJSON().get("item_id"))
+                        )
+                );
+
+        assertEquals(expected, actualJson());
+    }
+
+    /**
+     * Parses everything written so far. The bytes are decoded explicitly as UTF-8, the charset the
+     * writer encodes with, rather than relying on the platform default charset.
+     */
+    private JsonValue actualJson() {
+        return Json.parse(out.toString(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Builds the baseline record in which every field is JSON null; tests override the fields they populate.
+     */
+    private JsonObject createEventJson() {
+        // The Writer explicitly writes nulls as JSON null values.
+        return Json.object()
+                .add("accessibleBy", Json.NULL)
+                .add("actionBy", Json.NULL)
+                .add("additionalDetails", Json.NULL)
+                .add("createdAt", Json.NULL)
+                .add("createdBy", Json.NULL)
+                .add("eventType", Json.NULL)
+                .add("id", Json.NULL)
+                .add("ipAddress", Json.NULL)
+                .add("sessionID", Json.NULL)
+                .add("source", Json.NULL)
+                .add("typeName", Json.NULL);
+    }
+}
diff --git 
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
new file mode 100644
index 0000000000..69fc459ad0
--- /dev/null
+++ 
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxEvent;
+import com.box.sdk.EventLog;
+import com.eclipsesource.json.Json;
+import com.eclipsesource.json.JsonValue;
+import 
org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents.StartEventPosition;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link ConsumeBoxEnterpriseEvents} in which the processor's event lookup is redirected
+ * to an in-memory {@link TestEventStream}.
+ */
+class ConsumeBoxEnterpriseEventsTest extends AbstractBoxFileTest {
+
+    private TestEventStream eventStream;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        eventStream = new TestEventStream();
+
+        // Override the event lookup so that the processor reads from the in-memory stream.
+        final ConsumeBoxEnterpriseEvents processor = new ConsumeBoxEnterpriseEvents() {
+            @Override
+            EventLog getEventLog(String position) {
+                return eventStream.consume(position);
+            }
+        };
+
+        testRunner = TestRunners.newTestRunner(processor);
+        super.setUp();
+    }
+
+    /**
+     * Publishes events 0-2, runs the processor, publishes event 3 and runs the processor again. Then verifies
+     * how many FlowFiles were routed to success and which event ids they contain, in order.
+     *
+     * @param startEventPosition value for the Start Event Position property
+     * @param startOffset        value for the Start Offset property, or {@code null} to leave it unset
+     * @param expectedFlowFiles  expected number of FlowFiles in the success relationship after both runs
+     * @param expectedEventIds   expected event ids across all success FlowFiles, in order
+     */
+    @ParameterizedTest
+    @MethodSource("dataFor_testConsumeEvents")
+    void testConsumeEvents(
+            final StartEventPosition startEventPosition,
+            final @Nullable String startOffset,
+            final int expectedFlowFiles,
+            final List<Integer> expectedEventIds) {
+        testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION, startEventPosition);
+        if (startOffset != null) {
+            testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_OFFSET, startOffset);
+        }
+
+        eventStream.addEvent(0);
+        eventStream.addEvent(1);
+        eventStream.addEvent(2);
+        testRunner.run();
+
+        eventStream.addEvent(3);
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS, expectedFlowFiles);
+
+        final List<Integer> eventIds = testRunner.getFlowFilesForRelationship(ConsumeBoxEnterpriseEvents.REL_SUCCESS).stream()
+                .flatMap(this::extractEventIds)
+                .toList();
+
+        assertEquals(expectedEventIds, eventIds);
+    }
+
+    /**
+     * Argument sets for {@link #testConsumeEvents}: start position, start offset, expected FlowFile count, expected event ids.
+     */
+    static List<Arguments> dataFor_testConsumeEvents() {
+        return List.of(
+                arguments(StartEventPosition.EARLIEST, null, 2, List.of(0, 1, 2, 3)),
+                arguments(StartEventPosition.OFFSET, "1", 2, List.of(1, 2, 3)),
+                // Offset past the end of the stream: only the event added after the first run is expected.
+                arguments(StartEventPosition.OFFSET, "12345", 1, List.of(3)),
+                arguments(StartEventPosition.LATEST, null, 1, List.of(3))
+        );
+    }
+
+    /**
+     * Reads a FlowFile's content as a JSON array of event objects and returns each event's {@code id} member as an integer.
+     */
+    private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
+        final JsonValue json = Json.parse(flowFile.getContent());
+        return json.asArray().values().stream()
+                .map(JsonValue::asObject)
+                .map(jsonObject -> jsonObject.get("id").asString())
+                .map(Integer::parseInt);
+    }
+
+    /**
+     * In-memory stand-in for the event source. A numeric stream position is the index of the next event to read;
+     * the special position {@code "now"} always yields an empty log.
+     */
+    private static class TestEventStream {
+
+        private static final String NOW_POSITION = "now";
+
+        private final List<BoxEvent> events = new ArrayList<>();
+
+        /**
+         * Appends an event whose {@code event_id} is the given number rendered as a string.
+         */
+        void addEvent(final int eventId) {
+            final BoxEvent boxEvent = new BoxEvent(null, "{\"event_id\": \"%d\"}".formatted(eventId));
+            events.add(boxEvent);
+        }
+
+        /**
+         * Returns a log holding all events from the given position to the end of the stream.
+         *
+         * @param position {@code "now"} or a decimal index into the stream
+         * @return an event log whose next stream position is the current stream size
+         * @throws NumberFormatException if the position is neither {@code "now"} nor a parsable integer
+         */
+        EventLog consume(final String position) {
+            if (NOW_POSITION.equals(position)) {
+                return createEmptyEventLog();
+            }
+
+            final int streamPosition = Integer.parseInt(position);
+            if (streamPosition > events.size()) {
+                // Real Box API returns the latest offset position, even if streamPosition was greater.
+                return createEmptyEventLog();
+            }
+
+            // Snapshot the tail: subList() is only a view and becomes unusable once another event is added.
+            final List<BoxEvent> consumedEvents = List.copyOf(events.subList(streamPosition, events.size()));
+
+            return createEventLog(consumedEvents);
+        }
+
+        private EventLog createEmptyEventLog() {
+            return createEventLog(emptyList());
+        }
+
+        /**
+         * Creates a mocked {@link EventLog} exposing the given events and pointing its next stream position
+         * at the current end of the stream.
+         */
+        private EventLog createEventLog(final List<BoxEvent> consumedEvents) {
+            // EventLog is not designed for being extended. Thus, mocking it.
+            final EventLog eventLog = mock();
+
+            when(eventLog.getNextStreamPosition()).thenReturn(String.valueOf(events.size()));
+            lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
+            // Answer with a fresh iterator on every call; thenReturn would hand out one shared, single-use iterator.
+            lenient().when(eventLog.iterator()).thenAnswer(invocation -> consumedEvents.iterator());
+
+            return eventLog;
+        }
+    }
+}


Reply via email to