turcsanyip commented on a change in pull request #4287:
URL: https://github.com/apache/nifi/pull/4287#discussion_r430546794



##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
##########
@@ -76,27 +109,45 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
             final DataLakeServiceClient storageClient = 
getStorageClient(context, flowFile);
             final DataLakeFileSystemClient fileSystemClient = 
storageClient.getFileSystemClient(fileSystem);
             final DataLakeDirectoryClient directoryClient = 
fileSystemClient.getDirectoryClient(directory);
-            final DataLakeFileClient fileClient = 
directoryClient.createFile(fileName);
+            final DataLakeFileClient fileClient;
+
+            final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
+            boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
+
+            try {
+                fileClient = directoryClient.createFile(fileName, overwrite);
+
+                final long length = flowFile.getSize();
+                if (length > 0) {
+                    try (final InputStream rawIn = session.read(flowFile); 
final BufferedInputStream in = new BufferedInputStream(rawIn)) {
+                        fileClient.append(in, 0, length);
+                    }
+                }
+                fileClient.flush(length);
+
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.put("azure.filesystem", fileSystem);
+                attributes.put("azure.directory", directory);
+                attributes.put("azure.filename", fileName);
+                attributes.put("azure.primaryUri", fileClient.getFileUrl());
+                attributes.put("azure.length", String.valueOf(length));
+                flowFile = session.putAllAttributes(flowFile, attributes);
 
-            final long length = flowFile.getSize();
-            if (length > 0) {
-                try (final InputStream rawIn = session.read(flowFile); final 
BufferedInputStream in = new BufferedInputStream(rawIn)) {
-                    fileClient.append(in, 0, length);
+                session.transfer(flowFile, REL_SUCCESS);
+                final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                session.getProvenanceReporter().send(flowFile, 
fileClient.getFileUrl(), transferMillis);
+            } catch (DataLakeStorageException dlsException) {
+                if (dlsException.getStatusCode() == 409) {
+                    if (conflictResolution.equals(IGNORE_RESOLUTION)) {
+                        session.transfer(flowFile, REL_SUCCESS);
+                        getLogger().warn("Transferring {} to success because 
file with same name already exists", new Object[]{flowFile});

Review comment:
       The warning message does not properly describe the cause and the effect: 
file exists => transfer to success
   The reason for transferring to success is the 'Ignore' resolution policy 
rather.
   It should also be mentioned that the file has not been overwritten in Azure.
   

##########
File path: 
nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
##########
@@ -253,6 +300,14 @@ private void assertFlowFile(String directory, String 
fileName, byte[] fileData)
         flowFile.assertAttributeEquals("azure.length", 
Integer.toString(fileData.length));
     }
 
+    private void assertSimpleFlowFile(byte[] fileData) throws Exception {

Review comment:
       I could be called from `assertFlowFile` because the first section of 
that method is the same.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to