nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1007251404


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -16,9 +16,17 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.http.rest.Response;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new 
BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == 
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != 
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions 
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = 
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, 
BlobType.BLOCK_BLOB, length);

Review Comment:
   ```suggestion
                       blobClient.uploadWithResponse(blobParallelUploadOptions, 
null, Context.NONE);
                       applyBlobMetadata(attributes, blobClient);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -144,4 +194,19 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             session.transfer(flowFile, REL_FAILURE);
         }
     }
+
+    private static void applyUploadResultAttributes(final Map<String, String> 
attributes, final BlockBlobItem blob, final BlobType blobType, final long 
length) {
+        attributes.put(ATTR_NAME_BLOBTYPE, blobType.toString());
+        attributes.put(ATTR_NAME_ETAG, blob.getETag());
+        attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+        attributes.put(ATTR_NAME_TIMESTAMP, 
String.valueOf(blob.getLastModified()));
+        attributes.put(ATTR_NAME_LANG, null);
+        attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
+    }
+
+    private static void applyCommonAttributes(final Map<String, String> 
attributes, final BlobClient blobClient) {
+        attributes.put(ATTR_NAME_CONTAINER, blobClient.getContainerName());
+        attributes.put(ATTR_NAME_BLOBNAME, blobClient.getBlobName());
+        attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+    }

Review Comment:
   See my summary.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String> 
initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String 
containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, 
String containerName, String blobName) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, 
containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, 
blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, 
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), 
containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", 
getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))
+        );
+    }
+
+    protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile, 
int blobLength) throws UnsupportedEncodingException {

Review Comment:
   ```suggestion
       protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile, 
int blobLength) {
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -136,7 +139,30 @@ public void testPutBlobToExistingBlob() throws Exception {
 
         runProcessor(BLOB_DATA);
 
-        assertFailure(BLOB_DATA);
+        MockFlowFile flowFile = assertFailure(BLOB_DATA, "BlobAlreadyExists");
+        assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), null);
+    }
+
+    @Test
+    public void testPutBlobToExistingBlobConflictStrategyIgnore() throws 
Exception {
+        uploadBlob(BLOB_NAME, BLOB_DATA);
+        runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION, 
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION.getValue());
+
+        runProcessor(BLOB_DATA);
+
+        MockFlowFile flowFile = assertIgnored(getContainerName(), BLOB_NAME);
+        assertEquals(flowFile.getAttribute(ATTR_NAME_ERROR_CODE), 
BlobErrorCode.BLOB_ALREADY_EXISTS.toString());

Review Comment:
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -75,7 +88,8 @@
         @WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description = 
ATTR_DESCRIPTION_MIME_TYPE),
         @WritesAttribute(attribute = ATTR_NAME_LANG, description = 
ATTR_DESCRIPTION_LANG),
         @WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description = 
ATTR_DESCRIPTION_TIMESTAMP),
-        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = 
ATTR_DESCRIPTION_LENGTH)})
+        @WritesAttribute(attribute = ATTR_NAME_LENGTH, description = 
ATTR_DESCRIPTION_LENGTH),
+        @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = 
ATTR_DESCRIPTION_ERROR_CODE)})

Review Comment:
   ```suggestion
           @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description = 
ATTR_DESCRIPTION_ERROR_CODE),
           @WritesAttribute(attribute = ATTR_NAME_IGNORED, description = 
ATTR_DESCRIPTION_IGNORED)})
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new 
BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);

Review Comment:
   ```suggestion
               final Map<String, String> attributes = new HashMap<>();
               applyStandardBlobAttributes(attributes, blobClient);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -35,18 +43,21 @@
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import 
org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
 
-import java.io.BufferedInputStream;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
+import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ERROR_CODE;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;

Review Comment:
   ```suggestion
   import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;
   import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_IGNORED;
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new 
BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == 
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != 
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions 
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = 
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, 
BlobType.BLOCK_BLOB, length);
+                    if (ignore) {
+                        attributes.put(ATTR_NAME_IGNORED, "false");
+                    }
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                flowFile = session.putAttribute(flowFile, 
ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());

Review Comment:
   We don't need to set the error code attribute here when the conflict resolution strategy is 'ignore'.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
     public static final String ATTR_NAME_LENGTH = "azure.length";
     public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
 
+    public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+    public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code 
reported during blob operation";
+
+    public static final String ATTR_NAME_IGNORED = "azure.ignored";

Review Comment:
   Description is missing.
   ```suggestion
       public static final String ATTR_NAME_IGNORED = "azure.ignored";
       public static final String ATTR_DESCRIPTION_IGNORED = "When Conflict 
Resolution Strategy is 'ignore', " +
               "this property will be true/false depending on whether the blob 
was ignored.";
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -16,9 +16,17 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
+import com.azure.core.http.rest.Response;
+import com.azure.core.util.Context;
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobRequestConditions;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.BlobType;
+import com.azure.storage.blob.models.BlockBlobItem;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);

Review Comment:
   ```suggestion
               final BlobClient blobClient = 
containerClient.getBlobClient(blobName);
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            final BlobRequestConditions blobRequestConditions = new 
BlobRequestConditions();
+            Map<String, String> attributes = new HashMap<>();
+            applyCommonAttributes(attributes, blobClient);
+            final boolean ignore = conflictResolution == 
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+            try {
+                if (conflictResolution != 
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            long length = flowFile.getSize();
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions 
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    Response<BlockBlobItem> response = 
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                    BlockBlobItem blob = response.getValue();
+                    long length = flowFile.getSize();
+                    applyUploadResultAttributes(attributes, blob, 
BlobType.BLOCK_BLOB, length);
+                    if (ignore) {
+                        attributes.put(ATTR_NAME_IGNORED, "false");
+                    }
+                }
+            } catch (BlobStorageException e) {
+                final BlobErrorCode errorCode = e.getErrorCode();
+                flowFile = session.putAttribute(flowFile, 
ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new 
BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS && ignore) {
+                    getLogger().info("Blob already exists: remote blob not 
modified. Transferring {} to success", flowFile);
+                    attributes.put(ATTR_NAME_IGNORED, "true");
+                } else {
+                    throw e;

Review Comment:
   We only need to apply an error code when we cannot recover.
   ```suggestion
                       flowFile = session.putAttribute(flowFile, 
ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
                       throw e;
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java:
##########
@@ -164,7 +164,7 @@ protected Map<String, String> 
createBlobAttributesMap(BlobClient blobClient) {
         Map<String, String> attributes = new HashMap<>();
 
         BlobProperties properties = blobClient.getProperties();
-        String primaryUri = String.format("%s/%s", 
blobClient.getContainerClient().getBlobContainerUrl(), 
blobClient.getBlobName());
+        String primaryUri = blobClient.getBlobUrl();

Review Comment:
   Because of this change, some tests in `ITFetchAzureBlobStorage_v12` are 
failing. I am not adding a suggestion here because my summary comment would 
overwrite it.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -34,6 +34,8 @@
 import org.junit.jupiter.api.BeforeEach;
 
 import java.io.ByteArrayInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String> 
initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String 
containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, 
String containerName, String blobName) throws UnsupportedEncodingException {

Review Comment:
   ```suggestion
       protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, 
String containerName, String blobName) {
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String> 
initCommonExpressionLanguageAttributes() {
         return attributes;
     }
 
-    protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String 
containerName, String blobName, int blobLength) {
+    protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile, 
String containerName, String blobName) throws UnsupportedEncodingException {
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, 
containerName);
         flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, 
blobName);
-        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, 
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), 
containerName, blobName));
+        flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+                String.format("https://%s.blob.core.windows.net/%s/%s", 
getAccountName(), containerName, URLEncoder.encode(
+                        blobName,
+                        StandardCharsets.US_ASCII.name()
+                ).replace("+", "%20"))
+        );

Review Comment:
   ```suggestion
           flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, 
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), 
containerName, blobName));
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -18,10 +18,12 @@
 
 import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobErrorCode;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -32,11 +34,12 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;

Review Comment:
   Won't be needed when other suggestions are accepted.
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to