This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 08bc44715e NIFI-10789 Set FlowFile attributes for errors fetching from
Azure Data Lake Storage
08bc44715e is described below
commit 08bc44715e88193a668504fe20c6d3379386855f
Author: Emilio Setiadarma <[email protected]>
AuthorDate: Wed Nov 9 19:15:43 2022 -0800
NIFI-10789 Set FlowFile attributes for errors fetching from Azure Data Lake
Storage
This closes #6644
Signed-off-by: David Handermann <[email protected]>
---
.../azure/storage/FetchAzureDataLakeStorage.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
index a8c41e836b..5e8ebeb618 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java
@@ -21,10 +21,13 @@ import
com.azure.storage.file.datalake.DataLakeDirectoryClient;
import com.azure.storage.file.datalake.DataLakeFileClient;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClient;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
import com.azure.storage.file.datalake.models.DownloadRetryOptions;
import com.azure.storage.file.datalake.models.FileRange;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -48,6 +51,11 @@ import java.util.concurrent.TimeUnit;
@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class,
ListAzureDataLakeStorage.class})
@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage")
@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+ @WritesAttribute(attribute = "azure.datalake.storage.statusCode",
description = "The HTTP error code (if available) from the failed operation"),
+ @WritesAttribute(attribute = "azure.datalake.storage.errorCode",
description = "The Azure Data Lake Storage moniker of the failed operation"),
+ @WritesAttribute(attribute = "azure.datalake.storage.errorMessage",
description = "The Azure Data Lake Storage error message from the failed
operation")
+})
public class FetchAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProcessor {
public static final PropertyDescriptor RANGE_START = new
PropertyDescriptor.Builder()
@@ -124,14 +132,24 @@ public class FetchAzureDataLakeStorage extends
AbstractAzureDataLakeStorageProce
throw new ProcessException(FILE.getDisplayName() + " (" +
fileName + ") points to a directory. Full path: " + fileClient.getFilePath());
}
+
flowFile = session.write(flowFile, os ->
fileClient.readWithResponse(os, fileRange, retryOptions, null, false, null,
Context.NONE));
session.getProvenanceReporter().modifyContent(flowFile);
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
session.getProvenanceReporter().fetch(flowFile,
fileClient.getFileUrl(), transferMillis);
+ } catch (final DataLakeStorageException e) {
+ getLogger().error("Failure to fetch file from Azure Data Lake
Storage", e);
+ flowFile = session.putAttribute(flowFile,
"azure.datalake.storage.statusCode", String.valueOf(e.getStatusCode()));
+ flowFile = session.putAttribute(flowFile,
"azure.datalake.storage.errorCode", e.getErrorCode());
+ flowFile = session.putAttribute(flowFile,
"azure.datalake.storage.errorMessage", e.getMessage());
+ flowFile = session.penalize(flowFile);
+ session.transfer(flowFile, REL_FAILURE);
} catch (Exception e) {
getLogger().error("Failure to fetch file from Azure Data Lake
Storage", e);
+ // other exception, no available statusCode or errorCode
+ flowFile = session.putAttribute(flowFile,
"azure.datalake.storage.errorMessage", e.getMessage());
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
}