nandorsoma commented on code in PR #6443:
URL: https://github.com/apache/nifi/pull/6443#discussion_r984483359

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.components.AllowableValue;
+
+public enum AzureStorageConflictStrategy {

Review Comment:
   I know it's a bit long, but I'd probably rename this class to `AzureStorageConflictResolutionStrategy`. Later, in a separate PR, we could presumably remove the verbose `AzureStorage` prefix, since the package already conveys that.

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);
+                }
+            } catch (BlobStorageException e) {
+                if (conflictStrategy == AzureStorageConflictStrategy.FAIL) throw e;
+                final BlobErrorCode errorCode = e.getErrorCode();
+                if (errorCode == BlobErrorCode.BLOB_ALREADY_EXISTS || errorCode == BlobErrorCode.CONDITION_NOT_MET) {
+                    relationship = REL_SKIPPED;

Review Comment:
   This feature is also implemented in the ADLS processors, where the FlowFile is routed to `REL_SUCCESS` when the file is skipped. I think it is better to keep the logic consistent between these processors.

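A rough sketch of the routing described in the comment above, kept independent of NiFi and the Azure SDK so it can run on its own. The `Strategy` and `Route` enums and the `route` method are hypothetical stand-ins, not the actual identifiers used in the ADLS processors or in this PR; the only point is that a skipped upload ends up on the success route rather than on a separate one.

```java
public class ConflictRoutingSketch {

    // Placeholder strategy names (assumption): not the real NiFi or ADLS identifiers.
    public enum Strategy { FAIL, IGNORE, REPLACE }

    // Stand-ins for the processor's relationships (assumption).
    public enum Route { SUCCESS, FAILURE }

    /**
     * Decides where the FlowFile goes after the upload attempt.
     *
     * @param strategy           the configured conflict resolution strategy
     * @param blobAlreadyExisted true when the service rejected the upload
     *                           because the target blob already exists
     */
    public static Route route(Strategy strategy, boolean blobAlreadyExisted) {
        if (!blobAlreadyExisted) {
            // No conflict was reported, so the upload went through.
            return Route.SUCCESS;
        }
        switch (strategy) {
            case IGNORE:
                // Skipped: the existing blob is left untouched, and the
                // FlowFile still goes to success (a log message would note the skip).
                return Route.SUCCESS;
            case FAIL:
            default:
                // FAIL, or a conflict reported for a strategy that sets no
                // precondition, is treated as a failure in this sketch.
                return Route.FAILURE;
        }
    }

    public static void main(String[] args) {
        System.out.println(route(Strategy.IGNORE, true));
        System.out.println(route(Strategy.FAIL, true));
    }
}
```

With this shape, no additional relationship is needed for skipped files; whether the skip should also be surfaced through a FlowFile attribute is a separate question.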
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),

Review Comment:
   Is there a use case where `entryDate` is useful? Almost always, the FlowFile's entry date will be newer than the blob's last-modified date on Azure. I'd imagine the `OVERWRITE_IF_SOURCE_NEWER` option being useful when synchronizing files fetched by `GetFile` with Azure. In that situation, the modified date of the original file could be used instead. All in all, I would rely on a FlowFile attribute instead.

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-services-api/src/main/java/org/apache/nifi/services/azure/storage/AzureStorageConflictStrategy.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.services.azure.storage;
+
+import org.apache.nifi.components.AllowableValue;
+
+public enum AzureStorageConflictStrategy {
+    FAIL("Fail", "Fail if the target blob already exists"),

Review Comment:
   The same feature is implemented in the ADLS processors. If possible, please use the same strategy names that already exist there.

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -121,19 +168,42 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
             if (createContainer && !containerClient.exists()) {
                 containerClient.create();
             }
+            BlobClient blobClient = containerClient.getBlobClient(blobName);
+            Relationship relationship = REL_SUCCESS;
+
+            final BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
-            long length = flowFile.getSize();
+            try {
+                if (conflictStrategy == AzureStorageConflictStrategy.OVERWRITE_IF_SOURCE_NEWER) {
+                    OffsetDateTime ifUnmodifiedSince = OffsetDateTime.ofInstant(
+                            Instant.ofEpochMilli(flowFile.getEntryDate()),
+                            ZoneId.systemDefault()
+                    );
+                    blobRequestConditions.setIfUnmodifiedSince(ifUnmodifiedSince);
+                } else if (conflictStrategy != AzureStorageConflictStrategy.OVERWRITE) {
+                    blobRequestConditions.setIfNoneMatch("*");
+                }
-            try (InputStream rawIn = session.read(flowFile);
-                 BufferedInputStream bufferedIn = new BufferedInputStream(rawIn)) {
-                blobClient.upload(bufferedIn, length);
+                try (InputStream rawIn = session.read(flowFile)) {
+                    final BlobParallelUploadOptions blobParallelUploadOptions = new BlobParallelUploadOptions(toFluxByteBuffer(rawIn));
+                    blobParallelUploadOptions.setRequestConditions(blobRequestConditions);
+                    blobClient.uploadWithResponse(blobParallelUploadOptions, null, Context.NONE);

Review Comment:
   If `OVERWRITE_IF_SOURCE_NEWER` is not needed, I'd consider using `upload()` with the `boolean overwrite` parameter. (See my summary comment.)

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java:
##########
@@ -91,14 +103,48 @@ public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 {
                     "will fail if the container does not exist.")
             .build();
+    public static final PropertyDescriptor CONFLICT_STRATEGY = new PropertyDescriptor.Builder()
+            .name("conflict-strategy")

Review Comment:
   I'd rename this property to `conflict-resolution-strategy`.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
