This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ae2b734853 NIFI-14969 - Add Content MD5 checksum validation for PutAzureBlobStorage_v12
ae2b734853 is described below

commit ae2b7348537537702fa50e04f937eb2c0f57cb2d
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Jan 8 12:50:09 2026 +0100

    NIFI-14969 - Add Content MD5 checksum validation for PutAzureBlobStorage_v12
    
    This closes #10746.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../azure/storage/PutAzureBlobStorage_v12.java     | 12 ++++++++
 .../azure/storage/utils/AzureStorageUtils.java     | 34 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index a53c2a71c6..50c0a4d9d9 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -22,6 +22,7 @@ import com.azure.storage.blob.BlobClient;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
 import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobHttpHeaders;
 import com.azure.storage.blob.models.BlobRequestConditions;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.azure.storage.blob.models.BlobType;
@@ -62,6 +63,8 @@ import java.util.concurrent.TimeUnit;
 import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM;
 import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
 import static 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
+import static 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.CONTENT_MD5;
+import static 
org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.convertMd5ToBytes;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
 import static 
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -112,6 +115,7 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl
             AzureStorageUtils.CREATE_CONTAINER,
             AzureStorageUtils.CONFLICT_RESOLUTION,
             BLOB_NAME,
+            CONTENT_MD5,
             RESOURCE_TRANSFER_SOURCE,
             FILE_RESOURCE_SERVICE,
             AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
@@ -182,6 +186,14 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl
                     final ParallelTransferOptions parallelTransferOptions = 
new ParallelTransferOptions().setBlockSizeLong(blockSize);
                     
blobParallelUploadOptions.setParallelTransferOptions(parallelTransferOptions);
                     
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+
+                    // A reference to a missing FlowFile attribute evaluates to an empty string, so a blank value means "no checksum"
+                    final String contentMd5 = context.getProperty(CONTENT_MD5).evaluateAttributeExpressions(sourceFlowFile).getValue();
+                    if (contentMd5 != null && !contentMd5.isBlank()) {
+                        final byte[] md5Bytes = convertMd5ToBytes(contentMd5);
+                        final BlobHttpHeaders blobHttpHeaders = new BlobHttpHeaders().setContentMd5(md5Bytes);
+                        blobParallelUploadOptions.setHeaders(blobHttpHeaders);
+                    }
                     Response<BlockBlobItem> response = 
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
                     BlockBlobItem blob = response.getValue();
                     applyUploadResultAttributes(attributes, blob, 
BlobType.BLOCK_BLOB, transferSize);
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 26030d3075..be2d6f1fed 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -38,8 +38,10 @@ import reactor.netty.http.client.HttpClient;
 
 import java.net.InetSocketAddress;
 import java.net.Proxy;
+import java.util.Base64;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HexFormat;
 import java.util.Map;
 
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
@@ -192,6 +194,21 @@ public final class AzureStorageUtils {
             .description("Specifies whether an existing blob will have its 
contents replaced upon conflict.")
             .build();
 
+    public static final PropertyDescriptor CONTENT_MD5 = new PropertyDescriptor.Builder()
+            .name("Content MD5")
+            .displayName("Content MD5")
+            .description("""
+                    The MD5 hash of the content. When this property is set, Azure will validate
+                    the uploaded content against this checksum and reject the upload if it doesn't match. This provides
+                    data integrity verification during transfer. The value can be provided in hexadecimal format (32 characters)
+                    or Base64 format. The MD5 checksum must be computed before invoking this processor;
+                    use the CryptographicHashContent processor with algorithm MD5 to store the result as a FlowFile attribute,
+                    then reference it using Expression Language (e.g., ${content_MD5}).""")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     public static final String SAS_TOKEN_BASE_DESCRIPTION = "Shared Access 
Signature token (the leading '?' may be included)";
 
     public static final String SAS_TOKEN_SECURITY_DESCRIPTION =
@@ -354,6 +371,23 @@ public final class AzureStorageUtils {
         }
     }
 
+    /**
+     * Converts an MD5 checksum string to its 16-byte binary form. Accepts hexadecimal (as output by CryptographicHashContent) or Base64; surrounding whitespace is ignored.
+     *
+     * @param md5String the MD5 checksum as hex (32 chars) or Base64 (24 chars with padding)
+     * @return the MD5 as a 16-byte array
+     * @throws IllegalArgumentException if the value is neither valid hex nor valid Base64, or does not decode to exactly 16 bytes
+     */
+    public static byte[] convertMd5ToBytes(final String md5String) {
+        final String md5 = md5String.strip();
+        // 32 hex chars encode 16 bytes; any 32-char Base64 value decodes to more than 16 bytes, so the hex interpretation is unambiguous
+        final byte[] md5Bytes = md5.length() == 32 && md5.chars().allMatch(HexFormat::isHexDigit) ? HexFormat.of().parseHex(md5) : Base64.getDecoder().decode(md5);
+        if (md5Bytes.length != 16) {
+            throw new IllegalArgumentException("MD5 checksum [%s] must decode to 16 bytes but decoded to %d bytes".formatted(md5, md5Bytes.length));
+        }
+        return md5Bytes;
+    }
+
     public static class DirectoryValidator implements Validator {
         private String displayName;
 

Reply via email to