malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002231096
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new
BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyCommonAttributes(attributes, blobClient);
+
+ try {
+ if (conflictResolution !=
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
+
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response =
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ long length = flowFile.getSize();
+ applyUploadResultAttributes(attributes, blob,
BlobType.BLOCK_BLOB, length);
- long length = flowFile.getSize();
+ }
+ } catch (BlobStorageException e) {
+ final BlobErrorCode errorCode = e.getErrorCode();
+ session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE,
e.getErrorCode().toString());
- try (InputStream rawIn = session.read(flowFile);
- BufferedInputStream bufferedIn = new
BufferedInputStream(rawIn)) {
- blobClient.upload(bufferedIn, length);
+ final boolean alreadyExists = errorCode ==
BlobErrorCode.BLOB_ALREADY_EXISTS;
+ final boolean ignore = conflictResolution ==
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+ // If the blob already exists, we always add an attribute
"ignored"; depending on the value of
+ // conflict resolution, this will be true or false.
+ if (alreadyExists) {
+ session.putAttribute(flowFile, ATTR_NAME_IGNORED,
String.valueOf(ignore));
Review Comment:
Good point. I have now changed this in commit
4a822e15df25ad5eb24ddf02d54a55799213a003.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
public static final String ATTR_NAME_LENGTH = "azure.length";
public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
+ public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+ public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code
reported during blob operation";
+
+ public static final String ATTR_NAME_IGNORED = "ignored";
Review Comment:
Changed in commit ddb9b0cb4394c015e3ad89bf71a2d3b369b06ca4.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]