This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 626e828943 NIFI-14322: Creating processor to fetch Box file using a
representation
626e828943 is described below
commit 626e828943caeca52478d1fd7a42d44411e4ea85
Author: Noah Cover <[email protected]>
AuthorDate: Tue Mar 4 14:00:27 2025 -0800
NIFI-14322: Creating processor to fetch Box file using a representation
Signed-off-by: Pierre Villard <[email protected]>
This closes #9769.
---
.../processors/box/FetchBoxFileRepresentation.java | 237 +++++++++++++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../box/FetchBoxFileRepresentationTest.java | 193 +++++++++++++++++
3 files changed, 431 insertions(+)
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/FetchBoxFileRepresentation.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/FetchBoxFileRepresentation.java
new file mode 100644
index 0000000000..b0d8c1acae
--- /dev/null
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/java/org/apache/nifi/processors/box/FetchBoxFileRepresentation.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxAPIConnection;
+import com.box.sdk.BoxAPIException;
+import com.box.sdk.BoxAPIResponseException;
+import com.box.sdk.BoxFile;
+import com.box.sdk.BoxFile.Info;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.box.controllerservices.BoxClientService;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.nifi.components.ConfigVerificationResult.Outcome;
+
+@Tags({"box", "cloud", "storage", "file", "representation", "content",
"download"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Fetches a Box file representation using a
representation hint and writes it to the FlowFile content.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "box.id", description = "The ID of the
Box file."),
+ @WritesAttribute(attribute = "box.file.name", description = "The name
of the Box file."),
+ @WritesAttribute(attribute = "box.file.size", description = "The size
of the Box file in bytes."),
+ @WritesAttribute(attribute = "box.file.created.time", description =
"The timestamp when the file was created."),
+ @WritesAttribute(attribute = "box.file.modified.time", description =
"The timestamp when the file was last modified."),
+ @WritesAttribute(attribute = "box.file.mime.type", description = "The
MIME type of the file."),
+ @WritesAttribute(attribute = "box.file.representation.type",
description = "The representation type that was fetched."),
+ @WritesAttribute(attribute = "box.error.message", description = "The
error message returned by Box if the operation fails."),
+ @WritesAttribute(attribute = "box.error.code", description = "The
error code returned by Box if the operation fails.")
+})
+@SeeAlso({FetchBoxFile.class, ListBoxFile.class})
+public class FetchBoxFileRepresentation extends AbstractProcessor implements
VerifiableProcessor {
+
+ private static final String BOX_FILE_URI =
"https://api.box.com/2.0/files/%s/content?representation=%s";
+
+ static final PropertyDescriptor FILE_ID = new Builder()
+ .name("File ID")
+ .defaultValue("${box.id}")
+ .description("The ID of the Box file to retrieve.")
+ .required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final PropertyDescriptor REPRESENTATION_TYPE = new Builder()
+ .name("Representation Type")
+ .description("The type of representation to fetch. Common values
include 'pdf', 'text', 'jpg', 'png', etc.")
+ .required(true)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles that are successfully processed will be
routed to this relationship.")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that encounter errors during processing
will be routed to this relationship.")
+ .build();
+
+ static final Relationship REL_FILE_NOT_FOUND = new Relationship.Builder()
+ .name("file.not.found")
+ .description("FlowFiles for which the specified Box file was not
found.")
+ .build();
+
+ static final Relationship REL_REPRESENTATION_NOT_FOUND = new
Relationship.Builder()
+ .name("representation.not.found")
+ .description("FlowFiles for which the specified Box file's
requested representation was not found.")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS =
List.of(
+ BoxClientService.BOX_CLIENT_SERVICE,
+ FILE_ID,
+ REPRESENTATION_TYPE
+ );
+
+ private static final Set<Relationship> RELATIONSHIPS = Set.of(
+ REL_SUCCESS,
+ REL_FAILURE,
+ REL_FILE_NOT_FOUND,
+ REL_REPRESENTATION_NOT_FOUND
+ );
+
+ private volatile BoxAPIConnection boxAPIConnection;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTY_DESCRIPTORS;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @OnScheduled
+ public void onScheduled(final ProcessContext context) {
+ final BoxClientService boxClientService =
context.getProperty(BoxClientService.BOX_CLIENT_SERVICE)
+ .asControllerService(BoxClientService.class);
+ boxAPIConnection = boxClientService.getBoxApiConnection();
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final ComponentLog logger = getLogger();
+ final String fileId =
context.getProperty(FILE_ID).evaluateAttributeExpressions(flowFile).getValue();
+ final String representationType =
context.getProperty(REPRESENTATION_TYPE).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ final BoxFile boxFile = getBoxFile(fileId);
+ final Info fileInfo = boxFile.getInfo();
+
+ flowFile = session.write(flowFile, outputStream ->
+ // Download the file representation, box sdk handles a
request to create representation if it doesn't exist
+ boxFile.getRepresentationContent("[" + representationType
+ "]", outputStream)
+ );
+
+ flowFile = session.putAllAttributes(flowFile, Map.of(
+ "box.id", fileId,
+ "box.file.name", fileInfo.getName(),
+ "box.file.size", String.valueOf(fileInfo.getSize()),
+ "box.file.created.time",
fileInfo.getCreatedAt().toString(),
+ "box.file.modified.time",
fileInfo.getModifiedAt().toString(),
+ "box.file.mime.type", fileInfo.getType(),
+ "box.file.representation.type", representationType
+ ));
+
+ session.getProvenanceReporter().fetch(flowFile,
BOX_FILE_URI.formatted(fileId, representationType));
+ session.transfer(flowFile, REL_SUCCESS);
+ } catch (final BoxAPIResponseException e) {
+ flowFile = session.putAttribute(flowFile, "box.error.message",
e.getMessage());
+ flowFile = session.putAttribute(flowFile, "box.error.code",
String.valueOf(e.getResponseCode()));
+
+ if (e.getResponseCode() == 404) {
+ logger.warn("Box file with ID {} was not found or
representation {} is not available", fileId, representationType);
+ session.transfer(flowFile, REL_FILE_NOT_FOUND);
+ } else {
+ logger.error("Failed to retrieve Box file representation for
file [{}]", fileId, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ } catch (final BoxAPIException e) {
+ flowFile = session.putAttribute(flowFile, "box.error.message",
e.getMessage());
+ flowFile = session.putAttribute(flowFile, "box.error.code",
String.valueOf(e.getResponseCode()));
+
+ // Check if this is the "No matching representations found" error
+ if (e.getMessage() != null &&
e.getMessage().toLowerCase().startsWith("no matching representations found for
requested")) {
+ logger.warn("Representation {} is not available for file {}:
{}", representationType, fileId, e.getMessage());
+ session.transfer(flowFile, REL_REPRESENTATION_NOT_FOUND);
+ } else {
+ logger.error("BoxAPIException while retrieving file [{}]",
fileId, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
+ }
+ }
+
+ /**
+ * Get BoxFile object for the given fileId. Required for testing purposes
to mock BoxFile.
+ *
+ * @param fileId fileId
+ * @return BoxFile object
+ */
+ protected BoxFile getBoxFile(final String fileId) {
+ return new BoxFile(boxAPIConnection, fileId);
+ }
+
+ @Override
+ public List<ConfigVerificationResult> verify(final ProcessContext context,
+ final ComponentLog
verificationLogger,
+ final Map<String, String>
attributes) {
+ final List<ConfigVerificationResult> results = new ArrayList<>();
+ final BoxClientService boxClientService =
context.getProperty(BoxClientService.BOX_CLIENT_SERVICE)
+ .asControllerService(BoxClientService.class);
+ final BoxAPIConnection boxAPIConnection =
boxClientService.getBoxApiConnection();
+
+ try {
+ boxAPIConnection.refresh();
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Box API Connection")
+ .outcome(Outcome.SUCCESSFUL)
+ .explanation("Successfully validated Box connection")
+ .build());
+ } catch (final Exception e) {
+ verificationLogger.warn("Failed to verify configuration", e);
+ results.add(new ConfigVerificationResult.Builder()
+ .verificationStepName("Box API Connection")
+ .outcome(Outcome.FAILED)
+ .explanation(String.format("Failed to validate Box
connection: %s", e.getMessage()))
+ .build());
+ }
+
+ return results;
+ }
+}
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 2a4163074e..23954f2a85 100644
---
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,5 +15,6 @@
org.apache.nifi.processors.box.ListBoxFile
org.apache.nifi.processors.box.FetchBoxFile
org.apache.nifi.processors.box.FetchBoxFileInfo
+org.apache.nifi.processors.box.FetchBoxFileRepresentation
org.apache.nifi.processors.box.PutBoxFile
org.apache.nifi.processors.box.ConsumeBoxEvents
diff --git
a/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/FetchBoxFileRepresentationTest.java
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/FetchBoxFileRepresentationTest.java
new file mode 100644
index 0000000000..fb8b47a1fc
--- /dev/null
+++
b/nifi-extension-bundles/nifi-box-bundle/nifi-box-processors/src/test/java/org/apache/nifi/processors/box/FetchBoxFileRepresentationTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.box;
+
+import com.box.sdk.BoxAPIException;
+import com.box.sdk.BoxAPIResponseException;
+import com.box.sdk.BoxFile;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.OutputStream;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class FetchBoxFileRepresentationTest extends AbstractBoxFileTest {
+ private static final String TEST_FILE_ID = "238490238429";
+ private static final String TEST_REPRESENTATION_TYPE = "pdf";
+ private static final String TEST_FILE_NAME = "testfile.txt";
+ private static final long TEST_FILE_SIZE = 1024L;
+ private static final Date TEST_CREATED_TIME = new Date(1643673600000L); //
2022-02-01
+ private static final Date TEST_MODIFIED_TIME = new Date(1643760000000L);
// 2022-02-02
+ private static final String TEST_FILE_TYPE = "file";
+ private static final byte[] TEST_CONTENT = "test content".getBytes();
+
+ @Mock
+ private BoxFile mockBoxFile;
+
+ @Mock
+ private BoxFile.Info mockFileInfo;
+
+ @Override
+ @BeforeEach
+ void setUp() throws Exception {
+ when(mockBoxFile.getInfo()).thenReturn(mockFileInfo);
+
+ final FetchBoxFileRepresentation testProcessor = new
FetchBoxFileRepresentation() {
+ @Override
+ protected BoxFile getBoxFile(final String fileId) {
+ return mockBoxFile;
+ }
+ };
+
+ testRunner = TestRunners.newTestRunner(testProcessor);
+ testRunner.setProperty(FetchBoxFileRepresentation.FILE_ID,
TEST_FILE_ID);
+ testRunner.setProperty(FetchBoxFileRepresentation.REPRESENTATION_TYPE,
TEST_REPRESENTATION_TYPE);
+ super.setUp();
+ }
+
+ @Test
+ void testSuccessfulFetch() throws Exception {
+ when(mockFileInfo.getName()).thenReturn(TEST_FILE_NAME);
+ when(mockFileInfo.getSize()).thenReturn(TEST_FILE_SIZE);
+ when(mockFileInfo.getCreatedAt()).thenReturn(TEST_CREATED_TIME);
+ when(mockFileInfo.getModifiedAt()).thenReturn(TEST_MODIFIED_TIME);
+ when(mockFileInfo.getType()).thenReturn(TEST_FILE_TYPE);
+
+ doAnswer(invocation -> {
+ final OutputStream outputStream = invocation.getArgument(1);
+ outputStream.write(TEST_CONTENT);
+ return null;
+ }).when(mockBoxFile).getRepresentationContent(eq("[" +
TEST_REPRESENTATION_TYPE + "]"), any(OutputStream.class));
+
+ testRunner.enqueue("");
+ testRunner.run();
+
testRunner.assertAllFlowFilesTransferred(FetchBoxFileRepresentation.REL_SUCCESS,
1);
+
+ final List<MockFlowFile> successFiles =
testRunner.getFlowFilesForRelationship(FetchBoxFileRepresentation.REL_SUCCESS);
+ final MockFlowFile resultFile = successFiles.getFirst();
+ resultFile.assertContentEquals(TEST_CONTENT);
+
+ resultFile.assertAttributeEquals("box.id", TEST_FILE_ID);
+ resultFile.assertAttributeEquals("box.file.name", TEST_FILE_NAME);
+ resultFile.assertAttributeEquals("box.file.size",
String.valueOf(TEST_FILE_SIZE));
+ resultFile.assertAttributeEquals("box.file.created.time",
TEST_CREATED_TIME.toString());
+ resultFile.assertAttributeEquals("box.file.modified.time",
TEST_MODIFIED_TIME.toString());
+ resultFile.assertAttributeEquals("box.file.mime.type", TEST_FILE_TYPE);
+ resultFile.assertAttributeEquals("box.file.representation.type",
TEST_REPRESENTATION_TYPE);
+
+ final List<ProvenanceEventRecord> provenanceEvents =
testRunner.getProvenanceEvents();
+ assertEquals(1, provenanceEvents.size());
+ assertEquals(ProvenanceEventType.FETCH,
provenanceEvents.getFirst().getEventType());
+ }
+
+ @Test
+ void testFileNotFound() {
+ // Create 404 exception
+ final BoxAPIResponseException notFoundException =
mock(BoxAPIResponseException.class);
+ when(notFoundException.getResponseCode()).thenReturn(404);
+ when(notFoundException.getMessage()).thenReturn("File not found");
+
+ when(mockBoxFile.getInfo()).thenThrow(notFoundException);
+
+ testRunner.enqueue("");
+ testRunner.run();
+
testRunner.assertAllFlowFilesTransferred(FetchBoxFileRepresentation.REL_FILE_NOT_FOUND,
1);
+
+ final MockFlowFile resultFile =
testRunner.getFlowFilesForRelationship(FetchBoxFileRepresentation.REL_FILE_NOT_FOUND).getFirst();
+ resultFile.assertAttributeEquals("box.error.message", "File not
found");
+ resultFile.assertAttributeEquals("box.error.code", "404");
+ }
+
+ @Test
+ void testRepresentationNotFound() {
+ // Have getRepresentationContent throw a BoxAPIException with
representation not found error
+ final BoxAPIException repNotFoundException =
mock(BoxAPIException.class);
+ when(repNotFoundException.getMessage()).thenReturn("No matching
representations found for requested hint");
+ when(repNotFoundException.getResponseCode()).thenReturn(400);
+
+
doThrow(repNotFoundException).when(mockBoxFile).getRepresentationContent(anyString(),
any(OutputStream.class));
+
+ testRunner.enqueue("");
+ testRunner.run();
+
testRunner.assertAllFlowFilesTransferred(FetchBoxFileRepresentation.REL_REPRESENTATION_NOT_FOUND,
1);
+
+ final MockFlowFile resultFile =
testRunner.getFlowFilesForRelationship(FetchBoxFileRepresentation.REL_REPRESENTATION_NOT_FOUND).getFirst();
+ resultFile.assertAttributeEquals("box.error.message", "No matching
representations found for requested hint");
+ resultFile.assertAttributeEquals("box.error.code", "400");
+ }
+
+ @Test
+ void testGeneralApiError() {
+ // Have getRepresentationContent throw a BoxAPIException with a
general error
+ final BoxAPIException generalException = mock(BoxAPIException.class);
+ when(generalException.getMessage()).thenReturn("API error occurred");
+ when(generalException.getResponseCode()).thenReturn(500);
+
+
doThrow(generalException).when(mockBoxFile).getRepresentationContent(anyString(),
any(OutputStream.class));
+
+ testRunner.enqueue("");
+ testRunner.run();
+
testRunner.assertAllFlowFilesTransferred(FetchBoxFileRepresentation.REL_FAILURE,
1);
+
+ final MockFlowFile resultFile =
testRunner.getFlowFilesForRelationship(FetchBoxFileRepresentation.REL_FAILURE).getFirst();
+ resultFile.assertAttributeEquals("box.error.message", "API error
occurred");
+ resultFile.assertAttributeEquals("box.error.code", "500");
+ }
+
+ @Test
+ void testFileIdFromFlowFileAttributes() {
+ when(mockFileInfo.getName()).thenReturn(TEST_FILE_NAME);
+ when(mockFileInfo.getSize()).thenReturn(TEST_FILE_SIZE);
+ when(mockFileInfo.getCreatedAt()).thenReturn(TEST_CREATED_TIME);
+ when(mockFileInfo.getModifiedAt()).thenReturn(TEST_MODIFIED_TIME);
+ when(mockFileInfo.getType()).thenReturn(TEST_FILE_TYPE);
+ testRunner.setProperty(FetchBoxFileRepresentation.FILE_ID,
"${box.id}");
+
+ doAnswer(invocation -> {
+ final OutputStream outputStream = invocation.getArgument(1);
+ outputStream.write(TEST_CONTENT);
+ return null;
+ }).when(mockBoxFile).getRepresentationContent(anyString(),
any(OutputStream.class));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("box.id", TEST_FILE_ID);
+ testRunner.enqueue("", attributes);
+ testRunner.run();
+
+
testRunner.assertAllFlowFilesTransferred(FetchBoxFileRepresentation.REL_SUCCESS,
1);
+ }
+}