This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ae877b9 NIFI-7591: Allow PutS3Object to post to AWS Snowball
ae877b9 is described below
commit ae877b9908aa1357c71e2e3d039829d6b7658fd4
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Mon Jul 6 13:14:01 2020 +0200
NIFI-7591: Allow PutS3Object to post to AWS Snowball
Added properties to enable/disable chunked encoding and path-style access,
for endpoints that either do not support chunked encoding or only support
path-style access.
Signed-off-by: Pierre Villard <[email protected]>
This closes #4386.
---
.../processors/aws/s3/AbstractS3Processor.java | 42 ++++++++--
.../apache/nifi/processors/aws/s3/PutS3Object.java | 4 +-
.../nifi/processors/aws/s3/ITPutS3Object.java | 92 +++++++++++-----------
.../nifi/processors/aws/s3/TestPutS3Object.java | 4 +-
4 files changed, 89 insertions(+), 53 deletions(-)
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 3f01543..e089785 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -136,6 +136,21 @@ public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProvider
.required(false)
.identifiesControllerService(AmazonS3EncryptionService.class)
.build();
+ public static final PropertyDescriptor USE_CHUNKED_ENCODING = new
PropertyDescriptor.Builder()
+ .name("use-chunked-encoding")
+ .displayName("Use Chunked Encoding")
+ .description("Enables / disables chunked encoding for upload requests. Set it to false only if your endpoint does not support chunked uploading.")
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .build();
+ public static final PropertyDescriptor USE_PATH_STYLE_ACCESS = new
PropertyDescriptor.Builder()
+ .name("use-path-style-access")
+ .displayName("Use Path Style Access")
+ .description("Path-style access can be enforced by setting this property to true. Set it to true if your endpoint does not support " +
+ "virtual-hosted-style requests, only path-style requests.")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
/**
* Create client using credentials provider. This is the preferred way for
creating clients
@@ -155,17 +170,32 @@ public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProvider
s3 = new AmazonS3Client(credentialsProvider, config);
}
- initalizeEndpointOverride(context, s3);
+ configureClientOptions(context, s3);
+
return s3;
}
- private void initalizeEndpointOverride(final ProcessContext context, final
AmazonS3Client s3) {
+ private void configureClientOptions(final ProcessContext context, final
AmazonS3Client s3) {
+ S3ClientOptions.Builder builder = S3ClientOptions.builder();
+
+ // disable chunked encoding if "Use Chunked Encoding" has been set to false, otherwise use the default (not disabled)
+ Boolean useChunkedEncoding =
context.getProperty(USE_CHUNKED_ENCODING).asBoolean();
+ if (useChunkedEncoding != null && !useChunkedEncoding) {
+ builder.disableChunkedEncoding();
+ }
+
+ // use PathStyleAccess if "Use Path Style Access" has been set to true, otherwise use the default (false)
+ Boolean usePathStyleAccess =
context.getProperty(USE_PATH_STYLE_ACCESS).asBoolean();
+ if (usePathStyleAccess != null && usePathStyleAccess) {
+ builder.setPathStyleAccess(true);
+ }
+
// if ENDPOINT_OVERRIDE is set, use PathStyleAccess
-
if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()).isEmpty()
== false){
- final S3ClientOptions s3Options = new S3ClientOptions();
- s3Options.setPathStyleAccess(true);
- s3.setS3ClientOptions(s3Options);
+ if
(!StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue()).isEmpty()){
+ builder.setPathStyleAccess(true);
}
+
+ s3.setS3ClientOptions(builder.build());
}
private void initializeSignerOverride(final ProcessContext context, final
ClientConfiguration config) {
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 1c2bc01..251a855 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -235,8 +235,8 @@ public class PutS3Object extends AbstractS3Processor {
Arrays.asList(KEY, BUCKET, CONTENT_TYPE, ACCESS_KEY, SECRET_KEY,
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, OBJECT_TAGS_PREFIX,
REMOVE_TAG_PREFIX,
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST,
WRITE_ACL_LIST, OWNER,
CANNED_ACL, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE,
SIGNER_OVERRIDE, MULTIPART_THRESHOLD, MULTIPART_PART_SIZE,
MULTIPART_S3_AGEOFF_INTERVAL,
- MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE,
PROXY_CONFIGURATION_SERVICE, PROXY_HOST,
- PROXY_HOST_PORT, PROXY_USERNAME, PROXY_PASSWORD));
+ MULTIPART_S3_MAX_AGE, SERVER_SIDE_ENCRYPTION, ENCRYPTION_SERVICE,
USE_CHUNKED_ENCODING, USE_PATH_STYLE_ACCESS,
+ PROXY_CONFIGURATION_SERVICE, PROXY_HOST, PROXY_HOST_PORT,
PROXY_USERNAME, PROXY_PASSWORD));
final static String S3_BUCKET_KEY = "s3.bucket";
final static String S3_OBJECT_KEY = "s3.key";
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index dd5f990..bc79367 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -113,11 +113,7 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testSimplePut() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+ TestRunner runner = initTestRunner();
Assert.assertTrue(runner.setProperty("x-custom-prop",
"hello").isValid());
@@ -133,11 +129,8 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testSimplePutEncrypted() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+ TestRunner runner = initTestRunner();
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION,
ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
Assert.assertTrue(runner.setProperty("x-custom-prop",
"hello").isValid());
@@ -158,11 +151,7 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testSimplePutFilenameWithNationalCharacters() throws
IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+ TestRunner runner = initTestRunner();
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "Iñtërnâtiônàližætiøn.txt");
@@ -176,11 +165,8 @@ public class ITPutS3Object extends AbstractS3IT {
private void testPutThenFetch(String sseAlgorithm) throws IOException {
// Put
- TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+ TestRunner runner = initTestRunner();
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
if(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION.equals(sseAlgorithm)){
runner.setProperty(PutS3Object.SERVER_SIDE_ENCRYPTION,
sseAlgorithm);
}
@@ -292,12 +278,8 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testContentType() throws IOException {
- PutS3Object processor = new PutS3Object();
- final TestRunner runner = TestRunners.newTestRunner(processor);
+ TestRunner runner = initTestRunner();
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.CONTENT_TYPE, "text/plain");
runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
@@ -312,10 +294,7 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testPutInFolder() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+ TestRunner runner = initTestRunner();
Assert.assertTrue(runner.setProperty("x-custom-prop",
"hello").isValid());
runner.assertValid();
@@ -331,11 +310,7 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testStorageClasses() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
-
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+ TestRunner runner = initTestRunner();
Assert.assertTrue(runner.setProperty("x-custom-prop",
"hello").isValid());
@@ -358,11 +333,8 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testStorageClassesMultipart() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+ TestRunner runner = initTestRunner();
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.MULTIPART_THRESHOLD, "50 MB");
runner.setProperty(PutS3Object.MULTIPART_PART_SIZE, "50 MB");
@@ -387,12 +359,9 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testPermissions() throws IOException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
+ TestRunner runner = initTestRunner();
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
runner.setProperty(PutS3Object.FULL_CONTROL_USER_LIST,"28545acd76c35c7e91f8409b95fd1aa0c0914bfa1ac60975d9f48bc3c5e090b5");
- runner.setProperty(PutS3Object.REGION, REGION);
final Map<String, String> attrs = new HashMap<>();
attrs.put("filename", "folder/4.txt");
@@ -918,10 +887,8 @@ public class ITPutS3Object extends AbstractS3IT {
@Test
public void testObjectTags() throws IOException, InterruptedException {
- final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
- runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
- runner.setProperty(PutS3Object.REGION, REGION);
- runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+ TestRunner runner = initTestRunner();
+
runner.setProperty(PutS3Object.OBJECT_TAGS_PREFIX, "tagS3");
runner.setProperty(PutS3Object.REMOVE_TAG_PREFIX, "true");
@@ -1194,4 +1161,41 @@ public class ITPutS3Object extends AbstractS3IT {
return this.getClient();
}
}
+
+ @Test
+ public void testChunkedEncodingDisabled() throws IOException {
+ TestRunner runner = initTestRunner();
+
+ runner.setProperty(PutS3Object.USE_CHUNKED_ENCODING, "false");
+
+ executeSimplePutTest(runner);
+ }
+
+ @Test
+ public void testPathStyleAccessEnabled() throws IOException {
+ TestRunner runner = initTestRunner();
+
+ runner.setProperty(PutS3Object.USE_PATH_STYLE_ACCESS, "true");
+
+ executeSimplePutTest(runner);
+ }
+
+ private TestRunner initTestRunner() {
+ TestRunner runner = TestRunners.newTestRunner(PutS3Object.class);
+
+ runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE);
+ runner.setProperty(PutS3Object.REGION, REGION);
+ runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+
+ return runner;
+ }
+
+ private void executeSimplePutTest(TestRunner runner) throws IOException {
+ runner.assertValid();
+
+ runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME));
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
+ }
}
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
index 6ba6de8..ffba452 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java
@@ -209,7 +209,7 @@ public class TestPutS3Object {
public void testGetPropertyDescriptors() {
PutS3Object processor = new PutS3Object();
List<PropertyDescriptor> pd =
processor.getSupportedPropertyDescriptors();
- assertEquals("size should be eq", 34, pd.size());
+ assertEquals("size should be eq", 36, pd.size());
assertTrue(pd.contains(PutS3Object.ACCESS_KEY));
assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
assertTrue(pd.contains(PutS3Object.BUCKET));
@@ -232,6 +232,8 @@ public class TestPutS3Object {
assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST));
assertTrue(pd.contains(PutS3Object.SERVER_SIDE_ENCRYPTION));
assertTrue(pd.contains(PutS3Object.ENCRYPTION_SERVICE));
+ assertTrue(pd.contains(PutS3Object.USE_CHUNKED_ENCODING));
+ assertTrue(pd.contains(PutS3Object.USE_PATH_STYLE_ACCESS));
assertTrue(pd.contains(PutS3Object.PROXY_CONFIGURATION_SERVICE));
assertTrue(pd.contains(PutS3Object.PROXY_HOST));
assertTrue(pd.contains(PutS3Object.PROXY_HOST_PORT));