This is an automated email from the ASF dual-hosted git repository.

jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 22fcb59  NIFI-8346 Route to failure in PutAzureBlobStorage for auth, IO exceptions
22fcb59 is described below

commit 22fcb594d9c5c08120072e5e600a2fb306f665bf
Author: Denes Arvay <[email protected]>
AuthorDate: Fri Mar 19 12:57:47 2021 +0100

    NIFI-8346 Route to failure in PutAzureBlobStorage for auth, IO exceptions
    
    This closes #4917
    
    Signed-off-by: Joey Frazee <[email protected]>
---
 .../azure/storage/PutAzureBlobStorage.java         | 12 ++++--
 .../azure/storage/ITPutAzureBlobStorage.java       | 12 ++++++
 .../azure/storage/TestPutAzureBlobStorage.java     | 48 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index c38e3c6..859f2be 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -29,6 +29,7 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.azure.storage.OperationContext;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -141,16 +142,16 @@ public class PutAzureBlobStorage extends 
AbstractAzureBlobProcessor {
                 }
 
                 try {
-                    blob.upload(in, -1, null, null, operationContext);
+                    uploadBlob(blob, operationContext, in);
                     BlobProperties properties = blob.getProperties();
                     attributes.put("azure.container", containerName);
                     attributes.put("azure.primaryUri", 
blob.getSnapshotQualifiedUri().toString());
                     attributes.put("azure.etag", properties.getEtag());
                     attributes.put("azure.length", String.valueOf(length));
                     attributes.put("azure.timestamp", 
String.valueOf(properties.getLastModified()));
-                } catch (StorageException | URISyntaxException e) {
+                } catch (StorageException | URISyntaxException | IOException 
e) {
                     storedException.set(e);
-                    throw new IOException(e);
+                    throw e instanceof IOException ? (IOException) e : new 
IOException(e);
                 }
             });
 
@@ -175,6 +176,11 @@ public class PutAzureBlobStorage extends 
AbstractAzureBlobProcessor {
 
     }
 
+    @VisibleForTesting
+    void uploadBlob(CloudBlob blob, OperationContext operationContext, 
InputStream in) throws StorageException, IOException {
+        blob.upload(in, -1, null, null, operationContext);
+    }
+
     // Used to help force Azure Blob SDK to write in blocks
     private static class UnmarkableInputStream extends FilterInputStream {
         public UnmarkableInputStream(InputStream in) {
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
index e006c2c..142b592 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.storage;
 
 import com.microsoft.azure.storage.blob.ListBlobItem;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.Before;
 import org.junit.Test;
@@ -58,6 +59,17 @@ public class ITPutAzureBlobStorage extends 
AbstractAzureBlobStorageIT {
         assertResult();
     }
 
+    @Test
+    public void testInvalidCredentialsRoutesToFailure() {
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, 
"aW52YWxpZGludmFsaWQ=");
+        runner.assertValid();
+        runner.enqueue("test".getBytes());
+        runner.run();
+
+        runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
+    }
+
     private void assertResult() throws Exception {
         runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 
1);
         List<MockFlowFile> flowFilesForRelationship = 
runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
new file mode 100644
index 0000000..b01264a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+
+public class TestPutAzureBlobStorage {
+
+    @Test
+    public void testIOExceptionDuringUploadTransfersToFailure() throws 
Exception {
+        PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage());
+        doThrow(IOException.class).when(processor).uploadBlob(any(), any(), 
any());
+
+        TestRunner runner = TestRunners.newTestRunner(processor);
+        runner.setProperty(PutAzureBlobStorage.BLOB, "test");
+        runner.setProperty(AzureStorageUtils.CONTAINER, "test");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "test");
+        runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "test");
+
+        runner.enqueue("test data");
+        runner.run();
+
+        runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
+    }
+}

Reply via email to