turcsanyip commented on code in PR #8540:
URL: https://github.com/apache/nifi/pull/8540#discussion_r1535144271
##########
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java:
##########
@@ -229,38 +259,65 @@ static void uploadContent(DataLakeFileClient fileClient, InputStream in, long le
     }
     /**
-     * This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it.
-     * Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in
-     * case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important.
+     * Creates the file on Azure for 'Simple Write' strategy. Upon upload, a 0-byte file is created, then the payload is appended to it.
+     * Because of that, a work-in-progress file is available for readers before the upload is complete.
+     *
+     * @param directoryClient directory client of the uploaded file's parent directory
+     * @param fileName name of the uploaded file
+     * @param conflictResolution conflict resolution strategy
+     * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
+     * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
+     */
+    DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) {
+        final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName);
+
+        try {
+            final boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
+            return directoryClient.createFile(fileName, overwrite);
+        } catch (DataLakeStorageException dataLakeStorageException) {
+            return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution);
+        }
+    }
+
+
+    /**
+     * This method serves as a "commit" for the upload process in case of 'Write and Rename' strategy. In order to prevent work-in-progress files from being available for readers,
+     * a temporary file is written first, and then renamed/moved to its final destination. It is not an efficient approach in case of conflicts because FlowFiles are uploaded unnecessarily,
+     * but it is a calculated risk because consistency is more important for 'Write and Rename' strategy.
      * <p>
      * Visible for testing
      *
-     * @param sourceFileClient client of the temporary file
+     * @param sourceFileClient file client of the temporary file
      * @param destinationDirectory final location of the uploaded file
      * @param destinationFileName final name of the uploaded file
      * @param conflictResolution conflict resolution strategy
-     * @return URL of the uploaded file
+     * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore'
+     * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors
      */
-    String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) {
+    DataLakeFileClient renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) {
         final String destinationPath = createPath(destinationDirectory, destinationFileName);
         try {
             final DataLakeRequestConditions destinationCondition = new DataLakeRequestConditions();
             if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
                 destinationCondition.setIfNoneMatch("*");
             }
-            return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl();
+            return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue();
         } catch (DataLakeStorageException dataLakeStorageException) {
-            removeTempFile(sourceFileClient);
-            if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
-                getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.",
-                        destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
-                return null;
-            } else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) {
-                throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException);
-            } else {
-                throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException);
-            }
+            removeFile(sourceFileClient);
+
+            return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution);
+        }
+    }
+
+    private DataLakeFileClient handleDataLakeStorageException(final DataLakeStorageException dataLakeStorageException, final String destinationPath, final String conflictResolution) {
+        if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) {
Review Comment:
`BlobErrorCode` comes from the Blob client library (`azure-storage-blob`), and it seems the ADLS service returns a different code: `PathAlreadyExists`. I did not find a similar error code class in the ADLS client library (`azure-storage-file-datalake`).
In general, I did not want to change the logic here because this code section was extracted from an existing method, but introducing a local variable makes sense.
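As a sketch of what the comment above describes, the conflict decision could be isolated in a small pure method that compares the service error code string instead of (or alongside) the raw HTTP status. This is an illustration only, not code from the PR: the `PATH_ALREADY_EXISTS` constant, the `Action` enum, and the `resolve` helper are all hypothetical names, and the `"PathAlreadyExists"` string is the code the ADLS service is reported to return.

```java
// Hypothetical sketch of the conflict-resolution decision discussed above.
// "PathAlreadyExists" is the error code the ADLS service reportedly returns;
// since azure-storage-file-datalake has no error code enum for it, a plain
// string constant is used. All names here are illustrative, not NiFi code.
public class ConflictResolutionSketch {

    static final String PATH_ALREADY_EXISTS = "PathAlreadyExists";

    enum Action { IGNORE, FAIL, RETHROW }

    // Decide how to react to a storage error based on the HTTP status code,
    // the service error code, and the configured conflict resolution strategy.
    static Action resolve(int statusCode, String errorCode, String conflictResolution) {
        // Constant-first equals() also guards against a null errorCode.
        final boolean alreadyExists = statusCode == 409 && PATH_ALREADY_EXISTS.equals(errorCode);
        if (alreadyExists && "ignore".equals(conflictResolution)) {
            return Action.IGNORE;   // keep the remote file, caller returns null
        } else if (alreadyExists && "fail".equals(conflictResolution)) {
            return Action.FAIL;     // caller wraps in a ProcessException
        } else {
            return Action.RETHROW;  // any other error: wrap and propagate
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(409, "PathAlreadyExists", "ignore")); // IGNORE
        System.out.println(resolve(409, "PathAlreadyExists", "fail"));   // FAIL
        System.out.println(resolve(500, null, "ignore"));                // RETHROW
    }
}
```

Keeping the check in one place would also make it easy to later swap the status-code comparison for a pure error-code comparison if an ADLS error code class ever appears in the SDK.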
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]