Repository: nifi Updated Branches: refs/heads/master b75af5b34 -> 2a90bd501
NIFI-797 Corrected URL for PutS3 to include correct protocol and endpoint by region. - corrected a missed 'final' on org.apache.nifi.processors.aws.AbstractAWSProcessor.relationships - added protected method org.apache.nifi.processors.aws.AbstractAWSProcessor.getRegion() - added protected method org.apache.nifi.processors.aws.s3.AbstractS3Processor.getUrlForObject(String, String) - explicitly set AWS client protocol to HTTPS, and created a static final field with comments if other protocols may be considered - added a static final field for the UserAgent Reviewed by Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2a90bd50 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2a90bd50 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2a90bd50 Branch: refs/heads/master Commit: 2a90bd501b5b3aad11ee8b92f276302741e5a91e Parents: b75af5b Author: Tony Kurc <[email protected]> Authored: Sun Nov 8 19:51:19 2015 -0500 Committer: Tony Kurc <[email protected]> Committed: Tue Nov 10 06:37:56 2015 -0500 ---------------------------------------------------------------------- .../processors/aws/AbstractAWSProcessor.java | 22 ++++++++++++++++---- .../processors/aws/s3/AbstractS3Processor.java | 12 +++++++++++ .../nifi/processors/aws/s3/PutS3Object.java | 2 +- 3 files changed, 31 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/2a90bd50/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index e2ae31e..165a759 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -40,6 +40,7 @@ import org.apache.nifi.processor.util.StandardValidators; import com.amazonaws.AmazonWebServiceClient; import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AnonymousAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials; @@ -54,7 +55,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") .description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build(); - public static Set<Relationship> relationships = Collections.unmodifiableSet( + public static final Set<Relationship> relationships = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); public static final PropertyDescriptor CREDENTAILS_FILE = new PropertyDescriptor.Builder() @@ -92,6 +93,11 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl .build(); private volatile ClientType client; + private volatile Region region; + + // If protocol is changed to be a property, ensure other uses are also changed + protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS; + protected static final String DEFAULT_USER_AGENT = "NiFi"; private static AllowableValue createAllowableValue(final Regions regions) { return new AllowableValue(regions.getName(), regions.getName(), regions.getName()); @@ -133,8 +139,9 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl final ClientConfiguration config = new ClientConfiguration(); config.setMaxConnections(context.getMaxConcurrentTasks()); config.setMaxErrorRetry(0); - config.setUserAgent("NiFi"); - + config.setUserAgent(DEFAULT_USER_AGENT); + // If this is changed to be a property, ensure other uses are also changed + config.setProtocol(DEFAULT_PROTOCOL); final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); config.setConnectionTimeout(commsTimeout); config.setSocketTimeout(commsTimeout); @@ -151,7 +158,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl if (getSupportedPropertyDescriptors().contains(REGION)) { final String region = context.getProperty(REGION).getValue(); if (region != null) { - client.setRegion(Region.getRegion(Regions.fromName(region))); + this.region = Region.getRegion(Regions.fromName(region)); + client.setRegion(this.region); + } else{ + this.region = null; } } } @@ -162,6 +172,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl return client; } + protected Region getRegion() { + return region; + } + protected AWSCredentials getCredentials(final ProcessContext context) { final String accessKey = context.getProperty(ACCESS_KEY).getValue(); final String secretKey = context.getProperty(SECRET_KEY).getValue(); http://git-wip-us.apache.org/repos/asf/nifi/blob/2a90bd50/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index 76880ef..3803158 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -28,6 +28,7 @@ import org.apache.nifi.processors.aws.AbstractAWSProcessor; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.regions.Region; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.CanonicalGrantee; @@ -134,6 +135,17 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C return grantees; } + protected String getUrlForObject(final String bucket, final String key) { + Region region = getRegion(); + + if (region == null) { + return DEFAULT_PROTOCOL.toString() + "://s3.amazonaws.com/" + bucket + "/" + key; + } else { + final String endpoint = region.getServiceEndpoint("s3"); + return DEFAULT_PROTOCOL.toString() + "://" + endpoint + "/" + bucket + "/" + key; + } + } + protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) { final AccessControlList acl = new AccessControlList(); http://git-wip-us.apache.org/repos/asf/nifi/blob/2a90bd50/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index c7212f5..d2409cd 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -170,7 +170,7 @@ public class PutS3Object extends AbstractS3Processor { } session.transfer(flowFile, REL_SUCCESS); - final String url = "http://" + bucket + ".s3.amazonaws.com/" + key; + final String url = getUrlForObject(bucket, key); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, url, millis);
