nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002157224


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   When you put an attribute on a FlowFile, you need to keep using the FlowFile
   that `putAttribute` returns instead of the original one.
   ```suggestion
                   flowFile = session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
   ```
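
A minimal sketch of why the reassignment matters. NiFi FlowFile objects are immutable, and `ProcessSession.putAttribute` returns a FlowFile that carries the updated attributes. The `FakeFlowFile` class and the static `putAttribute` helper below are hypothetical stand-ins, used only so the example runs without NiFi on the classpath; they are not the NiFi API, and the attribute value is just a sample.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ReturnedFlowFileDemo {

    // Hypothetical immutable stand-in for a NiFi FlowFile.
    static final class FakeFlowFile {
        private final Map<String, String> attributes;

        FakeFlowFile(Map<String, String> attributes) {
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }

        String getAttribute(String key) {
            return attributes.get(key);
        }
    }

    // Hypothetical stand-in for session.putAttribute:
    // returns a new object and leaves the input untouched.
    static FakeFlowFile putAttribute(FakeFlowFile flowFile, String key, String value) {
        Map<String, String> copy = new HashMap<>(flowFile.attributes);
        copy.put(key, value);
        return new FakeFlowFile(copy);
    }

    public static void main(String[] args) {
        FakeFlowFile flowFile = new FakeFlowFile(new HashMap<>());

        // Return value discarded: the local reference still lacks the attribute.
        putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        System.out.println(flowFile.getAttribute("azure.error.code"));

        // Return value reassigned: the attribute is now visible.
        flowFile = putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        System.out.println(flowFile.getAttribute("azure.error.code"));
    }
}
```

In the processor itself, the same pattern means every later call (further `putAttribute` calls, `session.transfer`) should receive the most recently returned FlowFile.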



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+    public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code reported during blob operation";
+
+    public static final String ATTR_NAME_IGNORED = "ignored";

Review Comment:
   I'd prefix this attribute name with `azure.` to prevent name collisions with
   attributes set by other processors.
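
For illustration, the prefixed constant might look like the sketch below. The value `azure.ignored` is an assumption, since the comment only asks for the `azure.` prefix, and the class name `PrefixedAttributeNames` is hypothetical so that the snippet compiles on its own.

```java
public final class PrefixedAttributeNames {

    // Existing attribute name taken from the diff; it already carries the prefix.
    public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";

    // Assumed replacement for the bare "ignored" name.
    public static final String ATTR_NAME_IGNORED = "azure.ignored";

    private PrefixedAttributeNames() {
        // Constants holder, not meant to be instantiated.
    }
}
```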



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+
+            try {
+                if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
+
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, length);
 
-            long length = flowFile.getSize();
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                final boolean alreadyExists = errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS;
+                final boolean ignore = conflictResolution == AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+                // If the blob already exists, we always add an attribute "ignored"; depending on the value of
+                // conflict resolution, this will be true or false.
+                if (alreadyExists) {
+                    session.putAttribute(flowFile, ATTR_NAME_IGNORED, String.valueOf(ignore));

Review Comment:
   The code comment is not accurate. When the resolution is `replace`, this
   attribute is never applied, even though the blob may already exist. I'd apply
   this attribute only for the `ignore` resolution and set its value depending
   on whether the blob already exists.
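
One possible reading of this suggestion, as a hedged sketch rather than the PR's final code: the attribute is written only under the `ignore` strategy, and its value reflects whether the blob was already present. The `Resolution` enum (including its `FAIL` member), the `conflictAttributes` helper, and the `azure.ignored` attribute name are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class IgnoredAttributeSketch {

    // Hypothetical stand-in for AzureStorageConflictResolutionStrategy.
    enum Resolution { FAIL, IGNORE, REPLACE }

    // Builds the conflict-related attributes to add after an upload attempt.
    // Only the IGNORE strategy produces the attribute; its value records
    // whether the blob already existed (and the upload was therefore skipped).
    static Map<String, String> conflictAttributes(Resolution resolution, boolean blobAlreadyExisted) {
        Map<String, String> attributes = new HashMap<>();
        if (resolution == Resolution.IGNORE) {
            attributes.put("azure.ignored", String.valueOf(blobAlreadyExisted));
        }
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(conflictAttributes(Resolution.IGNORE, true));
        System.out.println(conflictAttributes(Resolution.IGNORE, false));
        System.out.println(conflictAttributes(Resolution.REPLACE, true));
    }
}
```

With this shape, the attribute has a defined value on every FlowFile processed under `ignore`, and it never appears under the other strategies, which keeps the code comment and the behavior consistent.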



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
