Repository: nifi Updated Branches: refs/heads/master fbaf1c36d -> 8f92ac993
NIFI-1225: Changes for ENDPOINT_OVERRIDE and SSL Context Service from pr for NIFI-1107 Reviewed by Mark Payne <[email protected]> and Tony Kurc ([email protected]) Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/55352ae5 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/55352ae5 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/55352ae5 Branch: refs/heads/master Commit: 55352ae5b1252201b2a3058c45392b7fb0ec90df Parents: fbaf1c3 Author: Joe Skora <[email protected]> Authored: Tue Dec 1 20:04:19 2015 -0500 Committer: Tony Kurc <[email protected]> Committed: Tue Dec 1 20:32:00 2015 -0500 ---------------------------------------------------------------------- .../nifi-aws-bundle/nifi-aws-processors/pom.xml | 4 ++ .../processors/aws/AbstractAWSProcessor.java | 37 ++++++++++++++- .../nifi/processors/aws/s3/DeleteS3Object.java | 5 +- .../nifi/processors/aws/s3/FetchS3Object.java | 4 +- .../nifi/processors/aws/s3/PutS3Object.java | 48 ++++++++++++++++---- .../nifi/processors/aws/s3/TestPutS3Object.java | 30 ++++++++++++ 6 files changed, 114 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/55352ae5/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index 1e16614..a86af8a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -50,6 +50,10 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/nifi/blob/55352ae5/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index 61c3c85..8c1919e 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -27,6 +27,9 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLContext; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; @@ -37,6 +40,7 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; import com.amazonaws.AmazonWebServiceClient; import com.amazonaws.ClientConfiguration; @@ -45,6 +49,7 @@ import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AnonymousAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; @@ -92,6 +97,22 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl .defaultValue("30 secs") .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder() + .name("Endpoint Override URL") + .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " + + "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " + + "the selected endpoint URL, allowing use with other S3-compatible endpoints.") + .required(false) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + private volatile ClientType client; private volatile Region region; @@ -146,6 +167,13 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl config.setConnectionTimeout(commsTimeout); config.setSocketTimeout(commsTimeout); + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null) { + final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE); + SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, null); + config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory); + } + return config; } @@ -160,10 +188,17 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl if (region != null) { this.region = Region.getRegion(Regions.fromName(region)); client.setRegion(this.region); - } else{ + } else { this.region = null; } } + + // if the endpoint override has been configured, set the endpoint. + // (per Amazon docs this should only be configured at client creation) + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()); + if (!urlstr.isEmpty()) { + this.client.setEndpoint(urlstr); + } } protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config); http://git-wip-us.apache.org/repos/asf/nifi/blob/55352ae5/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java index 6b078d1..9dc3595 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -40,7 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators; @SupportsBatching -@SeeAlso({PutS3Object.class}) +@SeeAlso({PutS3Object.class, FetchS3Object.class}) @Tags({"Amazon", "S3", "AWS", "Archive", "Delete"}) @InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " + @@ -57,7 +57,8 @@ public class DeleteS3Object extends AbstractS3Processor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, + SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/55352ae5/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index d7ec88a..9eaf019 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -46,7 +46,7 @@ import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3Object; @SupportsBatching -@SeeAlso({PutS3Object.class}) +@SeeAlso({PutS3Object.class, DeleteS3Object.class}) @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "S3", "AWS", "Get", "Fetch"}) @CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile") @@ -73,7 +73,7 @@ public class FetchS3Object extends AbstractS3Processor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID)); + Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/55352ae5/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index b03d85d..3f88a74 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.aws.s3; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -27,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -47,7 +49,7 @@ import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PutObjectRequest; @@ -55,7 +57,7 @@ import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.StorageClass; @SupportsBatching -@SeeAlso({FetchS3Object.class}) +@SeeAlso({FetchS3Object.class, DeleteS3Object.class}) @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") @@ -63,9 +65,13 @@ import com.amazonaws.services.s3.model.StorageClass; description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true) @ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") @WritesAttributes({ + @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"), + @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"), @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), - @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set") + @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"), + @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), + @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set") }) public class PutS3Object extends AbstractS3Processor { @@ -85,7 +91,17 @@ public class PutS3Object extends AbstractS3Processor { public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER)); + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); + + final static String S3_BUCKET_KEY = "s3.bucket"; + final static String S3_OBJECT_KEY = "s3.key"; + final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; + final static String S3_VERSION_ATTR_KEY = "s3.version"; + final static String S3_ETAG_ATTR_KEY = "s3.etag"; + final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration"; + final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass"; + final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class"; + final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata"; @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -114,9 +130,12 @@ public class PutS3Object extends AbstractS3Processor { final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final AmazonS3 s3 = getClient(); + final AmazonS3Client s3 = getClient(); final FlowFile ff = flowFile; final Map<String, String> attributes = new HashMap<>(); + attributes.put(S3_BUCKET_KEY, bucket); + attributes.put(S3_OBJECT_KEY, key); + try { session.read(flowFile, new InputStreamCallback() { @Override @@ -152,14 +171,25 @@ public class PutS3Object extends AbstractS3Processor { final PutObjectResult result = s3.putObject(request); if (result.getVersionId() != null) { - attributes.put("s3.version", result.getVersionId()); + attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId()); } - attributes.put("s3.etag", result.getETag()); + attributes.put(S3_ETAG_ATTR_KEY, result.getETag()); final Date expiration = result.getExpirationTime(); if (expiration != null) { - attributes.put("s3.expiration", expiration.toString()); + attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString()); + } + if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) { + attributes.put(S3_STORAGECLASS_ATTR_KEY, + result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString()); + } + if (userMetadata.size() > 0) { + List<String> pairs = new ArrayList<String>(); + for (String userKey : userMetadata.keySet()) { + pairs.add(userKey + "=" + userMetadata.get(userKey)); + } + attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", ")); } } } @@ -170,7 +200,7 @@ public class PutS3Object extends AbstractS3Processor { } session.transfer(flowFile, REL_SUCCESS); - final String url = getUrlForObject(bucket, key); + final String url = s3.getResourceUrl(bucket, key); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, url, millis); http://git-wip-us.apache.org/repos/asf/nifi/blob/55352ae5/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java index 1755e1d..82e51ce 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestPutS3Object.java @@ -18,8 +18,11 @@ package org.apache.nifi.processors.aws.s3; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -52,6 +55,33 @@ public class TestPutS3Object extends AbstractS3Test { } @Test + public void testMetaData() throws IOException { + PutS3Object processor = new PutS3Object(); + final TestRunner runner = TestRunners.newTestRunner(processor); + + runner.setProperty(PutS3Object.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + PropertyDescriptor prop1 = processor.getSupportedDynamicPropertyDescriptor("TEST-PROP-1"); + runner.setProperty(prop1, "TESTING-1-2-3"); + PropertyDescriptor prop2 = processor.getSupportedDynamicPropertyDescriptor("TEST-PROP-2"); + runner.setProperty(prop2, "TESTING-4-5-6"); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "meta.txt"); + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); + + runner.run(); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); + List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutS3Object.REL_SUCCESS); + MockFlowFile ff1 = flowFiles.get(0); + for (Map.Entry attrib : ff1.getAttributes().entrySet()) { + System.out.println(attrib.getKey() + " = " + attrib.getValue()); + } + } + + @Test public void testPutInFolder() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new PutS3Object());
