This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new ae877b9  NIFI-7591: Allow PutS3Object to post to AWS Snowball
ae877b9 is described below

commit ae877b9908aa1357c71e2e3d039829d6b7658fd4
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Jul 6 13:14:01 2020 +0200

    NIFI-7591: Allow PutS3Object to post to AWS Snowball
    
    Added properties to enable/disable chunked encoding and path-style access
    for endpoints that do not support chunked encoding / only support 
path-style access.
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #4386.
---
 .../processors/aws/s3/AbstractS3Processor.java     | 42 ++++++++--
 .../apache/nifi/processors/aws/s3/PutS3Object.java |  4 +-
 .../nifi/processors/aws/s3/ITPutS3Object.java      | 92 +++++++++++-----------
 .../nifi/processors/aws/s3/TestPutS3Object.java    |  4 +-
 4 files changed, 89 insertions(+), 53 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 3f01543..e089785 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -136,6 +136,21 @@ public abstract class AbstractS3Processor extends 
AbstractAWSCredentialsProvider
             .required(false)
             .identifiesControllerService(AmazonS3EncryptionService.class)
             .build();
+    public static final PropertyDescriptor USE_CHUNKED_ENCODING = new 
PropertyDescriptor.Builder()
+            .name("use-chunked-encoding")
+            .displayName("Use Chunked Encoding")
+            .description("Enables / disables chunked encoding for upload 
requests. Set it to false only if your endpoint does not support chunked 
uploading.")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
+    public static final PropertyDescriptor USE_PATH_STYLE_ACCESS = new 
PropertyDescriptor.Builder()
+            .name("use-path-style-access")
+            .displayName("Use Path Style Access")
+            .description("Path-style access can be enforced by setting this 
property to true. Set it to true if your endpoint does not support " +
+                    "virtual-hosted-style requests, only path-style requests.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
 
     /**
      * Create client using credentials provider. This is the preferred way for 
creating clients
@@ -155,17 +170,32 @@ public abstract class AbstractS3Processor extends 
AbstractAWSCredentialsProvider
             s3 = new AmazonS3Client(credentialsProvider, config);
         }
 
-        initalizeEndpointOverride(context, s3);
+        configureClientOptions(context, s3);
+
         return s3;
     }
 
-    private void initalizeEndpointOverride(final ProcessContext context, final 
AmazonS3Client s3) {
+    private void configureClientOptions(final ProcessContext context, final 
AmazonS3Client s3) {
+        S3ClientOptions.Builder builder = S3ClientOptions.builder();
+
+        // disable chunked encoding if "Use Chunked Encoding" has been set to 
false, otherwise use the default (not disabled)
+        Boolean useChunkedEncoding = 
context.getProperty(USE_CHUNKED_ENCODING).asBoolean();
+        if (useChunkedEncoding != null && !useChunkedEncoding) {
+            builder.disableChunkedEncoding();
+        }
+
+        // use PathStyleAccess if "Use Path Style Access" has been set to 
true, otherwise use the default (false)
+        Boolean usePathStyleAccess = 
context.getProperty(USE_PATH_STYLE_ACCESS).asBoolean();
+        if (usePathStyleAccess != null && usePathStyleAccess) {
+            builder.setPathStyleAccess(true);
+        }
+
         // if ENDPOINT_OVERRIDE is set, use PathStyleAccess
-        
if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()).isEmpty()
 == false){
-            final S3ClientOptions s3Options = new S3ClientOptions();
-            s3Options.setPathStyleAccess(true);
-            s3.setS3ClientOptions(s3Options);
+        if 
(!StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()).isEmpty()){
+            builder.setPathStyleAccess(true);
         }
+
+        s3.setS3ClientOptions(builder.build());
     }
 
     private void initializeSignerOverride(final ProcessContext context, final 
ClientConfiguration config) {
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 1c2bc01..251a855 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -235,8 +235,8 @@ public class PutS3Object extends AbstractS3Processor {
         Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX, 
REMOVE_TAG_PREFIX,
             STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, 
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, 
WRITE_ACL_LIST, OWNER,
             CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE, 
SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE, 
MULTIPART_S3_AGEOFF_INTERVAL,
-            MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, 
PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
-            PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
+            MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE, 
USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
+            PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT, 
PROXY_USERNAME, PROXY_PASSWORD));
 
     final static String S3_BUCKET_KEY = "s3.bucket";
     final static String S3_OBJECT_KEY = "s3.key";
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index dd5f990..bc79367 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -113,11 +113,7 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testSimplePut() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        TestRunner runner = initTestRunner();
 
         Assert.assertTrue(runner.setProperty("x-custom-prop", 
"hello").isValid());
 
@@ -133,11 +129,8 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testSimplePutEncrypted() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+        TestRunner runner = initTestRunner();
 
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
         runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, 
ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
 
         Assert.assertTrue(runner.setProperty("x-custom-prop", 
"hello").isValid());
@@ -158,11 +151,7 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testSimplePutFilenameWithNationalCharacters() throws 
IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        TestRunner runner = initTestRunner();
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "Iñtërnâtiônàližætiøn.txt");
@@ -176,11 +165,8 @@ public class ITPutS3Object extends AbstractS3IT {
     private void testPutThenFetch(String sseAlgorithm) throws IOException {
 
         // Put
-        TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+        TestRunner runner = initTestRunner();
 
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
         if(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION.equals(sseAlgorithm)){
             runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION, 
sseAlgorithm);
         }
@@ -292,12 +278,8 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testContentType() throws IOException {
-        PutS3Object processor = new PutS3Object();
-        final TestRunner runner = TestRunners.newTestRunner(processor);
+        TestRunner runner = initTestRunner();
 
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
         runner.setProperty(PutS3Object.CONTENT_TYPE, "text/plain");
 
         runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
@@ -312,10 +294,7 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testPutInFolder() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        TestRunner runner = initTestRunner();
 
         Assert.assertTrue(runner.setProperty("x-custom-prop", 
"hello").isValid());
         runner.assertValid();
@@ -331,11 +310,7 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testStorageClasses() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        TestRunner runner = initTestRunner();
 
         Assert.assertTrue(runner.setProperty("x-custom-prop", 
"hello").isValid());
 
@@ -358,11 +333,8 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testStorageClassesMultipart() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+        TestRunner runner = initTestRunner();
 
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
         runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
         runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
 
@@ -387,12 +359,9 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testPermissions() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+        TestRunner runner = initTestRunner();
 
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
         
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
-        runner.setProperty(PutS3Object.REGION, REGION);
 
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "folder/4.txt");
@@ -918,10 +887,8 @@ public class ITPutS3Object extends AbstractS3IT {
 
     @Test
     public void testObjectTags() throws IOException, InterruptedException {
-        final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(PutS3Object.REGION, REGION);
-        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        TestRunner runner = initTestRunner();
+
         runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
         runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true");
 
@@ -1194,4 +1161,41 @@ public class ITPutS3Object extends AbstractS3IT {
             return this.getClient();
         }
     }
+
+    @Test
+    public void testChunkedEncodingDisabled() throws IOException {
+        TestRunner runner = initTestRunner();
+
+        runner.setProperty(PutS3Object.USE_CHUNKED_ENCODING, "false");
+
+        executeSimplePutTest(runner);
+    }
+
+    @Test
+    public void testPathStyleAccessEnabled() throws IOException {
+        TestRunner runner = initTestRunner();
+
+        runner.setProperty(PutS3Object.USE_PATH_STYLE_ACCESS, "true");
+
+        executeSimplePutTest(runner);
+    }
+
+    private TestRunner initTestRunner() {
+        TestRunner runner = TestRunners.newTestRunner(PutS3Object.class);
+
+        runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(PutS3Object.REGION, REGION);
+        runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+
+        return runner;
+    }
+
+    private void executeSimplePutTest(TestRunner runner) throws IOException {
+        runner.assertValid();
+
+        runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 6ba6de8..ffba452 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -209,7 +209,7 @@ public class TestPutS3Object {
     public void testGetPropertyDescriptors() {
         PutS3Object processor = new PutS3Object();
         List<PropertyDescriptor> pd = 
processor.getSupportedPropertyDescriptors();
-        assertEquals("size should be eq", 34, pd.size());
+        assertEquals("size should be eq", 36, pd.size());
         assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
         assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
         assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -232,6 +232,8 @@ public class TestPutS3Object {
         assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
         assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
         assertTrue(pd.contains(PutS3Object.ENCRYPTION_SERVICE));
+        assertTrue(pd.contains(PutS3Object.USE_CHUNKED_ENCODING));
+        assertTrue(pd.contains(PutS3Object.USE_PATH_STYLE_ACCESS));
         assertTrue(pd.contains(PutS3Object.PROXY_CONFIGURATION_SERVICE));
         assertTrue(pd.contains(PutS3Object.PROXY_HOST));
         assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));

Reply via email to