nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r994887992
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
- BlobClient blobClient = containerClient.getBlobClient(blobName);
+ BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
Review Comment:
Previously, these attributes were applied only after a successful upload. Do I understand correctly that the intention now is to also apply the attributes whose values are already known before the upload to a FlowFile whose upload failed? Is this required for a specific use case? I'm asking because I worry that it could break an existing flow. For example, what happens if a downstream processor relies on the presence of one of these attributes? What do you think, @exceptionfactory?
Even if the above turns out not to be an issue, I'd change this part slightly:
- I wouldn't assume the `MIME_TYPE`. I'd set it only after a successful upload, and I'd investigate why `ATTR_NAME_LANG` had to be present for the failing test you linked in one of your comments.
- The code is currently a bit verbose: the attribute handling distracts from the primary function, the upload itself. I'd therefore extract this part into something like `applyCommonAttributes()` and lines 170-173 into something like `applyUploadResultAttributes()`. This is just an idea; if you have a better name, go for it.
Sorry if I missed this in the first review. Could this change have come in with the force push?
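A minimal sketch of the suggested split, written as plain Java so it compiles without NiFi or the Azure SDK. The helper names come from the comment above; the attribute key strings are assumed stand-ins for the `BlobAttributes` constants, and the MIME type is passed in after the upload because where its value should come from is still open.

```java
import java.util.HashMap;
import java.util.Map;

final class BlobAttributeHelpers {
    // Assumed key strings standing in for the BlobAttributes.ATTR_NAME_* constants.
    static final String ATTR_NAME_CONTAINER = "azure.container";
    static final String ATTR_NAME_BLOBNAME = "azure.blobname";
    static final String ATTR_NAME_PRIMARY_URI = "azure.primaryUri";
    static final String ATTR_NAME_BLOBTYPE = "azure.blobtype";
    static final String ATTR_NAME_ETAG = "azure.etag";
    static final String ATTR_NAME_LENGTH = "azure.length";
    static final String ATTR_NAME_TIMESTAMP = "azure.timestamp";
    static final String ATTR_NAME_MIME_TYPE = "mime.type";

    private BlobAttributeHelpers() {
    }

    // Hypothetical helper: attributes whose values are known before the upload starts.
    static Map<String, String> applyCommonAttributes(String containerName, String blobName, String primaryUri) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(ATTR_NAME_CONTAINER, containerName);
        attributes.put(ATTR_NAME_BLOBNAME, blobName);
        attributes.put(ATTR_NAME_PRIMARY_URI, primaryUri);
        return attributes;
    }

    // Hypothetical helper: attributes added only once the upload has succeeded.
    // The MIME type is set here instead of being assumed before the upload.
    static void applyUploadResultAttributes(Map<String, String> attributes, String blobType, String eTag,
                                            long length, String lastModified, String mimeType) {
        attributes.put(ATTR_NAME_BLOBTYPE, blobType);
        attributes.put(ATTR_NAME_ETAG, eTag);
        attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
        attributes.put(ATTR_NAME_TIMESTAMP, lastModified);
        attributes.put(ATTR_NAME_MIME_TYPE, mimeType);
    }
}
```

In `onTrigger`, the first helper would replace the `attributes.put(...)` block after `getBlobClient(blobName)`, and the second would be called only in the branch where `uploadWithResponse` returned normally.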
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
- BlobClient blobClient = containerClient.getBlobClient(blobName);
+ BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_NAME_CONTAINER, containerName);
+ attributes.put(ATTR_NAME_BLOBNAME, blobName);
+ attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+ attributes.put(ATTR_NAME_LANG, null);
+ attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
long length = flowFile.getSize();
- try (InputStream rawIn = session.read(flowFile);
- BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
- blobClient.upload(bufferedIn, length);
+ try {
+ if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
+
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+ blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+ attributes.put(ATTR_NAME_ETAG, blob.getETag());
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+ attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+ }
+ } catch (BlobStorageException e) {
+ if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+ throw e;
+ }
+ final BlobErrorCode errorCode = e.getErrorCode();
+ if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+ getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
+ attributes.putAll(createBlobAttributesMap(blobClient));
+ attributes.put(ATTR_NAME_ERROR_CODE, e.getErrorCode().toString());
Review Comment:
I wonder whether it is misleading to set `ATTR_NAME_ERROR_CODE` in an expected situation. The fact that we rely on an error code internally is irrelevant from the user's point of view. I'd probably use an "ignored" attribute instead.
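One way to express the "ignored" idea, sketched without the Azure SDK: the user-visible outcome is a boolean attribute rather than the SDK's `BLOB_ALREADY_EXISTS` error code. The attribute name `azure.ignored` and the `Outcome` enum are hypothetical and not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class IgnoredAttributeSketch {
    // Hypothetical attribute key; not defined in this PR.
    static final String ATTR_NAME_IGNORED = "azure.ignored";

    // Hypothetical outcomes of a single put attempt that both route to success.
    enum Outcome { UPLOADED, ALREADY_EXISTS_IGNORED }

    private IgnoredAttributeSketch() {
    }

    // Downstream processors see "true" or "false" and never have to
    // interpret an internal error code.
    static Map<String, String> outcomeAttributes(Outcome outcome) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(ATTR_NAME_IGNORED, String.valueOf(outcome == Outcome.ALREADY_EXISTS_IGNORED));
        return attributes;
    }
}
```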
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
- BlobClient blobClient = containerClient.getBlobClient(blobName);
+ BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_NAME_CONTAINER, containerName);
+ attributes.put(ATTR_NAME_BLOBNAME, blobName);
+ attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+ attributes.put(ATTR_NAME_LANG, null);
+ attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
long length = flowFile.getSize();
- try (InputStream rawIn = session.read(flowFile);
- BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
- blobClient.upload(bufferedIn, length);
+ try {
+ if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
+
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+ blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+ attributes.put(ATTR_NAME_ETAG, blob.getETag());
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+ attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+ }
+ } catch (BlobStorageException e) {
+ if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+ throw e;
+ }
+ final BlobErrorCode errorCode = e.getErrorCode();
+ if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
+ getLogger().warn("Blob already exists: remote blob not modified. Transferring {} to success", flowFile);
Review Comment:
Since this is an expected scenario, I wouldn't log it at warning level; I'd use debug instead.
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java:
##########
@@ -137,10 +139,15 @@ protected Map<String, String> initCommonExpressionLanguageAttributes() {
return attributes;
}
- protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) {
+ protected void assertFlowFileBlobAttributes(MockFlowFile flowFile, String containerName, String blobName, int blobLength) throws UnsupportedEncodingException {
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_CONTAINER, containerName);
flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_BLOBNAME, blobName);
- flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI, String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, blobName));
+ flowFile.assertAttributeEquals(BlobAttributes.ATTR_NAME_PRIMARY_URI,
+ String.format("https://%s.blob.core.windows.net/%s/%s", getAccountName(), containerName, URLEncoder.encode(
+ blobName,
+ StandardCharsets.US_ASCII.name()
+ ).replace("+", "%20"))
Review Comment:
After using URLEncoder, this replacement seems to be unnecessary.
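To double-check this point, the snippet below compares what `java.net.URLEncoder` returns with and without the `replace("+", "%20")` step for the same blob name. `URLEncoder` performs HTML form encoding (`application/x-www-form-urlencoded`), which is the behavior being compared; the class and method names are only for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class PrimaryUriEncoding {
    private PrimaryUriEncoding() {
    }

    // Form encoding as done in the test: a space becomes "+", a literal "+" becomes "%2B".
    static String formEncoded(String blobName) {
        try {
            return URLEncoder.encode(blobName, StandardCharsets.US_ASCII.name());
        } catch (UnsupportedEncodingException e) {
            // US-ASCII is a standard charset, so this should not happen.
            throw new IllegalStateException(e);
        }
    }

    // The same encoding followed by the replacement from the test.
    static String withPercentSpaces(String blobName) {
        return formEncoded(blobName).replace("+", "%20");
    }
}
```

Because a literal `+` in the name is already encoded as `%2B` before the replacement runs, the replacement only affects spaces.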
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,18 +146,47 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (createContainer && !containerClient.exists()) {
containerClient.create();
}
- BlobClient blobClient = containerClient.getBlobClient(blobName);
+ BlobClient blobClient = containerClient.getBlobClient(blobName);
+ final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(ATTR_NAME_CONTAINER, containerName);
+ attributes.put(ATTR_NAME_BLOBNAME, blobName);
+ attributes.put(ATTR_NAME_PRIMARY_URI, blobClient.getBlobUrl());
+ attributes.put(ATTR_NAME_LANG, null);
+ attributes.put(ATTR_NAME_MIME_TYPE, "application/octet-stream");
long length = flowFile.getSize();
- try (InputStream rawIn = session.read(flowFile);
- BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
- blobClient.upload(bufferedIn, length);
+ try {
+ if (conflictResolution != AzureStorageConflictResolutionStrategy.REPLACE_RESOLUTION) {
+ blobRequestConditions.setIfNoneMatch("*");
+ }
+
+ try (InputStream rawIn = session.read(flowFile)) {
+ final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+ blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+ Response<BlockBlobItem> response = blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+ BlockBlobItem blob = response.getValue();
+ attributes.put(ATTR_NAME_BLOBTYPE, BlobType.BLOCK_BLOB.toString());
+ attributes.put(ATTR_NAME_ETAG, blob.getETag());
+ attributes.put(ATTR_NAME_LENGTH, String.valueOf(length));
+ attributes.put(ATTR_NAME_TIMESTAMP, String.valueOf(blob.getLastModified()));
+ }
+ } catch (BlobStorageException e) {
+ if (conflictResolution == AzureStorageConflictResolutionStrategy.FAIL_RESOLUTION) {
+ throw e;
+ }
+ final BlobErrorCode errorCode = e.getErrorCode();
+ if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS) {
Review Comment:
I would add `&& conflictResolution == IGNORE_RESOLUTION` to this condition to make the code more intuitive. It doesn't change the result, but it makes it easier to understand what is going on.
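The suggested condition, isolated into a small sketch. The enums are simplified stand-ins for `AzureStorageConflictResolutionStrategy` and the SDK's `BlobErrorCode`, and `isIgnorableConflict` is a hypothetical helper name, not code from the PR.

```java
final class ConflictHandlingSketch {
    // Stand-in for AzureStorageConflictResolutionStrategy.
    enum Strategy { FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION }

    // Stand-in for the BlobErrorCode values relevant here.
    enum ErrorCode { BLOB_ALREADY_EXISTS, OTHER }

    private ConflictHandlingSketch() {
    }

    // Returns true when the exception should be swallowed and the FlowFile routed to success.
    // In the PR's flow, FAIL_RESOLUTION is rethrown earlier and REPLACE_RESOLUTION does not set
    // If-None-Match, so the explicit strategy check documents intent without changing the outcome.
    static boolean isIgnorableConflict(Strategy conflictResolution, ErrorCode errorCode) {
        return errorCode == ErrorCode.BLOB_ALREADY_EXISTS
                && conflictResolution == Strategy.IGNORE_RESOLUTION;
    }
}
```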
--
This is an automated message from the Apache Git Service.