This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-3.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit b57a75d0ee349017442cc3e9fe7f95ed9a591bf7 Author: Jussi Wallin <[email protected]> AuthorDate: Mon Dec 12 18:41:59 2022 +0200 [Camel-18808] add support for CamelAzureStorageBlobBlobUploadSize in uploadBlockBlob operation (#8881) * CAMEL-18808: add BLOB_SIZE support to uploadBlockBlob operation This commit adds support for the CamelAzureStorageBlobBlobSize-header in the producers uploadBlockBlob -operation. This is necessary to allow a user to upload blobs from streams that do not support mark and reset, e.g. a FileInputStream. * CAMEL-18808: separate the consumer and producer blob size headers Separate the two headers to mitigate a propagation of the header from a consumer to a subsequent producer. Also remove the header at the end of the uploadBlockBlob -operation to avoid affecting any following uploads. * CAMEL_18808: fix formatting * CAMEL-18808: move header removal to more intuitive spot --- .../azure/storage/blob/azure-storage-blob.json | 1 + .../azure/storage/blob/BlobConstants.java | 7 ++++- .../azure/storage/blob/BlobStreamAndLength.java | 7 +++-- .../storage/blob/client/BlobClientWrapper.java | 10 +++++-- .../storage/blob/integration/BlobOperationsIT.java | 33 ++++++++++++++++++++++ 5 files changed, 53 insertions(+), 5 deletions(-) diff --git a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json index 6d296819229..9ebf681a396 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json +++ b/components/camel-azure/camel-azure-storage-blob/src/generated/resources/org/apache/camel/component/azure/storage/blob/azure-storage-blob.json @@ -68,6 +68,7 @@ "CamelAzureStorageBlobContentLanguage": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Content language specified for the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#CONTENT_LANGUAGE" }, "CamelAzureStorageBlobCacheControl": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Cache control specified for the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#CACHE_CONTROL" }, "CamelAzureStorageBlobBlobSize": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The size of the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_SIZE" }, + "CamelAzureStorageBlobBlobUploadSize": { "kind": "header", "displayName": "", "group": "producer", "label": "producer", "required": false, "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "When uploading a blob with the uploadBlockBlob-operation this can be used to tell the client what the length of an InputStream is.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_UPLOAD_SIZE" }, "CamelAzureStorageBlobSequenceNumber": { "kind": "header", "displayName": "", "group": "common", "label": "", "required": false, "javaType": "Long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "(producer) (createPageBlob) A user-controlled value that you can use to track requests. The value of the sequence number must be between 0 and 263 - 1. The default value is 0. (consumer) The current sequence number for a page blob.", "constan [...] "CamelAzureStorageBlobBlobType": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "org.apache.camel.component.azure.storage.blob.BlobType", "enum": [ "blockblob", "appendblob", "pageblob" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "The type of the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#BLOB_TYPE" }, "CamelAzureStorageBlobLeaseStatus": { "kind": "header", "displayName": "", "group": "consumer", "label": "consumer", "required": false, "javaType": "com.azure.storage.blob.models.LeaseStatusType", "enum": [ "locked", "unlocked" ], "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "description": "Status of the lease on the blob.", "constantName": "org.apache.camel.component.azure.storage.blob.BlobConstants#LEASE_STATUS" }, diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java index ec068d8bd35..282b077232f 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobConstants.java @@ -57,8 +57,13 @@ public final class BlobConstants { public static final String CONTENT_LANGUAGE = HEADER_PREFIX + "ContentLanguage"; @Metadata(label = "consumer", description = "Cache control specified for the blob.", javaType = "String") public static final String CACHE_CONTROL = HEADER_PREFIX + "CacheControl"; - @Metadata(label = "consumer", description = "The size of the blob.", javaType = "long") + @Metadata(label = "consumer", description = "The size of the blob.", + javaType = "long") public static final String BLOB_SIZE = HEADER_PREFIX + "BlobSize"; + @Metadata(label = "producer", + description = "When uploading a blob with the uploadBlockBlob-operation this can be used to tell the client what the length of an InputStream is.", + javaType = "long") + public static final String BLOB_UPLOAD_SIZE = HEADER_PREFIX + "BlobUploadSize"; @Metadata(description = "(producer) (createPageBlob) A user-controlled value that you can use to track requests. " + "The value of the sequence number must be between 0 and 2^63 - 1. The default value is 0.\n" + "(consumer) The current sequence number for a page blob.", diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java index a38a6040799..34bd2e3d688 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/BlobStreamAndLength.java @@ -40,6 +40,8 @@ public final class BlobStreamAndLength { @SuppressWarnings("rawtypes") public static BlobStreamAndLength createBlobStreamAndLengthFromExchangeBody(final Exchange exchange) throws IOException { Object body = exchange.getIn().getBody(); + Long blobSize = exchange.getIn().getHeader(BlobConstants.BLOB_UPLOAD_SIZE, () -> null, Long.class); + exchange.getIn().removeHeader(BlobConstants.BLOB_UPLOAD_SIZE); // remove to avoid issues for further uploads if (body instanceof WrappedFile) { // unwrap file @@ -47,7 +49,8 @@ public final class BlobStreamAndLength { } if (body instanceof InputStream) { - return new BlobStreamAndLength((InputStream) body, BlobUtils.getInputStreamLength((InputStream) body)); + return new BlobStreamAndLength( + (InputStream) body, blobSize != null ? blobSize : BlobUtils.getInputStreamLength((InputStream) body)); } if (body instanceof File) { return new BlobStreamAndLength(new BufferedInputStream(new FileInputStream((File) body)), ((File) body).length()); @@ -65,7 +68,7 @@ public final class BlobStreamAndLength { throw new IllegalArgumentException("Unsupported blob type:" + body.getClass().getName()); } - return new BlobStreamAndLength(inputStream, BlobUtils.getInputStreamLength(inputStream)); + return new BlobStreamAndLength(inputStream, blobSize != null ? blobSize : BlobUtils.getInputStreamLength(inputStream)); } public InputStream getInputStream() { diff --git a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java index b99cff0bd14..f020b542fdf 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java +++ b/components/camel-azure/camel-azure-storage-blob/src/main/java/org/apache/camel/component/azure/storage/blob/client/BlobClientWrapper.java @@ -18,6 +18,7 @@ package org.apache.camel.component.azure.storage.blob.client; import java.io.InputStream; import java.io.OutputStream; +import java.nio.ByteBuffer; import java.time.Duration; import java.time.OffsetDateTime; import java.util.HashMap; @@ -52,6 +53,7 @@ import com.azure.storage.blob.models.PageBlobRequestConditions; import com.azure.storage.blob.models.PageRange; import com.azure.storage.blob.models.PageRangeItem; import com.azure.storage.blob.models.ParallelTransferOptions; +import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions; import com.azure.storage.blob.options.ListPageRangesOptions; import com.azure.storage.blob.sas.BlobSasPermission; import com.azure.storage.blob.sas.BlobServiceSasSignatureValues; @@ -60,7 +62,9 @@ import com.azure.storage.blob.specialized.BlobInputStream; import com.azure.storage.blob.specialized.BlockBlobClient; import com.azure.storage.blob.specialized.PageBlobClient; import com.azure.storage.common.StorageSharedKeyCredential; +import com.azure.storage.common.Utility; import org.apache.camel.util.ObjectHelper; +import reactor.core.publisher.Flux; public class BlobClientWrapper { private static final String SERVICE_URI_SEGMENT = ".blob.core.windows.net"; @@ -120,8 +124,10 @@ public class BlobClientWrapper { final Map<String, String> metadata, AccessTier tier, final byte[] contentMd5, final BlobRequestConditions requestConditions, final Duration timeout) { - return getBlockBlobClient().uploadWithResponse(data, length, headers, metadata, tier, contentMd5, requestConditions, - timeout, Context.NONE); + Flux<ByteBuffer> dataBuffer = Utility.convertStreamToByteBuffer(data, length, 4194304, false); + BlockBlobSimpleUploadOptions uploadOptions = new BlockBlobSimpleUploadOptions(dataBuffer, length).setHeaders(headers) + .setMetadata(metadata).setTier(tier).setContentMd5(contentMd5).setRequestConditions(requestConditions); + return getBlockBlobClient().uploadWithResponse(uploadOptions, timeout, Context.NONE); } public HttpHeaders stageBlockBlob( diff --git a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java index 88d5d9a99f7..6fe837e8f86 100644 --- a/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java +++ b/components/camel-azure/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/integration/BlobOperationsIT.java @@ -19,6 +19,7 @@ package org.apache.camel.component.azure.storage.blob.integration; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -59,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; class BlobOperationsIT extends Base { @@ -229,6 +231,37 @@ class BlobOperationsIT extends Base { blobClientWrapper.delete(null, null, null); } + @Test + void testUploadBlockBlobAsStreamWithBlobSizeHeader() throws Exception { + final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper("upload_test_file"); + final BlobOperations operations = new BlobOperations(configuration, blobClientWrapper); + + final File fileToUpload + = new File(Objects.requireNonNull(getClass().getClassLoader().getResource("upload_test_file")).getFile()); + final Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody(new FileInputStream(fileToUpload)); + exchange.getIn().setHeader(BlobConstants.BLOB_UPLOAD_SIZE, fileToUpload.length()); + + final BlobOperationResponse response = operations.uploadBlockBlob(exchange); + + assertNotNull(response); + assertTrue((boolean) response.getBody()); + // check for eTag and md5 to make sure is uploaded + assertNotNull(response.getHeaders().get(BlobConstants.E_TAG)); + assertNotNull(response.getHeaders().get(BlobConstants.CONTENT_MD5)); + + // check that the size header got removed + assertNull(exchange.getIn().getHeader(BlobConstants.BLOB_UPLOAD_SIZE)); + + // check content + final BlobOperationResponse getBlobResponse = operations.getBlob(null); + + assertEquals("awesome camel to upload!", + IOUtils.toString((InputStream) getBlobResponse.getBody(), Charset.defaultCharset())); + + blobClientWrapper.delete(null, null, null); + } + @Test void testCommitAndStageBlockBlob() throws Exception { final BlobClientWrapper blobClientWrapper = blobContainerClientWrapper.getBlobClientWrapper("upload_test_file");
