This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ae2b734853 NIFI-14969 - Add Content MD5 checksum validation for PutAzureBlobStorage_v12
ae2b734853 is described below
commit ae2b7348537537702fa50e04f937eb2c0f57cb2d
Author: Pierre Villard <[email protected]>
AuthorDate: Thu Jan 8 12:50:09 2026 +0100
NIFI-14969 - Add Content MD5 checksum validation for PutAzureBlobStorage_v12
This closes #10746.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../azure/storage/PutAzureBlobStorage_v12.java | 12 ++++++++
.../azure/storage/utils/AzureStorageUtils.java | 34 ++++++++++++++++++++++
2 files changed, 46 insertions(+)
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
index a53c2a71c6..50c0a4d9d9 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java
@@ -22,6 +22,7 @@ import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.BlobType;
@@ -62,6 +63,8 @@ import java.util.concurrent.TimeUnit;
import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM;
import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE;
+import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.CONTENT_MD5;
+import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.convertMd5ToBytes;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
@@ -112,6 +115,7 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl
AzureStorageUtils.CREATE_CONTAINER,
AzureStorageUtils.CONFLICT_RESOLUTION,
BLOB_NAME,
+ CONTENT_MD5,
RESOURCE_TRANSFER_SOURCE,
FILE_RESOURCE_SERVICE,
AzureStorageUtils.PROXY_CONFIGURATION_SERVICE,
@@ -182,6 +186,14 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 impl
final ParallelTransferOptions parallelTransferOptions = new ParallelTransferOptions().setBlockSizeLong(blockSize);
blobParallelUploadOptions.setParallelTransferOptions(parallelTransferOptions);
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+
+ final String contentMd5 = context.getProperty(CONTENT_MD5).evaluateAttributeExpressions(sourceFlowFile).getValue();
+ if (contentMd5 != null) {
+ final byte[] md5Bytes = convertMd5ToBytes(contentMd5);
+ final BlobHttpHeaders blobHttpHeaders = new BlobHttpHeaders().setContentMd5(md5Bytes);
+ blobParallelUploadOptions.setHeaders(blobHttpHeaders);
+ }
+
Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
BlockBlobItem blob = response.getValue();
applyUploadResultAttributes(attributes, blob, BlobType.BLOCK_BLOB, transferSize);
diff --git a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
index 26030d3075..be2d6f1fed 100644
--- a/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
+++ b/nifi-extension-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java
@@ -38,8 +38,10 @@ import reactor.netty.http.client.HttpClient;
import java.net.InetSocketAddress;
import java.net.Proxy;
+import java.util.Base64;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HexFormat;
import java.util.Map;
import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME;
@@ -192,6 +194,21 @@ public final class AzureStorageUtils {
.description("Specifies whether an existing blob will have its contents replaced upon conflict.")
.build();
+ public static final PropertyDescriptor CONTENT_MD5 = new PropertyDescriptor.Builder()
+ .name("Content MD5")
+ .displayName("Content MD5")
+ .description("""
+ The MD5 hash of the content. When this property is set, Azure will validate
+ the uploaded content against this checksum and reject the upload if it doesn't match. This provides
+ data integrity verification during transfer. The value can be provided in hexadecimal format (32 characters)
+ or Base64 format. The MD5 checksum must be computed before invoking this processor;
+ use the CryptographicHashContent processor with algorithm MD5 to store the result as a FlowFile attribute,
+ then reference it using Expression Language (e.g., ${content_MD5}).""")
+ .required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
public static final String SAS_TOKEN_BASE_DESCRIPTION = "Shared Access Signature token (the leading '?' may be included)";
public static final String SAS_TOKEN_SECURITY_DESCRIPTION =
@@ -354,6 +371,23 @@ public final class AzureStorageUtils {
}
}
+ /**
+ * Converts an MD5 checksum string to bytes. Accepts both hexadecimal format (as output by CryptographicHashContent)
+ * and Base64 format.
+ *
+ * @param md5String the MD5 checksum as hex (32 chars) or Base64 (24 chars with padding)
+ * @return the MD5 as a 16-byte array
+ */
+ public static byte[] convertMd5ToBytes(final String md5String) {
+ // MD5 in hex format is 32 characters (128 bits = 16 bytes, 2 hex chars per byte)
+ if (md5String.length() == 32 && md5String.matches("[0-9a-fA-F]+")) {
+ return HexFormat.of().parseHex(md5String);
+ } else {
+ // Assume Base64 format
+ return Base64.getDecoder().decode(md5String);
+ }
+ }
+
public static class DirectoryValidator implements Validator {
private String displayName;