Lehel44 commented on code in PR #8540:
URL: https://github.com/apache/nifi/pull/8540#discussion_r1535030163
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum WritingStrategy implements DescribedValue {
+
+    WRITE_AND_RENAME("Write and Rename", "The processor writes the Azure file into a temporary directory and then renames/moves it to the final destination."
+            + " This prevents other processes from reading partially written files."),
+    SIMPLE_WRITE("Simple Write", "The processor writes the Azure file directly to the destination. This might cause reading partially written files.");

Review Comment:
   ```suggestion
       SIMPLE_WRITE("Simple Write", "The processor writes the Azure file directly to the destination. This might result in the reading of partially written files.");
   ```

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java:
##########
@@ -99,6 +100,15 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess
             .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION)
             .build();

+    protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("writing-strategy")
+            .displayName("Writing Strategy")

Review Comment:
   Should the new convention be followed here?
   ```suggestion
               .name("Writing Strategy")
   ```

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java:
##########
@@ -53,6 +54,7 @@ protected String getDefaultEndpointSuffix() {
     protected void setUpCredentials() throws Exception {
         ADLSCredentialsService service = new ADLSCredentialsControllerService();
         runner.addControllerService("ADLSCredentials", service);
+        runner.setProperty(service, AzureStorageUtils.CREDENTIALS_TYPE, AzureStorageCredentialsType.ACCOUNT_KEY);

Review Comment:
   What is the purpose of this change?

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java:
##########
@@ -229,38 +259,65 @@ static void uploadContent(DataLakeFileClient fileClient, InputStream in, long le
     }

     /**
-     * This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it.
-     * Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in
-     * case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important.
+     * Creates the file on Azure for 'Simple Write' strategy. Upon upload, a 0-byte file is created, then the payload is appended to it.
+     * Because of that, a work-in-progress file is available for readers before the upload is complete.
+     *
+     * @param directoryClient directory client of the uploaded file's parent directory
+     * @param fileName name of the uploaded file
+     * @param conflictResolution conflict resolution strategy
+     * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
+     * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
+     */
+    DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
+        final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);
+
+        try {
+            final boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
+            return directoryClient.createFile(fileName, overwrite);
+        } catch (DataLakeStorageException dataLakeStorageException) {
+            return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution);
+        }
+    }
+
+    /**
+     * This method serves as a "commit" for the upload process in case of 'Write and Rename' strategy. In order to prevent work-in-progress files from being available for readers,
+     * a temporary file is written first, and then renamed/moved to its final destination. It is not an efficient approach in case of conflicts because FlowFiles are uploaded unnecessarily,
+     * but it is a calculated risk because consistency is more important for 'Write and Rename' strategy.
      * <p>
      * Visible for testing
     *
-     * @param sourceFileClient client of the temporary file
+     * @param sourceFileClient file client of the temporary file
      * @param destinationDirectory final location of the uploaded file
      * @param destinationFileName final name of the uploaded file
      * @param conflictResolution conflict resolution strategy
-     * @return URL of the uploaded file
+     * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
+     * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
      */
-    String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) {
+    DataLakeFileClient renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) {
         final String destinationPath = createPath(destinationDirectory, destinationFileName);

         try {
             final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions();
             if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
                 destinationCondition.setIfNoneMatch("*");
             }
-            return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl();
+            return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue();
         } catch (DataLakeStorageException dataLakeStorageException) {
-            removeTempFile(sourceFileClient);
-            if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
-                getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.",
-                        destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
-                return null;
-            } else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) {
-                throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException);
-            } else {
-                throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException);
-            }
+            removeFile(sourceFileClient);
+
+            return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution);
+        }
+    }
+
+    private DataLakeFileClient handleDataLakeStorageException(final DataLakeStorageException dataLakeStorageException, final String destinationPath, final String conflictResolution) {
+        if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {

Review Comment:
   Note: another option is to base the condition on the error code, since there are built-in constants for it, which might be more descriptive. The condition can also be extracted to a local variable for reuse.
   ```suggestion
           if (BlobErrorCode.BLOB_ALREADY_EXISTS.equals(dataLakeStorageException.getErrorCode()) && conflictResolution.equals(IGNORE_RESOLUTION)) {
   ```

##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html:
##########
@@ -26,31 +26,57 @@
        This processor is responsible for uploading files to Azure Data Lake Storage Gen2.
     </p>

-<h3>File uploading and cleanup process</h3>
+<h3>File uploading and cleanup process in case of "Write and Rename" strategy</h3>

 <h4>New file upload</h4>

 <ol>
     <li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li>
     <li>Content is appended to temp file.</li>
-    <li>Temp file is renamed to its original name, the original file is overwritten.</li>
-    <li>In case of appending or renaming failure the temp file is deleted, the original file remains intact.</li>
-    <li>In case of temporary file deletion failure both temp file and original file remain on the server.</li>
+    <li>Temp file is moved to the final destination directory and renamed to its original name.</li>
+    <li>In case of appending or renaming failure, the temp file is deleted.</li>
+    <li>In case of temporary file deletion failure, the temp file remains on the server.</li>
 </ol>

 <h4>Existing file upload</h4>

 <ul>
-    <li>Processors with "fail" conflict resolution strategy will be directed to "Failure" relationship.</li>
-    <li>Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.</li>
+    <li>Processors with "fail" conflict resolution strategy will direct the FlowFile to "Failure" relationship.</li>
+    <li>Processors with "ignore" conflict resolution strategy will direct the FlowFile to "Success" relationship.</li>
     <li>Processors with "replace" conflict resolution strategy:</li>

     <ol>
         <li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li>
         <li>Content is appended to temp file.</li>
-        <li>Temp file is renamed to its original name, the original file is overwritten.</li>
-        <li>In case of appending or renaming failure the temp file is deleted, the original file remains intact.</li>
-        <li>In case of temporary file deletion failure both temp file and original file remain on the server.</li>
+        <li>Temp file is moved to the final destination directory and renamed to its original name, the original file is overwritten.</li>
+        <li>In case of appending or renaming failure, the temp file is deleted, the original file remains intact.</li>
+        <li>In case of temporary file deletion failure, both temp file and original file remain on the server.</li>
+    </ol>
+</ul>
+
+<h3>File uploading and cleanup process in case of "Simple Write" strategy</h3>
+
+<h4>New file upload</h4>
+
+<ol>
+    <li>An empty file is created at its final destination.</li>
+    <li>Content is appended to the file.</li>
+    <li>In case of appending failure, the file is deleted.</li>
+    <li>In case of file deletion failure, the file remains on the server.</li>
+</ol>
+
+<h4>Existing file upload</h4>
+
+<ul>
+    <li>Processors with "fail" conflict resolution strategy will direct the FlowFile to "Failure" relationship.</li>
+    <li>Processors with "ignore" conflict resolution strategy will direct the FlowFile to "Success" relationship.</li>
+    <li>Processors with "replace" conflict resolution strategy:</li>
+
+    <ol>
+        <li>An empty file is created at its final destination, the original file is overwritten.</li>
+        <li>Content is appended to the file.</li>
+        <li>In case of appending failure, the file is deleted, the original file is not be restored.</li>

Review Comment:
   ```suggestion
         <li>In case of appending failure, the file is deleted and the original file is not restored.</li>
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
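Editor's note for readers skimming this thread: the ignore/fail branching around HTTP 409 discussed in the `handleDataLakeStorageException` comment can be sketched in a small self-contained form. All names below (`ConflictResolution`, `ConflictOutcome`, `ConflictResolver`) are hypothetical stand-ins, not the actual NiFi or Azure SDK types; the real processor works with `DataLakeStorageException` and routes FlowFiles to relationships.

```java
// Simplified, hypothetical model of the conflict-resolution branching
// discussed above. HTTP 409 (Conflict) is what the storage service returns
// when the destination path already exists and if-none-match="*" was sent.
enum ConflictResolution { FAIL, REPLACE, IGNORE }

enum ConflictOutcome { IGNORED, FAILED }

final class ConflictResolver {

    // Decides what the processor should do when file creation or rename
    // fails with the given HTTP status code.
    static ConflictOutcome resolve(final int statusCode, final ConflictResolution resolution) {
        final boolean alreadyExists = statusCode == 409;
        if (alreadyExists && resolution == ConflictResolution.IGNORE) {
            return ConflictOutcome.IGNORED; // leave the remote file intact, route to "Success"
        }
        if (alreadyExists && resolution == ConflictResolution.FAIL) {
            return ConflictOutcome.FAILED;  // route to "Failure"
        }
        // REPLACE does not reach this path on a plain conflict, because the
        // request is sent without the if-none-match="*" precondition.
        throw new IllegalStateException("Unexpected storage error, status " + statusCode);
    }
}
```

Matching on the SDK's error-code constant (the reviewer's `BlobErrorCode` suggestion) instead of the raw status code would make the same branching more self-documenting.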

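Also for reference, the `DescribedValue` pattern used by the new `WritingStrategy` enum can be sketched self-contained. NiFi's `org.apache.nifi.components.DescribedValue` exposes `getValue()`, `getDisplayName()` and `getDescription()`; a local copy of the interface is declared here so the snippet compiles on its own, and the descriptions are shortened paraphrases of the ones in the PR.

```java
// Local stand-in for org.apache.nifi.components.DescribedValue so the
// example is self-contained.
interface DescribedValue {
    String getValue();
    String getDisplayName();
    String getDescription();
}

// Mirrors the shape of the WritingStrategy enum added in the PR.
enum WritingStrategy implements DescribedValue {
    WRITE_AND_RENAME("Write and Rename",
            "Writes to a temporary directory, then renames/moves to the final destination."),
    SIMPLE_WRITE("Simple Write",
            "Writes directly to the destination; readers may see partially written files.");

    private final String displayName;
    private final String description;

    WritingStrategy(final String displayName, final String description) {
        this.displayName = displayName;
        this.description = description;
    }

    @Override public String getValue() { return name(); }          // stored in the flow definition
    @Override public String getDisplayName() { return displayName; } // shown in the UI
    @Override public String getDescription() { return description; } // shown as help text
}
```

Passing such an enum class to `PropertyDescriptor.Builder#allowableValues` lets NiFi derive the allowable values and their UI descriptions from the enum itself.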