This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ea425e235 NIFI-10656 Log ignored event with info instead of warning
4ea425e235 is described below

commit 4ea425e23585ff04dd554deef8ef41b98104e0f4
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Mon Oct 17 00:48:48 2022 +0200

    NIFI-10656 Log ignored event with info instead of warning
    
    This closes #6540.
    
    Signed-off-by: Peter Turcsanyi <[email protected]>
---
 .../azure/storage/PutAzureDataLakeStorage.java     | 84 ++++++++++-----------
 .../azure/storage/ITPutAzureDataLakeStorage.java   | 52 -------------
 .../azure/storage/TestPutAzureDataLakeStorage.java | 87 ++++++++++++++++++++++
 3 files changed, 129 insertions(+), 94 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index 6dd03e84de..9fcd5a1d77 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -129,49 +129,27 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
             final String tempDirectory = createPath(tempPath, 
TEMP_FILE_DIRECTORY);
             final String fileName = evaluateFileNameProperty(context, 
flowFile);
 
-            final DataLakeServiceClient storageClient = 
getStorageClient(context, flowFile);
-            final DataLakeFileSystemClient fileSystemClient = 
storageClient.getFileSystemClient(fileSystem);
+            final DataLakeFileSystemClient fileSystemClient = 
getFileSystemClient(context, flowFile, fileSystem);
             final DataLakeDirectoryClient directoryClient = 
fileSystemClient.getDirectoryClient(originalDirectory);
-            final DataLakeFileClient tempFileClient;
-            final DataLakeFileClient renamedFileClient;
 
             final String tempFilePrefix = UUID.randomUUID().toString();
             final DataLakeDirectoryClient tempDirectoryClient = 
fileSystemClient.getDirectoryClient(tempDirectory);
             final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
-            boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION);
-
-            try {
-                tempFileClient = tempDirectoryClient.createFile(tempFilePrefix 
+ fileName, true);
-                appendContent(flowFile, tempFileClient, session);
-                createDirectoryIfNotExists(directoryClient);
-                renamedFileClient = renameFile(fileName, 
directoryClient.getDirectoryPath(), tempFileClient, overwrite);
-
-                final Map<String, String> attributes = new HashMap<>();
-                attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
-                attributes.put(ATTR_NAME_DIRECTORY, originalDirectory);
-                attributes.put(ATTR_NAME_FILENAME, fileName);
-                attributes.put(ATTR_NAME_PRIMARY_URI, 
renamedFileClient.getFileUrl());
-                attributes.put(ATTR_NAME_LENGTH, 
String.valueOf(flowFile.getSize()));
+
+            final DataLakeFileClient tempFileClient = 
tempDirectoryClient.createFile(tempFilePrefix + fileName, true);
+            appendContent(flowFile, tempFileClient, session);
+            createDirectoryIfNotExists(directoryClient);
+
+            final String fileUrl = renameFile(tempFileClient, 
directoryClient.getDirectoryPath(), fileName, conflictResolution);
+            if (fileUrl != null) {
+                final Map<String, String> attributes = 
createAttributeMap(flowFile, fileSystem, originalDirectory, fileName, fileUrl);
                 flowFile = session.putAllAttributes(flowFile, attributes);
 
-                session.transfer(flowFile, REL_SUCCESS);
                 final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                session.getProvenanceReporter().send(flowFile, 
renamedFileClient.getFileUrl(), transferMillis);
-            } catch (DataLakeStorageException dlsException) {
-                if (dlsException.getStatusCode() == 409) {
-                    if (conflictResolution.equals(IGNORE_RESOLUTION)) {
-                        session.transfer(flowFile, REL_SUCCESS);
-                        String warningMessage = String.format("File with the 
same name already exists. " +
-                                "Remote file not modified. " +
-                                "Transferring {} to success due to %s being 
set to '%s'.", CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
-                        getLogger().warn(warningMessage, new 
Object[]{flowFile});
-                    } else {
-                        throw dlsException;
-                    }
-                } else {
-                    throw dlsException;
-                }
+                session.getProvenanceReporter().send(flowFile, fileUrl, 
transferMillis);
             }
+
+            session.transfer(flowFile, REL_SUCCESS);
         } catch (Exception e) {
             getLogger().error("Failed to create file on Azure Data Lake 
Storage", e);
             flowFile = session.penalize(flowFile);
@@ -179,13 +157,28 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
         }
     }
 
+    private DataLakeFileSystemClient getFileSystemClient(ProcessContext 
context, FlowFile flowFile, String fileSystem) {
+        final DataLakeServiceClient storageClient = getStorageClient(context, 
flowFile);
+        return storageClient.getFileSystemClient(fileSystem);
+    }
+
+    private Map<String, String> createAttributeMap(FlowFile flowFile, String 
fileSystem, String originalDirectory, String fileName, String fileUrl) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
+        attributes.put(ATTR_NAME_DIRECTORY, originalDirectory);
+        attributes.put(ATTR_NAME_FILENAME, fileName);
+        attributes.put(ATTR_NAME_PRIMARY_URI, fileUrl);
+        attributes.put(ATTR_NAME_LENGTH, String.valueOf(flowFile.getSize()));
+        return attributes;
+    }
+
     private void createDirectoryIfNotExists(DataLakeDirectoryClient 
directoryClient) {
         if (!directoryClient.getDirectoryPath().isEmpty() && 
!directoryClient.exists()) {
             directoryClient.create();
         }
     }
 
-   //Visible for testing
+    //Visible for testing
     void appendContent(FlowFile flowFile, DataLakeFileClient fileClient, 
ProcessSession session) throws IOException {
         final long length = flowFile.getSize();
         if (length > 0) {
@@ -220,18 +213,25 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
     }
 
     //Visible for testing
-    DataLakeFileClient renameFile(final String fileName, final String 
directoryPath, final DataLakeFileClient fileClient, final boolean overwrite) {
+    String renameFile(final DataLakeFileClient sourceFileClient, final String 
destinationDirectory, final String destinationFileName, final String 
conflictResolution) {
         try {
             final DataLakeRequestConditions destinationCondition = new 
DataLakeRequestConditions();
-            if (!overwrite) {
+            if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
                 destinationCondition.setIfNoneMatch("*");
             }
-            final String destinationPath = createPath(directoryPath, fileName);
-            return fileClient.renameWithResponse(null, destinationPath, null, 
destinationCondition, null, null).getValue();
+            final String destinationPath = createPath(destinationDirectory, 
destinationFileName);
+            return sourceFileClient.renameWithResponse(null, destinationPath, 
null, destinationCondition, null, null).getValue().getFileUrl();
         } catch (DataLakeStorageException dataLakeStorageException) {
-            getLogger().error("Renaming File [{}] failed", 
fileClient.getFileName(), dataLakeStorageException);
-            removeTempFile(fileClient);
-            throw dataLakeStorageException;
+            removeTempFile(sourceFileClient);
+            if (dataLakeStorageException.getStatusCode() == 409 && 
conflictResolution.equals(IGNORE_RESOLUTION)) {
+                getLogger().info("File with the same name [{}] already exists. 
Remote file not modified due to {} being set to '{}'.",
+                        sourceFileClient.getFileName(), 
CONFLICT_RESOLUTION.getDisplayName(), conflictResolution);
+                return null;
+            } else if (dataLakeStorageException.getStatusCode() == 409 && 
conflictResolution.equals(FAIL_RESOLUTION)) {
+                throw new ProcessException(String.format("File with the same 
name [%s] already exists.", sourceFileClient.getFileName()), 
dataLakeStorageException);
+            } else {
+                throw new ProcessException(String.format("Renaming File [%s] 
failed", sourceFileClient.getFileName()), dataLakeStorageException);
+            }
         }
     }
 
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index f249b38a9e..711b9f6d5e 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -16,24 +16,16 @@
  */
 package org.apache.nifi.processors.azure.storage;
 
-import com.azure.core.http.rest.Response;
 import com.azure.storage.file.datalake.DataLakeDirectoryClient;
 import com.azure.storage.file.datalake.DataLakeFileClient;
-import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
-import com.azure.storage.file.datalake.models.DataLakeStorageException;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
-import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,16 +39,7 @@ import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM;
 import static 
org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.ArgumentMatchers.isNull;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
 
@@ -263,41 +246,6 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertFailure();
     }
 
-    @Test
-    public void testPutFileButFailedToAppend() {
-        final PutAzureDataLakeStorage processor = new 
PutAzureDataLakeStorage();
-        final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
-        final ProcessSession session = mock(ProcessSession.class);
-        final FlowFile flowFile = mock(FlowFile.class);
-
-        when(flowFile.getSize()).thenReturn(1L);
-        
doThrow(IllegalArgumentException.class).when(fileClient).append(any(InputStream.class),
 anyLong(), anyLong());
-
-        assertThrows(IllegalArgumentException.class, () -> 
processor.appendContent(flowFile, fileClient, session));
-        verify(fileClient).delete();
-    }
-
-    @Test
-    public void testPutFileButFailedToRename() {
-        final PutAzureDataLakeStorage processor = new 
PutAzureDataLakeStorage();
-        final ProcessorInitializationContext initContext = 
mock(ProcessorInitializationContext.class);
-        final String componentId = "componentId";
-        final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
-        final Response<DataLakeFileClient> response = mock(Response.class);
-        //Mock logger
-        when(initContext.getIdentifier()).thenReturn(componentId);
-        MockComponentLog componentLog = new MockComponentLog(componentId, 
processor);
-        when(initContext.getLogger()).thenReturn(componentLog);
-        processor.initialize(initContext);
-        //Mock renameWithResponse Azure method
-        when(fileClient.renameWithResponse(isNull(), anyString(), isNull(), 
any(DataLakeRequestConditions.class), isNull(), isNull())).thenReturn(response);
-        when(response.getValue()).thenThrow(DataLakeStorageException.class);
-        when(fileClient.getFileName()).thenReturn(FILE_NAME);
-
-        assertThrows(DataLakeStorageException.class, () -> 
processor.renameFile(FILE_NAME, "", fileClient, false));
-        verify(fileClient).delete();
-    }
-
     private Map<String, String> createAttributesMap() {
         Map<String, String> attributes = new HashMap<>();
 
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureDataLakeStorage.java
new file mode 100644
index 0000000000..e7a22802e7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureDataLakeStorage.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import com.azure.core.http.rest.Response;
+import com.azure.storage.file.datalake.DataLakeFileClient;
+import com.azure.storage.file.datalake.models.DataLakeRequestConditions;
+import com.azure.storage.file.datalake.models.DataLakeStorageException;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Test;
+
+import java.io.InputStream;
+
+import static 
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage.FAIL_RESOLUTION;
+import static 
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage.IGNORE_RESOLUTION;
+import static 
org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage.REPLACE_RESOLUTION;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestPutAzureDataLakeStorage {
+
+    private static final String FILE_NAME = "file1";
+
+    @Test
+    public void testPutFileButFailedToAppend() {
+        final PutAzureDataLakeStorage processor = new 
PutAzureDataLakeStorage();
+        final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        final FlowFile flowFile = mock(FlowFile.class);
+
+        when(flowFile.getSize()).thenReturn(1L);
+        
doThrow(IllegalArgumentException.class).when(fileClient).append(any(InputStream.class),
 anyLong(), anyLong());
+
+        assertThrows(IllegalArgumentException.class, () -> 
processor.appendContent(flowFile, fileClient, session));
+        verify(fileClient).delete();
+    }
+
+    @Test
+    public void testPutFileButFailedToRenameWithUnrecoverableError() {
+        final PutAzureDataLakeStorage processor = new 
PutAzureDataLakeStorage();
+        final ProcessorInitializationContext initContext = 
mock(ProcessorInitializationContext.class);
+        final String componentId = "componentId";
+        final DataLakeFileClient fileClient = mock(DataLakeFileClient.class);
+        final Response<DataLakeFileClient> response = mock(Response.class);
+        final DataLakeStorageException exception = 
mock(DataLakeStorageException.class);
+        //Mock logger
+        when(initContext.getIdentifier()).thenReturn(componentId);
+        MockComponentLog componentLog = new MockComponentLog(componentId, 
processor);
+        when(initContext.getLogger()).thenReturn(componentLog);
+        processor.initialize(initContext);
+        //Mock renameWithResponse Azure method
+        when(fileClient.renameWithResponse(isNull(), anyString(), isNull(), 
any(DataLakeRequestConditions.class), isNull(), isNull())).thenReturn(response);
+        when(fileClient.getFileName()).thenReturn(FILE_NAME);
+        when(exception.getStatusCode()).thenReturn(405);
+        when(response.getValue()).thenThrow(exception);
+        assertThrows(ProcessException.class, () -> 
processor.renameFile(fileClient, "", FILE_NAME, FAIL_RESOLUTION));
+        assertThrows(ProcessException.class, () -> 
processor.renameFile(fileClient, "", FILE_NAME, REPLACE_RESOLUTION));
+        assertThrows(ProcessException.class, () -> 
processor.renameFile(fileClient, "", FILE_NAME, IGNORE_RESOLUTION));
+        verify(fileClient, times(3)).delete();
+    }
+}
\ No newline at end of file

Reply via email to