malthe commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r985732544


##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                }
+            } catch (BlobStorageException e) {
+                if (conflictStrategy == AzureStorageConflictStrategy.FAIL) throw e;
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS || errorCode == BlobErrorCode.CONDITION_NOT_MET) {
+                    relationship = REL_SKIPPED;

Review Comment:
   I have done this now, except that instead of a "skipped" attribute or similar, I 
have introduced a new "azure.error" attribute, which is set to 
"BlobAlreadyExists" (the official error code string) if the blob already 
existed (and that is treated as an error).
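
To illustrate the attribute described above, here is a minimal, self-contained sketch. It is not the code from the PR: the class name, the `errorAttributes` helper, and the `ConflictStrategy` enum are hypothetical stand-ins, and it assumes that only the FAIL strategy treats an existing blob as an error. Only the attribute name "azure.error" and the value "BlobAlreadyExists" come from the comment.

```java
import java.util.Collections;
import java.util.Map;

public class AzureErrorAttributeSketch {

    // Attribute name as given in the review comment.
    static final String ATTR_ERROR = "azure.error";

    // Hypothetical stand-in for the conflict strategy enum used in the diff.
    enum ConflictStrategy { FAIL, IGNORE, OVERWRITE }

    // Returns the attributes to add to a FlowFile after the storage service
    // rejected an upload with the given error code string.
    // Assumption: only FAIL treats "blob already exists" as an error, so the
    // attribute is only set for that strategy.
    static Map<String, String> errorAttributes(ConflictStrategy strategy, String errorCode) {
        boolean treatedAsError = strategy == ConflictStrategy.FAIL;
        if (treatedAsError && errorCode != null) {
            return Collections.singletonMap(ATTR_ERROR, errorCode);
        }
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        // Existing blob with FAIL: the error code is recorded on the FlowFile.
        System.out.println(errorAttributes(ConflictStrategy.FAIL, "BlobAlreadyExists"));
        // Existing blob with IGNORE: no error attribute is added.
        System.out.println(errorAttributes(ConflictStrategy.IGNORE, "BlobAlreadyExists"));
    }
}
```

A downstream processor could then route on the presence or value of "azure.error" without needing a separate "skipped" marker.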



##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+
             BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
 
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
 
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   I have decided to put this mode back into the drawer.


