nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r1002157224
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new
BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyCommonAttributes(attributes, blobClient);
+
+ try {
+ if (conflictResolution !=
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
+
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response =
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ long length = flowFile.getSize();
+ applyUploadResultAttributes(attributes, blob,
BlobType.BLOCK_BLOB, length);
- long length = flowFile.getSize();
+ }
+ } catch (BlobStorageException e) {
+ final BlobErrorCode errorCode = e.getErrorCode();
+ session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE,
e.getErrorCode().toString());
Review Comment:
`session.putAttribute` returns an updated flowfile. Assign it back and use
that reference from here on instead of the original one.
```suggestion
flowFile = session.putAttribute(flowFile,
ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
```
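For context, a minimal sketch of why the reassignment matters. `FakeFlowFile` and the static `putAttribute` helper are hypothetical stand-ins written for this illustration, not the NiFi classes. They only mimic the relevant behavior: the call returns a new object and leaves the one passed in unchanged.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT the NiFi classes.
public class FlowFileReassignSketch {

    /** Immutable attribute snapshot, mimicking how a flowfile reference behaves. */
    static final class FakeFlowFile {
        private final Map<String, String> attributes;

        FakeFlowFile(Map<String, String> attributes) {
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }

        String getAttribute(String name) {
            return attributes.get(name);
        }
    }

    /** Returns a new object carrying the extra attribute; the argument is left untouched. */
    static FakeFlowFile putAttribute(FakeFlowFile flowFile, String name, String value) {
        Map<String, String> copy = new HashMap<>(flowFile.attributes);
        copy.put(name, value);
        return new FakeFlowFile(copy);
    }

    public static void main(String[] args) {
        FakeFlowFile flowFile = new FakeFlowFile(new HashMap<>());

        // Return value discarded: the local reference never sees the attribute.
        putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        System.out.println(flowFile.getAttribute("azure.error.code")); // prints "null"

        // Return value reassigned: later code works with the updated version.
        flowFile = putAttribute(flowFile, "azure.error.code", "BlobAlreadyExists");
        System.out.println(flowFile.getAttribute("azure.error.code")); // prints "BlobAlreadyExists"
    }
}
```

In the processor the same pattern applies to every `session.putAttribute` call in the `catch` block, so the flowfile that is eventually transferred is the latest version.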
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobAttributes.java:
##########
@@ -45,4 +45,8 @@ public final class BlobAttributes {
public static final String ATTR_NAME_LENGTH = "azure.length";
public static final String ATTR_DESCRIPTION_LENGTH = "Length of the blob";
+ public static final String ATTR_NAME_ERROR_CODE = "azure.error.code";
+ public static final String ATTR_DESCRIPTION_ERROR_CODE = "Error code
reported during blob operation";
+
+ public static final String ATTR_NAME_IGNORED = "ignored";
Review Comment:
I'd prefix this attribute with `azure.` to prevent attribute name collisions
with other processors.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +147,47 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
+
BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new
BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ applyCommonAttributes(attributes, blobClient);
+
+ try {
+ if (conflictResolution !=
AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
+
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions
= new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+
blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response =
blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ long length = flowFile.getSize();
+ applyUploadResultAttributes(attributes, blob,
BlobType.BLOCK_BLOB, length);
- long length = flowFile.getSize();
+ }
+ } catch (BlobStorageException e) {
+ final BlobErrorCode errorCode = e.getErrorCode();
+ session.putAttribute(flowFile, ATTR_NAME_ERROR_CODE,
e.getErrorCode().toString());
- try (InputStream rawIn = session.read(flowFile);
- BufferedInputStream bufferedIn = new
BufferedInputStream(rawIn)) {
- blobClient.upload(bufferedIn, length);
+ final boolean alreadyExists = errorCode ==
BlobErrorCode.BLOB_ALREADY_EXISTS;
+ final boolean ignore = conflictResolution ==
AzureStorageConflictResolutionStrategy.IGNORE_RESOLUTION;
+
+ // If the blob already exists, we always add an attribute
"ignored"; depending on the value of
+ // conflict resolution, this will be true or false.
+ if (alreadyExists) {
+ session.putAttribute(flowFile, ATTR_NAME_IGNORED,
String.valueOf(ignore));
Review Comment:
The code comment is inaccurate. When the resolution is `replace`, this
attribute is never applied, even though the blob may already exist. I'd apply
the attribute only for the `ignore` resolution and set its value depending on
whether the blob was already present.
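A rough sketch of the logic I have in mind, kept free of NiFi and Azure types so it stands alone. The `ConflictResolution` enum is a hypothetical stand-in for `AzureStorageConflictResolutionStrategy`, and the `azure.ignored` attribute name is only an assumption for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: the enum and the attribute name below are assumptions.
public class IgnoredAttributeSketch {

    /** Hypothetical stand-in for AzureStorageConflictResolutionStrategy. */
    enum ConflictResolution { FAIL, IGNORE, REPLACE }

    /** Assumed attribute name; the real one is up to the PR author. */
    static final String ATTR_NAME_IGNORED = "azure.ignored";

    /**
     * Builds the attributes to add for the conflict outcome. The "ignored"
     * attribute is set only for the IGNORE strategy, and its value tells
     * whether the blob was already present (true) or was uploaded (false).
     */
    static Map<String, String> ignoredAttribute(ConflictResolution resolution, boolean blobAlreadyExists) {
        Map<String, String> attributes = new HashMap<>();
        if (resolution == ConflictResolution.IGNORE) {
            attributes.put(ATTR_NAME_IGNORED, String.valueOf(blobAlreadyExists));
        }
        return attributes;
    }

    public static void main(String[] args) {
        System.out.println(ignoredAttribute(ConflictResolution.IGNORE, true));   // prints "{azure.ignored=true}"
        System.out.println(ignoredAttribute(ConflictResolution.IGNORE, false));  // prints "{azure.ignored=false}"
        System.out.println(ignoredAttribute(ConflictResolution.REPLACE, true));  // prints "{}"
    }
}
```

With this shape the attribute has a single, documentable meaning: it appears only when the strategy is `ignore`, and `true` means the upload was skipped because the blob existed.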