This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8b529622a0 NIFI-14325 Create ConsumeBoxEnterpriseEvents processor
8b529622a0 is described below
commit 8b529622a0ca35abb36d22c8ea0725439dd0627f
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Thu Mar 6 11:38:24 2025 +0100
NIFI-14325 Create ConsumeBoxEnterpriseEvents processor
Signed-off-by: Pierre Villard <[email protected]>
This closes #9777.
---
.../processors/box/BoxEventJsonArrayWriter.java | 116 ++++++++++
.../processors/box/ConsumeBoxEnterpriseEvents.java | 251 +++++++++++++++++++++
.../nifi/processors/box/ConsumeBoxEvents.java | 43 +---
.../services/org.apache.nifi.processor.Processor | 6 +-
.../additionalDetails.md | 20 ++
.../box/BoxEventJsonArrayWriterTest.java | 160 +++++++++++++
.../box/ConsumeBoxEnterpriseEventsTest.java | 150 ++++++++++++
7 files changed, 705 insertions(+), 41 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriter.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriter.java
new file mode 100644
index 0000000000..52cf29c1d3
--- /dev/null
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriter.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxEvent;
+import com.eclipsesource.json.Json;
+import com.eclipsesource.json.JsonObject;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Objects;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+/**
+ * A class responsible for writing {@link BoxEvent} objects into a JSON array.
+ * Not thread-safe.
+ */
+final class BoxEventJsonArrayWriter implements Closeable {
+
+ private final Writer writer;
+ private boolean hasBegun;
+ private boolean hasEntries;
+ private boolean closed;
+
+ private BoxEventJsonArrayWriter(final Writer writer) {
+ this.writer = writer;
+ this.hasBegun = false;
+ this.hasEntries = false;
+ this.closed = false;
+ }
+
+ static BoxEventJsonArrayWriter create(final OutputStream outputStream)
throws IOException {
+ final Writer writer = new OutputStreamWriter(outputStream, UTF_8);
+ return new BoxEventJsonArrayWriter(writer);
+ }
+
+ void write(final BoxEvent event) throws IOException {
+ if (closed) {
+ throw new IOException("The Writer is closed");
+ }
+
+ if (!hasBegun) {
+ beginArray();
+ hasBegun = true;
+ }
+
+ if (hasEntries) {
+ writer.write(',');
+ }
+
+ final JsonObject json = toRecord(event);
+ json.writeTo(writer);
+
+ hasEntries = true;
+ }
+
+ private JsonObject toRecord(final BoxEvent event) {
+ final JsonObject json = Json.object();
+
+ json.add("accessibleBy", event.getAccessibleBy() == null ? Json.NULL :
Json.parse(event.getAccessibleBy().getJson()));
+ json.add("actionBy", event.getActionBy() == null ? Json.NULL :
Json.parse(event.getActionBy().getJson()));
+ json.add("additionalDetails",
Objects.requireNonNullElse(event.getAdditionalDetails(), Json.NULL));
+ json.add("createdAt", event.getCreatedAt() == null ? Json.NULL :
Json.value(event.getCreatedAt().toString()));
+ json.add("createdBy", event.getCreatedBy() == null ? Json.NULL :
Json.parse(event.getCreatedBy().getJson()));
+ json.add("eventType", event.getEventType() == null ? Json.NULL :
Json.value(event.getEventType().name()));
+ json.add("id", Objects.requireNonNullElse(Json.value(event.getID()),
Json.NULL));
+ json.add("ipAddress",
Objects.requireNonNullElse(Json.value(event.getIPAddress()), Json.NULL));
+ json.add("sessionID",
Objects.requireNonNullElse(Json.value(event.getSessionID()), Json.NULL));
+ json.add("source", Objects.requireNonNullElse(event.getSourceJSON(),
Json.NULL));
+ json.add("typeName",
Objects.requireNonNullElse(Json.value(event.getTypeName()), Json.NULL));
+
+ return json;
+ }
+
+ private void beginArray() throws IOException {
+ writer.write('[');
+ }
+
+ private void endArray() throws IOException {
+ writer.write(']');
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ closed = true;
+
+ if (!hasBegun) {
+ beginArray();
+ }
+ endArray();
+
+ writer.close();
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
new file mode 100644
index 0000000000..a164c9d6ef
--- /dev/null
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEvents.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxAPIConnection;
+import com.box.sdk.BoxEvent;
+import com.box.sdk.EnterpriseEventsStreamRequest;
+import com.box.sdk.EventLog;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.box.controllerservices.BoxClientService;
+import org.apache.nifi.components.DescribedValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static
org.apache.nifi.annotation.behavior.InputRequirement.Requirement.INPUT_FORBIDDEN;
+
+@PrimaryNodeOnly
+@TriggerSerially
+@Tags({"box", "storage"})
+@CapabilityDescription("""
+ Consumes Enterprise Events from Box admin_logs_streaming Stream Type.
+ The content of the events is sent to the 'success' relationship as a
JSON array.
+ The last known position of the Box stream is stored in the processor
state and is used to
+ resume the stream from the last known position when the processor is
restarted.
+ """)
+@SeeAlso({ ConsumeBoxEvents.class, FetchBoxFile.class, ListBoxFile.class })
+@InputRequirement(INPUT_FORBIDDEN)
+@Stateful(description = """
+ The last known position of the Box Event stream is stored in the
processor state and is used to
+ resume the stream from the last known position when the processor is
restarted.
+ """, scopes = { Scope.CLUSTER })
+public class ConsumeBoxEnterpriseEvents extends AbstractProcessor {
+
+ private static final String POSITION_KEY = "position";
+ private static final String EARLIEST_POSITION = "0";
+ private static final String LATEST_POSITION = "now";
+
+ private static final int LIMIT = 500;
+
+    // Optional comma separated filter of Enterprise Event type names to request from Box.
+    static final PropertyDescriptor EVENT_TYPES = new PropertyDescriptor.Builder()
+            .name("Event Types")
+            // Trailing space keeps the two concatenated sentences from running together in the UI.
+            .description("A comma separated list of Enterprise Events to consume. If not set, all Events are consumed. " +
+                    "See Additional Details for more information.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+ static final PropertyDescriptor START_EVENT_POSITION = new
PropertyDescriptor.Builder()
+ .name("Start Event Position")
+ .description("What position to consume the Events from.")
+ .required(true)
+ .allowableValues(StartEventPosition.class)
+ .defaultValue(StartEventPosition.EARLIEST)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ static final PropertyDescriptor START_OFFSET = new
PropertyDescriptor.Builder()
+ .name("Start Offset")
+ .description("The offset to start consuming the Events from.")
+ .required(true)
+ .dependsOn(START_EVENT_POSITION, StartEventPosition.OFFSET)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ BoxClientService.BOX_CLIENT_SERVICE,
+ EVENT_TYPES,
+ START_EVENT_POSITION,
+ START_OFFSET
+ );
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Events received successfully will be sent out this
relationship.")
+ .build();
+
+ private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
+
+ /** Returns the fixed, immutable list of property descriptors declared by this processor. */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ /** Returns the processor's relationships; only {@code success} is defined. */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ private volatile BoxAPIConnection boxAPIConnection;
+ private volatile String[] eventTypes;
+ private volatile String streamPosition;
+
+    /**
+     * Runs when the processor is scheduled: caches the Box API connection, parses the configured
+     * event type filter and resolves the stream position to start consuming from.
+     *
+     * @param context the process context supplying the property values and the state manager
+     */
+    @OnScheduled
+    public void onEnabled(final ProcessContext context) {
+        final BoxClientService boxClientService =
+                context.getProperty(BoxClientService.BOX_CLIENT_SERVICE).asControllerService(BoxClientService.class);
+        boxAPIConnection = boxClientService.getBoxApiConnection();
+
+        final String configuredTypes = context.getProperty(EVENT_TYPES).getValue();
+        // Tolerate "A, B" style lists: whitespace around the separators is not part of a type name,
+        // and a blank value means "no filter" rather than a single empty type.
+        eventTypes = configuredTypes == null || configuredTypes.isBlank()
+                ? new String[0]
+                : configuredTypes.strip().split("\\s*,\\s*");
+
+        streamPosition = calculateStreamPosition(context);
+    }
+
+ private String calculateStreamPosition(final ProcessContext context) {
+ return readStreamPosition(context)
+ .orElseGet(() -> initializeStartEventPosition(context));
+ }
+
+    /**
+     * Looks up the stream position stored in cluster-scoped state.
+     *
+     * @return the stored position, or an empty Optional when none has been saved
+     * @throws ProcessException if the state cannot be read
+     */
+    private Optional<String> readStreamPosition(final ProcessContext context) {
+        final String savedPosition;
+        try {
+            savedPosition = context.getStateManager().getState(Scope.CLUSTER).get(POSITION_KEY);
+        } catch (final IOException ioException) {
+            throw new ProcessException("Could not retrieve saved event position", ioException);
+        }
+        return Optional.ofNullable(savedPosition);
+    }
+
+    /**
+     * Records the given stream position in cluster-scoped state through the session.
+     *
+     * @throws ProcessException if the state cannot be written
+     */
+    private void writeStreamPosition(final String position, final ProcessSession session) {
+        try {
+            session.setState(Map.of(POSITION_KEY, position), Scope.CLUSTER);
+        } catch (final IOException ioException) {
+            throw new ProcessException("Could not save event position", ioException);
+        }
+    }
+
+ /**
+  * Derives the initial stream position from the "Start Event Position" property:
+  * EARLIEST maps to position "0", LATEST asks Box for the current end of the stream,
+  * and OFFSET uses the value of the "Start Offset" property as-is.
+  */
+ private String initializeStartEventPosition(final ProcessContext context) {
+ final StartEventPosition startEventPosition =
context.getProperty(START_EVENT_POSITION).asAllowableValue(StartEventPosition.class);
+ return switch (startEventPosition) {
+ case EARLIEST -> EARLIEST_POSITION;
+ case LATEST -> retrieveLatestStreamPosition();
+ case OFFSET -> context.getProperty(START_OFFSET).getValue();
+ };
+ }
+
+    /** Queries the stream at the "now" position and returns the next position it reports. */
+    private String retrieveLatestStreamPosition() {
+        return getEventLog(LATEST_POSITION).getNextStreamPosition();
+    }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ while (true) {
+ final EventLog eventLog = getEventLog(streamPosition);
+ streamPosition = eventLog.getNextStreamPosition();
+ writeStreamPosition(streamPosition, session);
+
+ if (eventLog.getSize() == 0) {
+ return;
+ }
+
+ writeLogAsRecords(eventLog, session);
+ }
+ }
+
+ /**
+  * Requests one page of enterprise events from Box, starting at the given stream position.
+  * The request is capped at LIMIT (500) events and passes the configured event type names;
+  * presumably an empty type array means "no filter" - confirm against the Box SDK.
+  */
+ // Package-private for testing.
+ EventLog getEventLog(final String position) {
+ final EnterpriseEventsStreamRequest request = new
EnterpriseEventsStreamRequest()
+ .limit(LIMIT)
+ .position(position)
+ .typeNames(eventTypes);
+
+ return EventLog.getEnterpriseEventsStream(boxAPIConnection, request);
+ }
+
+    /**
+     * Writes all events of the given log into a new FlowFile as a JSON array, tags it with the
+     * record count and JSON MIME type, and routes it to {@code success}.
+     *
+     * @param eventLog the batch of events to serialize
+     * @param session the session used to create, write and transfer the FlowFile
+     * @throws ProcessException if the FlowFile content cannot be written
+     */
+    private void writeLogAsRecords(final EventLog eventLog, final ProcessSession session) {
+        FlowFile flowFile = session.create();
+
+        try (final OutputStream out = session.write(flowFile);
+             final BoxEventJsonArrayWriter writer = BoxEventJsonArrayWriter.create(out)) {
+
+            for (final BoxEvent event : eventLog) {
+                writer.write(event);
+            }
+        } catch (final IOException e) {
+            throw new ProcessException("Failed to write Box Event into a FlowFile", e);
+        }
+
+        // FlowFile objects are immutable: keep the reference returned by the session so the
+        // transfer below operates on the latest version.
+        flowFile = session.putAllAttributes(flowFile, Map.of(
+                "record.count", String.valueOf(eventLog.getSize()),
+                CoreAttributes.MIME_TYPE.key(), "application/json"));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+ /**
+  * Allowable values of the "Start Event Position" property. Each constant carries the value
+  * stored in the flow configuration (also used as its display name) and a description.
+  */
+ public enum StartEventPosition implements DescribedValue {
+ EARLIEST("earliest", "Start consuming events from the earliest
available Event."),
+ LATEST("latest", "Start consuming events from the latest Event."),
+ OFFSET("offset", "Start consuming events from the specified offset.");
+
+ private final String value;
+ private final String description;
+
+ StartEventPosition(final String value, final String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return value;
+ }
+
+ // The display name intentionally reuses the raw value.
+ @Override
+ public String getDisplayName() {
+ return value;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
index 679941159c..21f674c40d 100644
---
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/ConsumeBoxEvents.java
@@ -20,8 +20,6 @@ import com.box.sdk.BoxAPIConnection;
import com.box.sdk.BoxEvent;
import com.box.sdk.EventListener;
import com.box.sdk.EventStream;
-import com.eclipsesource.json.Json;
-import com.eclipsesource.json.JsonObject;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@@ -50,14 +48,9 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -223,20 +216,11 @@ public class ConsumeBoxEvents extends AbstractProcessor
implements VerifiablePro
final List<BoxEvent> boxEvents = new ArrayList<>();
final int recordCount = events.drainTo(boxEvents);
- try (final OutputStream out = session.write(flowFile)) {
- final Writer writer = new OutputStreamWriter(out,
StandardCharsets.UTF_8);
- writer.write("[");
- final Iterator<BoxEvent> iterator = boxEvents.iterator();
- while (iterator.hasNext()) {
- BoxEvent event = iterator.next();
- JsonObject jsonEvent = toRecord(event);
- jsonEvent.writeTo(writer);
- if (iterator.hasNext()) {
- writer.write(",");
- }
+ try (final OutputStream out = session.write(flowFile);
+ final BoxEventJsonArrayWriter writer =
BoxEventJsonArrayWriter.create(out)) {
+ for (BoxEvent event : boxEvents) {
+ writer.write(event);
}
- writer.write("]");
- writer.flush();
} catch (Exception e) {
getLogger().error("Failed to write events to FlowFile; will
re-queue events and try again", e);
boxEvents.forEach(events::offer);
@@ -249,23 +233,4 @@ public class ConsumeBoxEvents extends AbstractProcessor
implements VerifiablePro
session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(),
"application/json");
session.transfer(flowFile, REL_SUCCESS);
}
-
- private JsonObject toRecord(BoxEvent event) {
- JsonObject json = Json.object();
-
- json.add("accessibleBy", event.getAccessibleBy() == null ? Json.NULL :
Json.parse(event.getAccessibleBy().getJson()));
- json.add("actionBy", event.getActionBy() == null ? Json.NULL :
Json.parse(event.getActionBy().getJson()));
- json.add("additionalDetails",
Objects.requireNonNullElse(event.getAdditionalDetails(), Json.NULL));
- json.add("createdAt", event.getCreatedAt() == null ? Json.NULL :
Json.value(event.getCreatedAt().toString()));
- json.add("createdBy", event.getCreatedBy() == null ? Json.NULL :
Json.parse(event.getCreatedBy().getJson()));
- json.add("eventType", event.getEventType() == null ? Json.NULL :
Json.value(event.getEventType().name()));
- json.add("id", Objects.requireNonNullElse(Json.value(event.getID()),
Json.NULL));
- json.add("ipAddress",
Objects.requireNonNullElse(Json.value(event.getIPAddress()), Json.NULL));
- json.add("sessionID",
Objects.requireNonNullElse(Json.value(event.getSessionID()), Json.NULL));
- json.add("source", Objects.requireNonNullElse(event.getSourceJSON(),
Json.NULL));
- json.add("typeName",
Objects.requireNonNullElse(Json.value(event.getTypeName()), Json.NULL));
-
- return json;
- }
-
}
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index bf46003c94..b4dc7216ad 100644
---
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,10 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents
+org.apache.nifi.processors.box.ConsumeBoxEvents
org.apache.nifi.processors.box.GetBoxFileCollaborators
-org.apache.nifi.processors.box.ListBoxFile
org.apache.nifi.processors.box.FetchBoxFile
org.apache.nifi.processors.box.FetchBoxFileInfo
org.apache.nifi.processors.box.FetchBoxFileRepresentation
+org.apache.nifi.processors.box.ListBoxFile
org.apache.nifi.processors.box.PutBoxFile
-org.apache.nifi.processors.box.ConsumeBoxEvents
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/docs/org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents/additionalDetails.md
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/docs/org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents/additionalDetails.md
new file mode 100644
index 0000000000..8f89c6189c
--- /dev/null
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/docs/org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents/additionalDetails.md
@@ -0,0 +1,20 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# ConsumeBoxEnterpriseEvents
+
+## Box Documentation
+
+[Event Types
List](https://developer.box.com/guides/events/enterprise-events/for-enterprise/#event-types).
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriterTest.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriterTest.java
new file mode 100644
index 0000000000..f971adcf78
--- /dev/null
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/BoxEventJsonArrayWriterTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxEvent;
+import com.eclipsesource.json.Json;
+import com.eclipsesource.json.JsonObject;
+import com.eclipsesource.json.JsonValue;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class BoxEventJsonArrayWriterTest {
+
+ private ByteArrayOutputStream out;
+ private BoxEventJsonArrayWriter writer;
+
+ @BeforeEach
+ void setUp() throws IOException {
+ out = new ByteArrayOutputStream();
+ writer = BoxEventJsonArrayWriter.create(out);
+ }
+
+ @Test
+ void writeNoEvents() throws IOException {
+ writer.close();
+
+ assertEquals(Json.array(), actualJson());
+ }
+
+ @Test
+ void writeSingleEvent() throws IOException {
+ final BoxEvent event = new BoxEvent(null, """
+ {
+ "event_id": "1",
+ "event_type": "ITEM_CREATE"
+ }
+ """);
+
+ writer.write(event);
+ writer.close();
+
+ final JsonValue expected = Json.array().add(
+ createEventJson()
+ .set("id", event.getID())
+ .set("eventType", event.getEventType().name())
+ .set("typeName", event.getTypeName())
+ );
+
+ assertEquals(expected, actualJson());
+ }
+
+ @Test
+ void writeMultipleEvents() throws IOException {
+ final BoxEvent event1 = new BoxEvent(null, """
+ {
+ "event_id": "1",
+ "event_type": "ITEM_CREATE",
+ "source": {
+ "item_type": "file",
+ "item_id": "123"
+ }
+ }
+ """);
+ final BoxEvent event2 = new BoxEvent(null, """
+ {
+ "event_id": "2",
+ "event_type": "GROUP_ADD_USER",
+ "source": {
+ "type": "group",
+ "id": "123"
+ }
+ }
+ """);
+ final BoxEvent event3 = new BoxEvent(null, """
+ {
+ "event_id": "3",
+ "event_type": "COLLABORATION_ACCEPT",
+ "source": {
+ "item_type": "file",
+ "item_id": "123"
+ }
+ }
+ """);
+
+ writer.write(event1);
+ writer.write(event2);
+ writer.write(event3);
+ writer.close();
+
+ final JsonValue expected = Json.array()
+ .add(createEventJson()
+ .set("id", event1.getID())
+ .set("eventType", event1.getEventType().name())
+ .set("typeName", event1.getTypeName())
+ .set("source", Json.object()
+ .add("item_type",
event1.getSourceJSON().get("item_type"))
+ .add("item_id",
event1.getSourceJSON().get("item_id"))
+ )
+ )
+ .add(createEventJson()
+ .set("id", event2.getID())
+ .set("eventType", event2.getEventType().name())
+ .set("typeName", event2.getTypeName())
+ .set("source", Json.object()
+ .add("type",
event2.getSourceJSON().get("type"))
+ .add("id", event2.getSourceInfo().getID())
+ )
+ )
+ .add(createEventJson()
+ .set("id", event3.getID())
+ .set("eventType", event3.getEventType().name())
+ .set("typeName", event3.getTypeName())
+ .set("source", Json.object()
+ .add("item_type",
event3.getSourceJSON().get("item_type"))
+ .add("item_id",
event3.getSourceJSON().get("item_id"))
+ )
+ );
+
+ assertEquals(expected, actualJson());
+ }
+
+    /** Parses everything written to the in-memory stream so far as JSON. */
+    private JsonValue actualJson() {
+        // The writer under test always encodes UTF-8; decode with the same charset instead of
+        // depending on the platform default.
+        return Json.parse(out.toString(java.nio.charset.StandardCharsets.UTF_8));
+    }
+
+ private JsonObject createEventJson() {
+ // The Writer explicitly writes nulls as JSON null values.
+ return Json.object()
+ .add("accessibleBy", Json.NULL)
+ .add("actionBy", Json.NULL)
+ .add("additionalDetails", Json.NULL)
+ .add("createdAt", Json.NULL)
+ .add("createdBy", Json.NULL)
+ .add("eventType", Json.NULL)
+ .add("id", Json.NULL)
+ .add("ipAddress", Json.NULL)
+ .add("sessionID", Json.NULL)
+ .add("source", Json.NULL)
+ .add("typeName", Json.NULL);
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
new file mode 100644
index 0000000000..69fc459ad0
--- /dev/null
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/ConsumeBoxEnterpriseEventsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxEvent;
+import com.box.sdk.EventLog;
+import com.eclipsesource.json.Json;
+import com.eclipsesource.json.JsonValue;
+import
org.apache.nifi.processors.box.ConsumeBoxEnterpriseEvents.StartEventPosition;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class ConsumeBoxEnterpriseEventsTest extends AbstractBoxFileTest {
+
+ private TestEventStream eventStream;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ eventStream = new TestEventStream();
+
+ final ConsumeBoxEnterpriseEvents processor = new
ConsumeBoxEnterpriseEvents() {
+ @Override
+ EventLog getEventLog(String position) {
+ return eventStream.consume(position);
+ }
+ };
+
+ testRunner = TestRunners.newTestRunner(processor);
+ super.setUp();
+ }
+
+ @ParameterizedTest
+ @MethodSource("dataFor_testConsumeEvents")
+ void testConsumeEvents(
+ final StartEventPosition startEventPosition,
+ final @Nullable String startOffset,
+ final int expectedFlowFiles,
+ final List<Integer> expectedEventIds) {
+
testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_EVENT_POSITION,
startEventPosition);
+ if (startOffset != null) {
+ testRunner.setProperty(ConsumeBoxEnterpriseEvents.START_OFFSET,
startOffset);
+ }
+
+ eventStream.addEvent(0);
+ eventStream.addEvent(1);
+ eventStream.addEvent(2);
+ testRunner.run();
+
+ eventStream.addEvent(3);
+ testRunner.run();
+
+
testRunner.assertAllFlowFilesTransferred(ConsumeBoxEnterpriseEvents.REL_SUCCESS,
expectedFlowFiles);
+
+ final List<Integer> eventIds =
testRunner.getFlowFilesForRelationship(ConsumeBoxEnterpriseEvents.REL_SUCCESS).stream()
+ .flatMap(this::extractEventIds)
+ .toList();
+
+ assertEquals(expectedEventIds, eventIds);
+ }
+
+ static List<Arguments> dataFor_testConsumeEvents() {
+ return List.of(
+ arguments(StartEventPosition.EARLIEST, null, 2, List.of(0, 1,
2, 3)),
+ arguments(StartEventPosition.OFFSET, "1", 2, List.of(1, 2, 3)),
+ arguments(StartEventPosition.OFFSET, "12345", 1, List.of(3)),
+ arguments(StartEventPosition.LATEST, null, 1, List.of(3))
+ );
+ }
+
+ private Stream<Integer> extractEventIds(final MockFlowFile flowFile) {
+ final JsonValue json = Json.parse(flowFile.getContent());
+ return json.asArray().values().stream()
+ .map(JsonValue::asObject)
+ .map(jsonObject -> jsonObject.get("id").asString())
+ .map(Integer::parseInt);
+ }
+
+ private static class TestEventStream {
+
+ private static final String NOW_POSITION = "now";
+
+ private final List<BoxEvent> events = new ArrayList<>();
+
+ void addEvent(final int eventId) {
+ final BoxEvent boxEvent = new BoxEvent(null, "{\"event_id\":
\"%d\"}".formatted(eventId));
+ events.add(boxEvent);
+ }
+
+ EventLog consume(final String position) {
+ if (NOW_POSITION.equals(position)) {
+ return createEmptyEventLog();
+ }
+
+ final int streamPosition = Integer.parseInt(position);
+ if (streamPosition > events.size()) {
+ // Real Box API returns the latest offset position, even if
streamPosition was greater.
+ return createEmptyEventLog();
+ }
+
+ final List<BoxEvent> consumedEvents =
events.subList(streamPosition, events.size());
+
+ return createEventLog(consumedEvents);
+ }
+
+ private EventLog createEmptyEventLog() {
+ return createEventLog(emptyList());
+ }
+
+ private EventLog createEventLog(final List<BoxEvent> consumedEvents) {
+ // EventLog is not designed for being extended. Thus, mocking it.
+ final EventLog eventLog = mock();
+
+
when(eventLog.getNextStreamPosition()).thenReturn(String.valueOf(events.size()));
+
lenient().when(eventLog.getSize()).thenReturn(consumedEvents.size());
+
lenient().when(eventLog.iterator()).thenReturn(consumedEvents.iterator());
+
+ return eventLog;
+ }
+ }
+}