This is an automated email from the ASF dual-hosted git repository.
jfrazee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 22fcb59 NIFI-8346 Route to failure in PutAzureBlobStorage for auth,
IO exceptions
22fcb59 is described below
commit 22fcb594d9c5c08120072e5e600a2fb306f665bf
Author: Denes Arvay <[email protected]>
AuthorDate: Fri Mar 19 12:57:47 2021 +0100
NIFI-8346 Route to failure in PutAzureBlobStorage for auth, IO exceptions
This closes #4917
Signed-off-by: Joey Frazee <[email protected]>
---
.../azure/storage/PutAzureBlobStorage.java | 12 ++++--
.../azure/storage/ITPutAzureBlobStorage.java | 12 ++++++
.../azure/storage/TestPutAzureBlobStorage.java | 48 ++++++++++++++++++++++
3 files changed, 69 insertions(+), 3 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
index c38e3c6..859f2be 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -29,6 +29,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.storage.OperationContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -141,16 +142,16 @@ public class PutAzureBlobStorage extends
AbstractAzureBlobProcessor {
}
try {
- blob.upload(in, -1, null, null, operationContext);
+ uploadBlob(blob, operationContext, in);
BlobProperties properties = blob.getProperties();
attributes.put("azure.container", containerName);
attributes.put("azure.primaryUri",
blob.getSnapshotQualifiedUri().toString());
attributes.put("azure.etag", properties.getEtag());
attributes.put("azure.length", String.valueOf(length));
attributes.put("azure.timestamp",
String.valueOf(properties.getLastModified()));
- } catch (StorageException | URISyntaxException e) {
+ } catch (StorageException | URISyntaxException | IOException
e) {
storedException.set(e);
- throw new IOException(e);
+ throw e instanceof IOException ? (IOException) e : new
IOException(e);
}
});
@@ -175,6 +176,11 @@ public class PutAzureBlobStorage extends
AbstractAzureBlobProcessor {
}
+ @VisibleForTesting
+ void uploadBlob(CloudBlob blob, OperationContext operationContext,
InputStream in) throws StorageException, IOException {
+ blob.upload(in, -1, null, null, operationContext);
+ }
+
// Used to help force Azure Blob SDK to write in blocks
private static class UnmarkableInputStream extends FilterInputStream {
public UnmarkableInputStream(InputStream in) {
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
index e006c2c..142b592 100644
---
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureBlobStorage.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.storage;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.util.MockFlowFile;
import org.junit.Before;
import org.junit.Test;
@@ -58,6 +59,17 @@ public class ITPutAzureBlobStorage extends
AbstractAzureBlobStorageIT {
assertResult();
}
+ @Test
+ public void testInvalidCredentialsRoutesToFailure() {
+ runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "invalid");
+ runner.setProperty(AzureStorageUtils.ACCOUNT_KEY,
"aW52YWxpZGludmFsaWQ=");
+ runner.assertValid();
+ runner.enqueue("test".getBytes());
+ runner.run();
+
+ runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
+ }
+
private void assertResult() throws Exception {
runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS,
1);
List<MockFlowFile> flowFilesForRelationship =
runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
diff --git
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
new file mode 100644
index 0000000..b01264a
--- /dev/null
+++
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestPutAzureBlobStorage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+
+public class TestPutAzureBlobStorage {
+
+ @Test
+ public void testIOExceptionDuringUploadTransfersToFailure() throws
Exception {
+ PutAzureBlobStorage processor = Mockito.spy(new PutAzureBlobStorage());
+ doThrow(IOException.class).when(processor).uploadBlob(any(), any(),
any());
+
+ TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(PutAzureBlobStorage.BLOB, "test");
+ runner.setProperty(AzureStorageUtils.CONTAINER, "test");
+ runner.setProperty(AzureStorageUtils.ACCOUNT_NAME, "test");
+ runner.setProperty(AzureStorageUtils.ACCOUNT_KEY, "test");
+
+ runner.enqueue("test data");
+ runner.run();
+
+ runner.assertTransferCount(PutAzureBlobStorage.REL_FAILURE, 1);
+ }
+}