This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 08bc44715e NIFI-10789 Set FlowFile attributes for errors fetching from 
Azure Data Lake Storage
08bc44715e is described below

commit 08bc44715e88193a668504fe20c6d3379386855f
Author: Emilio Setiadarma <[email protected]>
AuthorDate: Wed Nov 9 19:15:43 2022 -0800

    NIFI-10789 Set FlowFile attributes for errors fetching from Azure Data Lake 
Storage
    
    This closes #6644
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../azure/storage/FetchAzureDataLakeStorage.java       | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index a8c41e836b..5e8ebeb618 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -21,10 +21,13 @@ import 
com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
 import com.azure.storage.file.datalake.DataLakeFileSystemClient;
 import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import com.azure.storage.file.datalake.models.DownloadRetryOptions;
 import com.azure.storage.file.datalake.models.FileRange;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -48,6 +51,11 @@ import java.util.concurrent.TimeUnit;
 @SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, 
ListAzureDataLakeStorage.class})
 @CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
 @InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+        @WritesAttribute(attribute = "azure.datalake.storage.statusCode", 
description = "The HTTP error code (if available) from the failed operation"),
+        @WritesAttribute(attribute = "azure.datalake.storage.errorCode", 
description = "The Azure Data Lake Storage moniker of the failed operation"),
+        @WritesAttribute(attribute = "azure.datalake.storage.errorMessage", 
description = "The Azure Data Lake Storage error message from the failed 
operation")
+})
 public class FetchAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcessor {
 
     public static final PropertyDescriptor RANGE_START = new 
PropertyDescriptor.Builder()
@@ -124,14 +132,24 @@ public class FetchAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProce
                 throw new ProcessException(FILE.getDisplayName() + " (" + 
fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
             }
 
+
             flowFile = session.write(flowFile, os -> 
fileClient.readWithResponse(os, fileRange, retryOptions, null, false, null, 
Context.NONE));
             session.getProvenanceReporter().modifyContent(flowFile);
             session.transfer(flowFile, REL_SUCCESS);
 
             final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             session.getProvenanceReporter().fetch(flowFile, 
fileClient.getFileUrl(), transferMillis);
+        } catch (final DataLakeStorageException e) {
+            getLogger().error("Failure to fetch file from Azure Data Lake 
Storage", e);
+            flowFile = session.putAttribute(flowFile, 
"azure.datalake.storage.statusCode", String.valueOf(e.getStatusCode()));
+            flowFile = session.putAttribute(flowFile, 
"azure.datalake.storage.errorCode", e.getErrorCode());
+            flowFile = session.putAttribute(flowFile, 
"azure.datalake.storage.errorMessage", e.getMessage());
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
         } catch (Exception e) {
             getLogger().error("Failure to fetch file from Azure Data Lake 
Storage", e);
+            // other exception, no available statusCode or errorCode
+            flowFile = session.putAttribute(flowFile, 
"azure.datalake.storage.errorMessage", e.getMessage());
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
         }

Reply via email to