nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1007251404
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -16,9 +16,17 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.core.http.rest.Response;
Review Comment:
Won't be needed when other suggestions are accepted.
```suggestion
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new
BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyCommonAttributes(attributes, blobClient);
+ final boolean ignore = conflictResolution ==
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+ try {
+ if (conflictResolution !=
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
- long length = flowFile.getSize();
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response =
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ long length = flowFile.getSize();
+ applyUploadResultAttributes(attributes, blob,
BlobType.BLOCK_BLOB, length);
Review Comment:
```suggestion
blobClient.uploadWithResponse(blobParallelUploadOptions,
null, Context.NONE);
applyBlobMetadata(attributes, blobClient);
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -144,4 +194,19 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
session.transfer(flowFile, REL_FAILURE);
}
}
+
+ private static void applyUploadResultAttributes(final Map<String, String>
attributes, final BlockBlobItem blob, final BlobType blobType, final long
length) {
+ attributes.put(ATTR_NAME_BLOBTYPE, blobType.toString());
+ attributes.put(ATTR_NAME_ETAG, blob.getETag());
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+ attributes.put(ATTR_NAME_TIMESTAMP,
String.valueOf(blob.getLastModified()));
+ attributes.put(ATTR_NAME_LANG, null);
+ attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
+ }
+
+ private static void applyCommonAttributes(final Map<String, String>
attributes, final BlobClient blobClient) {
+ attributes.put(ATTR_NAME_CONTAINER, blobClient.getContainerName());
+ attributes.put(ATTR_NAME_BLOBNAME, blobClient.getBlobName());
+ attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+ }
Review Comment:
See my summary.
```suggestion
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String>
initCommonExpressionLanguageAttributes() {
return attributes;
}
- protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String
containerName, String blobName, int blobLength) {
+ protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile,
String containerName, String blobName) throws UnsupportedEncodingException {
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER,
containerName);
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME,
blobName);
- flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(),
containerName, blobName));
+ flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+ String.format("https://%s.blob.core.windows.net/%s/%s",
getAccountName(), containerName, URLEncoder.encode(
+ blobName,
+ StandardCharsets.US_ASCII.name()
+ ).replace("+", "%20"))
+ );
+ }
+
+ protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile,
int blobLength) throws UnsupportedEncodingException {
Review Comment:
```suggestion
protected void assertFlowFileResultBlobAttributes(MockFlowFile flowFile,
int blobLength) {
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -136,7 +139,30 @@ public void testPutBlobToExistingBlob() throws Exception {
runProcessor(BLOB_DATA);
- assertFailure(BLOB_DATA);
+ MockFlowFile flowFile = assertFailure(BLOB_DATA, "BlobAlreadyExists");
+ assertEquals(flowFile.getAttribute(ATTR_NAME_IGNORED), null);
+ }
+
+ @Test
+ public void testPutBlobToExistingBlobConflictStrategyIgnore() throws
Exception {
+ uploadBlob(BLOB_NAME, BLOB_DATA);
+ runner.setProperty(PutAzureBlobStorage_v12.CONFLICT_RESOLUTION,
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION.getValue());
+
+ runProcessor(BLOB_DATA);
+
+ MockFlowFile flowFile = assertIgnored(getContainerName(), BLOB_NAME);
+ assertEquals(flowFile.getAttribute(ATTR_NAME_ERROR_CODE),
BlobErrorCode.BLOB_ALREADY_EXISTS.toString());
Review Comment:
```suggestion
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -75,7 +88,8 @@
@WritesAttribute(attribute = ATTR_NAME_MIME_TYPE, description =
ATTR_DESCRIPTION_MIME_TYPE),
@WritesAttribute(attribute = ATTR_NAME_LANG, description =
ATTR_DESCRIPTION_LANG),
@WritesAttribute(attribute = ATTR_NAME_TIMESTAMP, description =
ATTR_DESCRIPTION_TIMESTAMP),
- @WritesAttribute(attribute = ATTR_NAME_LENGTH, description =
ATTR_DESCRIPTION_LENGTH)})
+ @WritesAttribute(attribute = ATTR_NAME_LENGTH, description =
ATTR_DESCRIPTION_LENGTH),
+ @WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description =
ATTR_DESCRIPTION_ERROR_CODE)})
Review Comment:
```suggestion
@WritesAttribute(attribute = ATTR_NAME_ERROR_CODE, description =
ATTR_DESCRIPTION_ERROR_CODE),
@WritesAttribute(attribute = ATTR_NAME_IGNORED, description =
ATTR_DESCRIPTION_IGNORED)})
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new
BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyCommonAttributes(attributes, blobClient);
Review Comment:
```suggestion
final Map<String, String> attributes = new HashMap<>();
applyStandardBlobAttributes(attributes, blobClient);
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -35,18 +43,21 @@
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import
org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy;
-import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static com.azure.core.util.FluxUtil.toFluxByteBuffer;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER;
+import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ERROR_CODE;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;
Review Comment:
```suggestion
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_ETAG;
import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_IGNORED;
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new
BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyCommonAttributes(attributes, blobClient);
+ final boolean ignore = conflictResolution ==
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+ try {
+ if (conflictResolution !=
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
- long length = flowFile.getSize();
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response =
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ long length = flowFile.getSize();
+ applyUploadResultAttributes(attributes, blob,
BlobType.BLOCK_BLOB, length);
+ if (ignore) {
+ attributes.put(ATTR_NAME_IGNORED, "false");
+ }
+ }
+ } catch (BlobStorageException e) {
+ final BlobErrorCode errorCode = e.getErrorCode();
+ flowFile = session.putAttribute(flowFile,
ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
Review Comment:
We don't have to apply error code in case of ignore resolution.
```suggestion
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
public static final String ATTR_NAME_LENGTH = "azure.length";
public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
+ public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+ public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code
reported during blob operation";
+
+ public static final String ATTR_NAME_IGNORED = "azure.ignored";
Review Comment:
Description is missing.
```suggestion
public static final String ATTR_NAME_IGNORED = "azure.ignored";
public static final String ATTR_DESCRIPTION_IGNORED = "When Conflict
Resolution Strategy is 'ignore', " +
"this property will be true/false depending on whether the blob
was ignored.";
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -16,9 +16,17 @@
*/
package org.apache.nifi.processors.azure.storage;
+import com.azure.core.http.rest.Response;
+import com.azure.core.util.Context;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.models.BlobErrorCode;
+import com.azure.storage.blob.models.BlobRequestConditions;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.models.BlobType;
+import com.azure.storage.blob.models.BlockBlobItem;
Review Comment:
Won't be needed when other suggestions are accepted.
```suggestion
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
Review Comment:
```suggestion
final BlobClient blobClient =
containerClient.getBlobClient(blobName);
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,42 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new
BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyCommonAttributes(attributes, blobClient);
+ final boolean ignore = conflictResolution ==
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+ try {
+ if (conflictResolution !=
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
- long length = flowFile.getSize();
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response =
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ long length = flowFile.getSize();
+ applyUploadResultAttributes(attributes, blob,
BlobType.BLOCK_BLOB, length);
+ if (ignore) {
+ attributes.put(ATTR_NAME_IGNORED, "false");
+ }
+ }
+ } catch (BlobStorageException e) {
+ final BlobErrorCode errorCode = e.getErrorCode();
+ flowFile = session.putAttribute(flowFile,
ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
- try (InputStream rawIn = session.read(flowFile);
- BufferedInputStream bufferedIn = new
BufferedInputStream(rawIn)) {
- blobClient.upload(bufferedIn, length);
+ if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS && ignore) {
+ getLogger().info("Blob already exists: remote blob not
modified. Transferring {} to success", flowFile);
+ attributes.put(ATTR_NAME_IGNORED, "true");
+ } else {
+ throw e;
Review Comment:
We only need to apply an error code when we cannot recover.
```suggestion
throw e;
flowFile = session.putAttribute(flowFile,
ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java:
##########
@@ -164,7 +164,7 @@ protected Map<String, String>
createBlobAttributesMap(BlobClient blobClient) {
Map<String, String> attributes = new HashMap<>();
BlobProperties properties = blobClient.getProperties();
- String primaryUri = String.format("%s/%s",
blobClient.getContainerClient().getBlobContainerUrl(),
blobClient.getBlobName());
+ String primaryUri = blobClient.getBlobUrl();
Review Comment:
Because of this change, some tests in `ITFetchAzureBlobStorage_v12` are
failing. I don't create a suggestion here because my summary comment would
overwrite it.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -34,6 +34,8 @@
import org.junit.jupiter.api.BeforeEach;
import java.io.ByteArrayInputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
Review Comment:
Won't be needed when other suggestions are accepted.
```suggestion
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String>
initCommonExpressionLanguageAttributes() {
return attributes;
}
- protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String
containerName, String blobName, int blobLength) {
+ protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile,
String containerName, String blobName) throws UnsupportedEncodingException {
Review Comment:
```suggestion
protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile,
String containerName, String blobName) {
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,18 @@ protected Map<String, String>
initCommonExpressionLanguageAttributes() {
return attributes;
}
- protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String
containerName, String blobName, int blobLength) {
+ protected void assertFlowFileCommonBlobAttributes(MockFlowFile flowFile,
String containerName, String blobName) throws UnsupportedEncodingException {
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER,
containerName);
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME,
blobName);
- flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(),
containerName, blobName));
+ flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+ String.format("https://%s.blob.core.windows.net/%s/%s",
getAccountName(), containerName, URLEncoder.encode(
+ blobName,
+ StandardCharsets.US_ASCII.name()
+ ).replace("+", "%20"))
+ );
Review Comment:
```suggestion
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(),
containerName, blobName));
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -18,10 +18,12 @@
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.models.BlobErrorCode;
Review Comment:
Won't be needed when other suggestions are accepted.
```suggestion
```
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage_v12.java:
##########
@@ -32,11 +34,12 @@
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_ERROR_CODE;
Review Comment:
Won't be needed when other suggestions are accepted.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]