NIFI-25: Updated AWS procs to fix problems noted by code review

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0211d0d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0211d0d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0211d0d7

Branch: refs/heads/NIFI-271
Commit: 0211d0d71561fb63df4a06cd7b1a3b430b7a3e6c
Parents: 93a1210
Author: Mark Payne <[email protected]>
Authored: Sat Apr 11 14:34:11 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Sat Apr 11 14:34:11 2015 -0400

----------------------------------------------------------------------
 .../nifi-aws-bundle/nifi-aws-nar/pom.xml        |   5 +-
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml |   2 +-
 .../processors/aws/AbstractAWSProcessor.java    |  16 ++
 .../processors/aws/s3/AbstractS3Processor.java  |  16 ++
 .../nifi/processors/aws/s3/FetchS3Object.java   | 157 ++++++++++++++++++
 .../nifi/processors/aws/s3/GetS3Object.java     | 165 -------------------
 .../nifi/processors/aws/s3/PutS3Object.java     |  36 +++-
 .../aws/sns/AbstractSNSProcessor.java           |  16 ++
 .../apache/nifi/processors/aws/sns/PutSNS.java  |  28 +++-
 .../aws/sqs/AbstractSQSProcessor.java           |  16 ++
 .../nifi/processors/aws/sqs/DeleteSQS.java      |  26 ++-
 .../apache/nifi/processors/aws/sqs/GetSQS.java  |  31 +++-
 .../apache/nifi/processors/aws/sqs/PutSQS.java  |  30 +++-
 .../org.apache.nifi.processor.Processor         |   2 +-
 .../processors/aws/s3/TestFetchS3Object.java    |  44 +++++
 .../nifi/processors/aws/s3/TestGetS3Object.java |  47 ------
 nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml   |   2 +-
 17 files changed, 401 insertions(+), 238 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
index 4c34715..3e6f450 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-nar/pom.xml
@@ -19,18 +19,17 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-aws-bundle</artifactId>
-        <version>0.0.2-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>nifi-aws-nar</artifactId>
-    <version>0.0.2-incubating-SNAPSHOT</version>
     <packaging>nar</packaging>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-aws-processors</artifactId>
-            <version>0.0.2-incubating-SNAPSHOT</version>
+            <version>0.1.0-incubating-SNAPSHOT</version>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 2cb1302..2270773 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-aws-bundle</artifactId>
-        <version>0.0.2-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>nifi-aws-processors</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index dde0d1d..11c6a9d 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 4849357..624015b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.s3;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
new file mode 100644
index 0000000..63c8346
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+
+
+@SupportsBatching
+@SeeAlso({PutS3Object.class})
+@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
+@CapabilityDescription("Retrieves the contents of an S3 Object and writes it 
to the content of a FlowFile")
+@WritesAttributes({
+       @WritesAttribute(attribute="s3.bucket", description="The name of the S3 
bucket"),
+       @WritesAttribute(attribute="path", description="The path of the file"),
+       @WritesAttribute(attribute="absolute.path", description="The path of 
the file"),
+       @WritesAttribute(attribute="filename", description="The name of the 
file"),
+       @WritesAttribute(attribute="hash.value", description="The MD5 sum of 
the file"),
+       @WritesAttribute(attribute="hash.algorithm", description="MD5"),
+       @WritesAttribute(attribute="mime.type", description="If S3 provides the 
content type/MIME type, this attribute will hold that file"),
+       @WritesAttribute(attribute="s3.etag", description="The ETag that can be 
used to see if the file has changed"),
+       @WritesAttribute(attribute="s3.expirationTime", description="If the 
file has an expiration date, this attribute will be set, containing the 
milliseconds since epoch in UTC time"),
+       @WritesAttribute(attribute="s3.expirationTimeRuleId", description="The 
ID of the rule that dictates this object's expiration time"),
+       @WritesAttribute(attribute="s3.version", description="The version of 
the S3 object"),
+})
+public class FetchS3Object extends AbstractS3Processor {
+
+       public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
+        .name("Version")
+        .description("The Version of the Object to download")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
+    
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, 
CREDENTAILS_FILE, TIMEOUT, VERSION_ID) );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+    
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+        
+        final AmazonS3 client = getClient();
+        final GetObjectRequest request;
+        if ( versionId == null ) {
+            request = new GetObjectRequest(bucket, key);
+        } else {
+            request = new GetObjectRequest(bucket, key, versionId);
+        }
+
+        final Map<String, String> attributes = new HashMap<>();
+        try (final S3Object s3Object = client.getObject(request)) {
+            flowFile = session.importFrom(s3Object.getObjectContent(), 
flowFile);
+            attributes.put("s3.bucket", s3Object.getBucketName());
+            
+            final ObjectMetadata metadata = s3Object.getObjectMetadata();
+            if ( metadata.getContentDisposition() != null ) {
+                final String fullyQualified = metadata.getContentDisposition();
+                final int lastSlash = fullyQualified.lastIndexOf("/");
+                if ( lastSlash > -1 && lastSlash < fullyQualified.length() - 1 
) {
+                    attributes.put(CoreAttributes.PATH.key(), 
fullyQualified.substring(0, lastSlash));
+                    attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), 
fullyQualified);
+                    attributes.put(CoreAttributes.FILENAME.key(), 
fullyQualified.substring(lastSlash + 1));
+                } else {
+                    attributes.put(CoreAttributes.FILENAME.key(), 
metadata.getContentDisposition());
+                }
+            }
+            if (metadata.getContentMD5() != null ) {
+                attributes.put("hash.value", metadata.getContentMD5());
+                attributes.put("hash.algorithm", "MD5");
+            }
+            if ( metadata.getContentType() != null ) {
+                attributes.put(CoreAttributes.MIME_TYPE.key(), 
metadata.getContentType());
+            }
+            if ( metadata.getETag() != null ) {
+                attributes.put("s3.etag", metadata.getETag());
+            }
+            if ( metadata.getExpirationTime() != null ) {
+                attributes.put("s3.expirationTime", 
String.valueOf(metadata.getExpirationTime().getTime()));
+            }
+            if ( metadata.getExpirationTimeRuleId() != null ) {
+                attributes.put("s3.expirationTimeRuleId", 
metadata.getExpirationTimeRuleId());
+            }
+            if ( metadata.getUserMetadata() != null ) {
+                attributes.putAll(metadata.getUserMetadata());
+            }
+            if ( metadata.getVersionId() != null ) {
+                attributes.put("s3.version", metadata.getVersionId());
+            }
+        } catch (final IOException | AmazonClientException ioe) {
+            getLogger().error("Failed to retrieve S3 Object for {}; routing to 
failure", new Object[] {flowFile, ioe});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        if ( !attributes.isEmpty() ) {
+            flowFile = session.putAllAttributes(flowFile, attributes);
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully retrieved S3 Object for {} in {} 
millis; routing to success", new Object[] {flowFile, transferMillis});
+        session.getProvenanceReporter().receive(flowFile, "http://" + bucket + 
".amazonaws.com/" + key, transferMillis);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3Object.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3Object.java
deleted file mode 100644
index 6ba1bbf..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/GetS3Object.java
+++ /dev/null
@@ -1,165 +0,0 @@
-package org.apache.nifi.processors.aws.s3;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3Object;
-
-
-@Tags({"Amazon", "S3", "AWS", "Get"})
-@CapabilityDescription("Retrieves the contents of an S3 Object and writes it 
to the content of a FlowFile")
-public class GetS3Object extends AbstractS3Processor {
-
-    public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
-        .name("Version")
-        .description("The Version of the Object to download")
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(true)
-        .required(false)
-        .build();
-    
-    public static final PropertyDescriptor BYTE_RANGE_START = new 
PropertyDescriptor.Builder()
-        .name("First Byte Index")
-        .description("The 0-based index of the first byte to download. If 
specified, the first N bytes will be skipped, where N is the value of this 
property. If this value is greater than the size of the object, the FlowFile 
will be routed to failure.")
-        .required(false)
-        .expressionLanguageSupported(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .build();
-    public static final PropertyDescriptor BYTE_RANGE_END = new 
PropertyDescriptor.Builder()
-        .name("Last Byte Index")
-        .description("The 0-based index of the last byte to download. If 
specified, last N bytes will be skipped, where N is the size of the object 
minus the value of this property. If the value is greater than the size of the 
object, the content will be downloaded to the end of the object.")
-        .required(false)
-        .expressionLanguageSupported(true)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .build();
-    
-    
-    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, 
CREDENTAILS_FILE, TIMEOUT, VERSION_ID,
-                    BYTE_RANGE_START, BYTE_RANGE_END) );
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-    
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        FlowFile flowFile = session.get();
-        if ( flowFile == null ) {
-            return;
-        }
-
-        final long startNanos = System.nanoTime();
-        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
-        
-        final AmazonS3 client = getClient();
-        final GetObjectRequest request;
-        if ( versionId == null ) {
-            request = new GetObjectRequest(bucket, key);
-        } else {
-            request = new GetObjectRequest(bucket, key, versionId);
-        }
-
-        final Long byteRangeStart;
-        final Long byteRangeEnd;
-        try {
-            final PropertyValue startVal = 
context.getProperty(BYTE_RANGE_START).evaluateAttributeExpressions(flowFile);
-            byteRangeStart = startVal.isSet() ? startVal.asLong() : 0L;
-            
-            final PropertyValue endVal = 
context.getProperty(BYTE_RANGE_END).evaluateAttributeExpressions(flowFile);
-            byteRangeEnd = endVal.isSet() ? endVal.asLong() : Long.MAX_VALUE;
-        } catch (final NumberFormatException nfe) {
-            getLogger().error("Failed to determine byte range for download for 
{} due to {}", new Object[] {flowFile, nfe});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-        
-        if ( byteRangeStart != null && byteRangeEnd != null ) {
-            if ( byteRangeEnd.longValue() < byteRangeStart.longValue() ) {
-                getLogger().error("Failed to download object from S3 for {} 
because Start Byte Range is {} and End Byte Range is {}, which is less", new 
Object[] {flowFile, byteRangeStart, byteRangeEnd});
-                session.transfer(flowFile, REL_FAILURE);
-                return;
-            }
-            
-            request.setRange(byteRangeStart.longValue(), 
byteRangeEnd.longValue());
-        }
-        
-        final Map<String, String> attributes = new HashMap<>();
-        try (final S3Object s3Object = client.getObject(request)) {
-            flowFile = session.importFrom(s3Object.getObjectContent(), 
flowFile);
-            attributes.put("s3.bucket", s3Object.getBucketName());
-            
-            final ObjectMetadata metadata = s3Object.getObjectMetadata();
-            if ( metadata.getContentDisposition() != null ) {
-                final String fullyQualified = metadata.getContentDisposition();
-                final int lastSlash = fullyQualified.lastIndexOf("/");
-                if ( lastSlash > -1 && lastSlash < fullyQualified.length() - 1 
) {
-                    attributes.put(CoreAttributes.PATH.key(), 
fullyQualified.substring(0, lastSlash));
-                    attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), 
fullyQualified);
-                    attributes.put(CoreAttributes.FILENAME.key(), 
fullyQualified.substring(lastSlash + 1));
-                } else {
-                    attributes.put(CoreAttributes.FILENAME.key(), 
metadata.getContentDisposition());
-                }
-            }
-            if (metadata.getContentMD5() != null ) {
-                attributes.put("hash.value", metadata.getContentMD5());
-                attributes.put("hash.algorithm", "MD5");
-            }
-            if ( metadata.getContentType() != null ) {
-                attributes.put(CoreAttributes.MIME_TYPE.key(), 
metadata.getContentType());
-            }
-            if ( metadata.getETag() != null ) {
-                attributes.put("s3.etag", metadata.getETag());
-            }
-            if ( metadata.getExpirationTime() != null ) {
-                attributes.put("s3.expirationTime", 
String.valueOf(metadata.getExpirationTime().getTime()));
-            }
-            if ( metadata.getExpirationTimeRuleId() != null ) {
-                attributes.put("s3.expirationTimeRuleId", 
metadata.getExpirationTimeRuleId());
-            }
-            if ( metadata.getUserMetadata() != null ) {
-                attributes.putAll(metadata.getUserMetadata());
-            }
-            if ( metadata.getVersionId() != null ) {
-                attributes.put("s3.version", metadata.getVersionId());
-            }
-        } catch (final IOException | AmazonClientException ioe) {
-            getLogger().error("Failed to retrieve S3 Object for {}; routing to 
failure", new Object[] {flowFile, ioe});
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-
-        if ( !attributes.isEmpty() ) {
-            flowFile = session.putAllAttributes(flowFile, attributes);
-        }
-
-        session.transfer(flowFile, REL_SUCCESS);
-        final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully retrieved S3 Object for {} in {} 
millis; routing to success", new Object[] {flowFile, transferMillis});
-        session.getProvenanceReporter().receive(flowFile, "http://" + bucket + 
".amazonaws.com/" + key, transferMillis);
-    }
-
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 1fe5db1..9a4fc5b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.BufferedInputStream;
@@ -11,7 +27,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -30,11 +52,21 @@ import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.StorageClass;
 
-
+@SupportsBatching
+@SeeAlso({FetchS3Object.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Put"})
 @CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket")
+@DynamicProperty(name="The name of a User-Defined Metadata field to add to the 
S3 Object", 
+       value="The value of a User-Defined Metadata field to add to the S3 
Object", 
+       description="Allows user-defined metadata to be added to the S3 object 
as key/value pairs",
+       supportsExpressionLanguage=true)
+@ReadsAttribute(attribute="filename", description="Uses the FlowFile's 
filename as the filename for the S3 object")
+@WritesAttributes({
+       @WritesAttribute(attribute="s3.version", description="The version of 
the S3 Object that was put to S3"),
+       @WritesAttribute(attribute="s3.etag", description="The ETag of the S3 
Object"),
+       @WritesAttribute(attribute="s3.expiration", description="A 
human-readable form of the expiration date of the S3 object, if one is set")
+})
 public class PutS3Object extends AbstractS3Processor {
-
     public static final PropertyDescriptor EXPIRATION_RULE_ID = new 
PropertyDescriptor.Builder()
         .name("Expiration Time Rule")
         .required(false)

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
index d2404f3..5447169 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.sns;
 
 import org.apache.nifi.components.AllowableValue;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index bc690eb..1de3251 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.sns;
 
 import java.io.ByteArrayOutputStream;
@@ -7,24 +23,24 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.sqs.GetSQS;
+import org.apache.nifi.processors.aws.sqs.PutSQS;
 
 import com.amazonaws.services.sns.AmazonSNSClient;
 import com.amazonaws.services.sns.model.MessageAttributeValue;
 import com.amazonaws.services.sns.model.PublishRequest;
 
-
-// TODO: Allow user to choose 'content strategy'
-//       1. Content from message body or attributes? If attributes, allow new property for configuring EL expression. Otherwise, no property.
-//       2. Same content to all subscribers or different content per subscription type? 
-//              If same, just use single property.
-//              If different, must use Attribute values for each and have a separate property for each type of subscription (HTTP, HTTPS, E-mail, SMS, etc.)
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
 @Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"})
 @CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service")
 public class PutSNS extends AbstractSNSProcessor {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
index 72eec4c..2ef749f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.sqs;
 
 import org.apache.nifi.components.PropertyDescriptor;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 3659a50..2416044 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.sqs;
 
 import java.util.ArrayList;
@@ -5,7 +21,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -17,8 +35,9 @@ import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
 import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
 
-
-@Tags({"Amazon", "SQS", "Queue", "Delete"})
+@SupportsBatching
+@SeeAlso({GetSQS.class, PutSQS.class})
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"})
 @CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue")
 public class DeleteSQS extends AbstractSQSProcessor {
     public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder()
@@ -41,9 +60,6 @@ public class DeleteSQS extends AbstractSQSProcessor {
     
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        // TODO: Process batch of FlowFiles. To do this, we have to first poll the first
-        // FlowFile and then use session.get(FlowFileFilter) to get up to BATCH_SIZE - 1
-        // more results that have the same Queue URL.
         List<FlowFile> flowFiles = session.get(1);
         if ( flowFiles.isEmpty() ) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 1af3780..6c0257b 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.sqs;
 
 import java.io.IOException;
@@ -12,7 +28,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -30,10 +50,17 @@ import 
com.amazonaws.services.sqs.model.MessageAttributeValue;
 import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
 import com.amazonaws.services.sqs.model.ReceiveMessageResult;
 
-@Tags({ "Amazon", "SQS", "Queue", "Get", "Fetch", "Poll"})
+@SupportsBatching
+@Tags({ "Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"})
+@SeeAlso({PutSQS.class, DeleteSQS.class})
 @CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue")
+@WritesAttributes({
+       @WritesAttribute(attribute="hash.value", description="The MD5 sum of the message"),
+       @WritesAttribute(attribute="hash.algorithm", description="MD5"),
+       @WritesAttribute(attribute="sqs.message.id", description="The unique identifier of the SQS message"),
+       @WritesAttribute(attribute="sqs.receipt.handle", description="The SQS Receipt Handle that is to be used to delete the message from the queue")
+})
 public class GetSQS extends AbstractSQSProcessor {
-
     public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
         .name("Character Set")
         .description("The Character Set that should be used to encode the 
textual content of the SQS message")

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
index cac4d73..81268fe 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.nifi.processors.aws.sqs;
 
 import java.io.ByteArrayOutputStream;
@@ -11,7 +27,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -26,8 +45,13 @@ import 
com.amazonaws.services.sqs.model.SendMessageBatchRequest;
 import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
 
 
-@Tags({"Amazon", "SQS", "Queue", "Put", "Publish"})
+@SupportsBatching
+@Tags({"Amazon", "AWS", "SQS", "Queue", "Put", "Publish"})
+@SeeAlso({GetSQS.class, DeleteSQS.class})
 @CapabilityDescription("Publishes a message to an Amazon Simple Queuing Service Queue")
+@DynamicProperty(name="The name of a Message Attribute to add to the message", value="The value of the Message Attribute", 
+       description="Allows the user to add key/value pairs as Message Attributes by adding a property whose name will become the name of "
+                       + "the Message Attribute and value will become the value of the Message Attribute", supportsExpressionLanguage=true)
 public class PutSQS extends AbstractSQSProcessor {
 
     public static final PropertyDescriptor DELAY = new 
PropertyDescriptor.Builder()
@@ -71,10 +95,6 @@ public class PutSQS extends AbstractSQSProcessor {
     
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) {
-        // TODO: Process batch of FlowFiles. To do this, we have to first poll the first
-        // FlowFile and then use session.get(FlowFileFilter) to get up to BATCH_SIZE - 1
-        // more results that have the same Queue URL.
-        
         FlowFile flowFile = session.get();
         if ( flowFile == null ) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 32a395a..4f2405c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-org.apache.nifi.processors.aws.s3.GetS3Object
+org.apache.nifi.processors.aws.s3.FetchS3Object
 org.apache.nifi.processors.aws.s3.PutS3Object
 org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
new file mode 100644
index 0000000..40f9515
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -0,0 +1,44 @@
+package org.apache.nifi.processors.aws.s3;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("For local testing only - interacts with S3 so the credentials file must be configured and all necessary buckets created")
+public class TestFetchS3Object {
+    private final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
+    
+    @Test
+    public void testGet() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object());
+        runner.setProperty(FetchS3Object.BUCKET, "anonymous-test-bucket-00000000");
+        runner.setProperty(FetchS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
+        runner.setProperty(FetchS3Object.KEY, "folder/1.txt");
+        
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("start", "0");
+        
+        runner.enqueue(new byte[0], attrs);
+        runner.run(1);
+        
+        runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+        final List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(FetchS3Object.REL_SUCCESS);
+        final MockFlowFile out = ffs.iterator().next();
+        
+        final byte[] expectedBytes = Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
+        out.assertContentEquals(new String(expectedBytes));
+        for ( final Map.Entry<String, String> entry : out.getAttributes().entrySet() ) {
+            System.out.println(entry.getKey() + " : " + entry.getValue());
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3Object.java
 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3Object.java
deleted file mode 100644
index 391c8b4..0000000
--- 
a/nifi/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestGetS3Object.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.nifi.processors.aws.s3;
-
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Ignore;
-import org.junit.Test;
-
-@Ignore("For local testing only - interacts with S3 so the credentials file 
must be configured and all necessary buckets created")
-public class TestGetS3Object {
-    private final String CREDENTIALS_FILE = System.getProperty("user.home") + 
"/aws-credentials.properties";
-    
-    @Test
-    public void testGet() throws IOException {
-        final TestRunner runner = TestRunners.newTestRunner(new GetS3Object());
-        runner.setProperty(GetS3Object.BUCKET, 
"anonymous-test-bucket-00000000");
-        runner.setProperty(GetS3Object.CREDENTAILS_FILE, CREDENTIALS_FILE);
-        runner.setProperty(GetS3Object.KEY, "folder/1.txt");
-        
-        runner.setProperty(GetS3Object.BYTE_RANGE_START, "${start}");
-        runner.setProperty(GetS3Object.BYTE_RANGE_END, 
String.valueOf(Long.MAX_VALUE));
-        
-        final Map<String, String> attrs = new HashMap<>();
-        attrs.put("start", "0");
-        
-        runner.enqueue(new byte[0], attrs);
-        runner.run(1);
-        
-        runner.assertAllFlowFilesTransferred(GetS3Object.REL_SUCCESS, 1);
-        final List<MockFlowFile> ffs = 
runner.getFlowFilesForRelationship(GetS3Object.REL_SUCCESS);
-        final MockFlowFile out = ffs.iterator().next();
-        
-        final byte[] expectedBytes = 
Files.readAllBytes(Paths.get("src/test/resources/hello.txt"));
-        out.assertContentEquals(new String(expectedBytes));
-        for ( final Map.Entry<String, String> entry : 
out.getAttributes().entrySet() ) {
-            System.out.println(entry.getKey() + " : " + entry.getValue());
-        }
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/0211d0d7/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml 
b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
index 81a54e4..117d7dd 100644
--- a/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-aws-bundle/pom.xml
@@ -19,7 +19,7 @@
     <parent>
         <groupId>org.apache.nifi</groupId>
         <artifactId>nifi-nar-bundles</artifactId>
-        <version>0.0.2-incubating-SNAPSHOT</version>
+        <version>0.1.0-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>nifi-aws-bundle</artifactId>

Reply via email to