nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r984483359


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.components.AllowableValue;
+
+public enum AzureStorageConflictStrategy {

Review Comment:
   I know it's a bit long, but I'd rename this class to 
`AzureStorageConflictResolutionStrategy`. Later, in a separate PR, we could 
presumably drop the verbose `AzureStorage` prefix, since the package name 
already conveys that.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                }
+            } catch (BlobStorageException e) {
+                if (conflictStrategy == AzureStorageConflictStrategy.FAIL) throw e;
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS || errorCode == BlobErrorCode.CONDITION_NOT_MET) {
+                    relationship = REL_SKIPPED;

Review Comment:
   This feature is also implemented in the ADLS processors, where the flowFile 
is routed to `REL_SUCCESS` when the file is skipped. I think it is better to 
keep the behavior consistent between these processors.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),

Review Comment:
   Is there a use case where entryDate is useful? Almost 100% of the time, 
`flowFile.entryDate` will be newer than the modified date on Azure. I'd imagine 
the `OVERWRITE_IF_SOURCE_NEWER` option would be useful when the goal is to 
synchronize files fetched by GetFile with Azure. In that situation, the 
modified date of the original file could be used instead. All in all, I would 
rather rely on a flowFile attribute.
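
   A minimal sketch of the attribute-based approach, not the PR's code. It 
assumes the source timestamp arrives in a `file.lastModifiedTime` attribute 
formatted like `2022-09-01T12:00:00+0000` (what GetFile is assumed to write; 
please verify), and the class and method names are hypothetical. 
`wouldOverwrite` only models the If-Unmodified-Since precondition locally to 
show why entryDate nearly always lets the overwrite through.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Map;
import java.util.Optional;

public class SourceTimestampSketch {
    // Assumption: attribute name and date pattern match what GetFile writes.
    static final String ATTRIBUTE = "file.lastModifiedTime";
    static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ssZ");

    // Returns the source file's modification time, or empty if the
    // attribute is missing or cannot be parsed.
    static Optional<OffsetDateTime> sourceModified(Map<String, String> attributes) {
        final String raw = attributes.get(ATTRIBUTE);
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(OffsetDateTime.parse(raw, FORMAT));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    // Local model of If-Unmodified-Since: the write goes through only if
    // the blob was not modified after the supplied timestamp.
    static boolean wouldOverwrite(OffsetDateTime ifUnmodifiedSince,
                                  OffsetDateTime blobLastModified) {
        return !blobLastModified.isAfter(ifUnmodifiedSince);
    }
}
```

   With a blob modified on 2022-09-15, a source file modified on 2022-09-01 is 
skipped, while an entryDate of 2022-09-30 (the time the flowFile entered NiFi) 
would overwrite it.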



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.components.AllowableValue;
+
+public enum AzureStorageConflictStrategy {
+    FAIL("Fail", "Fail if the target blob already exists"),

Review Comment:
   The same feature is implemented in the ADLS processors. If possible, please 
use the same strategy names that already exist there.



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   If `OVERWRITE_IF_SOURCE_NEWER` is not needed, I'd consider using `upload()` 
with the `boolean overwrite` parameter. (See my summary comment.)
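
   A rough sketch of how the simpler variant could be wired, under stated 
assumptions: the strategy names `FAIL`, `IGNORE` and `REPLACE` and the `Route` 
type are hypothetical stand-ins (the real names should follow the ADLS 
processors), and the Azure call itself is left out so the snippet stays 
self-contained. The flag computed here is what would be passed as the third 
argument of `BlobClient.upload(InputStream, long, boolean)`.

```java
public class OverwriteFlagSketch {
    // Hypothetical strategy names, used for illustration only.
    enum Strategy { FAIL, IGNORE, REPLACE }

    // Hypothetical stand-in for the processor's relationships.
    enum Route { SUCCESS, FAILURE }

    // Only REPLACE asks the service to overwrite an existing blob.
    static boolean overwriteFlag(Strategy strategy) {
        return strategy == Strategy.REPLACE;
    }

    // Routing once the upload has been attempted. `conflictReported` means
    // the service rejected the upload because the blob already exists.
    // A skipped file goes to SUCCESS, mirroring the ADLS behavior described
    // in the review; only FAIL turns the conflict into a failure.
    static Route route(Strategy strategy, boolean conflictReported) {
        if (conflictReported && strategy == Strategy.FAIL) {
            return Route.FAILURE;
        }
        return Route.SUCCESS;
    }
}
```

   With this shape no `BlobRequestConditions` object is needed, and the catch 
block only has to distinguish "already exists" from other storage errors.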



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -91,14 +103,48 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
                     "will fail if the container does not exist.")
             .build();
 
+    public static final PropertyDescriptor CONFLICT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("conflict-strategy")

Review Comment:
   I'd rename this property to `conflict-resolution-strategy`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.