http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java 
b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
index f8e74bc..fbdea94 100644
--- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
+++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java
@@ -1,126 +1,126 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.junit.Test;
-
-public class TestMockProcessContext {
-
-    @Test
-    public void testRemoveProperty() {
-        final DummyProcessor proc = new DummyProcessor();
-        final MockProcessContext context = new MockProcessContext(proc);
-        context.setProperty(DummyProcessor.REQUIRED_PROP, "req-value");
-        context.setProperty(DummyProcessor.OPTIONAL_PROP, "opt-value");
-        context.setProperty(DummyProcessor.DEFAULTED_PROP, "custom-value");
-
-        assertEquals(1, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP));
-        assertEquals(1, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP));
-        assertEquals(1, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
-
-        assertTrue(context.removeProperty(DummyProcessor.OPTIONAL_PROP));
-        
assertNull(context.getProperty(DummyProcessor.OPTIONAL_PROP).getValue());
-        assertFalse(context.removeProperty(DummyProcessor.OPTIONAL_PROP));
-        assertEquals(2, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP));
-
-        assertTrue(context.removeProperty(DummyProcessor.REQUIRED_PROP));
-        
assertNull(context.getProperty(DummyProcessor.REQUIRED_PROP).getValue());
-        assertFalse(context.removeProperty(DummyProcessor.REQUIRED_PROP));
-        assertEquals(2, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP));
-
-        assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP));
-        assertEquals("default-value", 
context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue());
-        assertFalse(context.removeProperty(DummyProcessor.DEFAULTED_PROP));
-        assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
-
-        // Since value is already the default, this shouldn't trigger 
onPropertyModified to be called.
-        context.setProperty(DummyProcessor.DEFAULTED_PROP, 
DummyProcessor.DEFAULTED_PROP.getDefaultValue());
-        assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
-
-        assertEquals("default-value", 
context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue());
-        assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP));
-
-        // since we are calling remove on a property that has a default value, 
this shouldn't
-        // trigger the onPropertyModified method to be called.
-        assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
-    }
-
-    private static class DummyProcessor extends AbstractProcessor {
-        static final PropertyDescriptor REQUIRED_PROP = new 
PropertyDescriptor.Builder()
-            .name("required")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-        static final PropertyDescriptor DEFAULTED_PROP = new 
PropertyDescriptor.Builder()
-            .name("defaulted")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("default-value")
-            .build();
-        static final PropertyDescriptor OPTIONAL_PROP = new 
PropertyDescriptor.Builder()
-            .name("optional")
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-
-        private final Map<PropertyDescriptor, Integer> propertyModifiedCount = 
new HashMap<>();
-
-        @Override
-        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-            final List<PropertyDescriptor> properties = new ArrayList<>();
-            properties.add(REQUIRED_PROP);
-            properties.add(DEFAULTED_PROP);
-            properties.add(OPTIONAL_PROP);
-            return properties;
-        }
-
-        @Override
-        public void onPropertyModified(final PropertyDescriptor descriptor, 
final String oldValue, final String newValue) {
-            Integer updateCount = propertyModifiedCount.get(descriptor);
-            if (updateCount == null) {
-                updateCount = 0;
-            }
-
-            propertyModifiedCount.put(descriptor, updateCount + 1);
-        }
-
-        public int getUpdateCount(final PropertyDescriptor descriptor) {
-            Integer updateCount = propertyModifiedCount.get(descriptor);
-            return (updateCount == null) ? 0 : updateCount;
-        }
-
-        @Override
-        public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.junit.Test;
+
+public class TestMockProcessContext {
+
+    @Test
+    public void testRemoveProperty() {
+        final DummyProcessor proc = new DummyProcessor();
+        final MockProcessContext context = new MockProcessContext(proc);
+        context.setProperty(DummyProcessor.REQUIRED_PROP, "req-value");
+        context.setProperty(DummyProcessor.OPTIONAL_PROP, "opt-value");
+        context.setProperty(DummyProcessor.DEFAULTED_PROP, "custom-value");
+
+        assertEquals(1, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP));
+        assertEquals(1, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP));
+        assertEquals(1, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
+
+        assertTrue(context.removeProperty(DummyProcessor.OPTIONAL_PROP));
+        
assertNull(context.getProperty(DummyProcessor.OPTIONAL_PROP).getValue());
+        assertFalse(context.removeProperty(DummyProcessor.OPTIONAL_PROP));
+        assertEquals(2, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP));
+
+        assertTrue(context.removeProperty(DummyProcessor.REQUIRED_PROP));
+        
assertNull(context.getProperty(DummyProcessor.REQUIRED_PROP).getValue());
+        assertFalse(context.removeProperty(DummyProcessor.REQUIRED_PROP));
+        assertEquals(2, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP));
+
+        assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP));
+        assertEquals("default-value", 
context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue());
+        assertFalse(context.removeProperty(DummyProcessor.DEFAULTED_PROP));
+        assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
+
+        // Since value is already the default, this shouldn't trigger 
onPropertyModified to be called.
+        context.setProperty(DummyProcessor.DEFAULTED_PROP, 
DummyProcessor.DEFAULTED_PROP.getDefaultValue());
+        assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
+
+        assertEquals("default-value", 
context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue());
+        assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP));
+
+        // since we are calling remove on a property that has a default value, 
this shouldn't
+        // trigger the onPropertyModified method to be called.
+        assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP));
+    }
+
+    private static class DummyProcessor extends AbstractProcessor {
+        static final PropertyDescriptor REQUIRED_PROP = new 
PropertyDescriptor.Builder()
+            .name("required")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+        static final PropertyDescriptor DEFAULTED_PROP = new 
PropertyDescriptor.Builder()
+            .name("defaulted")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("default-value")
+            .build();
+        static final PropertyDescriptor OPTIONAL_PROP = new 
PropertyDescriptor.Builder()
+            .name("optional")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+        private final Map<PropertyDescriptor, Integer> propertyModifiedCount = 
new HashMap<>();
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            final List<PropertyDescriptor> properties = new ArrayList<>();
+            properties.add(REQUIRED_PROP);
+            properties.add(DEFAULTED_PROP);
+            properties.add(OPTIONAL_PROP);
+            return properties;
+        }
+
+        @Override
+        public void onPropertyModified(final PropertyDescriptor descriptor, 
final String oldValue, final String newValue) {
+            Integer updateCount = propertyModifiedCount.get(descriptor);
+            if (updateCount == null) {
+                updateCount = 0;
+            }
+
+            propertyModifiedCount.put(descriptor, updateCount + 1);
+        }
+
+        public int getUpdateCount(final PropertyDescriptor descriptor) {
+            Integer updateCount = propertyModifiedCount.get(descriptor);
+            return (updateCount == null) ? 0 : updateCount;
+        }
+
+        @Override
+        public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 8c1919e..8d4ef21 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -1,239 +1,239 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.SSLContext;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-
-import com.amazonaws.AmazonWebServiceClient;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.Protocol;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.AnonymousAWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.PropertiesCredentials;
-import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
-import com.amazonaws.regions.Region;
-import com.amazonaws.regions.Regions;
-
-public abstract class AbstractAWSProcessor<ClientType extends 
AmazonWebServiceClient> extends AbstractProcessor {
-
-    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
-            .description("FlowFiles are routed to success after being 
successfully copied to Amazon S3").build();
-    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
-            .description("FlowFiles are routed to failure if unable to be 
copied to Amazon S3").build();
-
-    public static final Set<Relationship> relationships = 
Collections.unmodifiableSet(
-            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
-
-    public static final PropertyDescriptor CREDENTIALS_FILE = new 
PropertyDescriptor.Builder()
-            .name("Credentials File")
-            .expressionLanguageSupported(false)
-            .required(false)
-            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor ACCESS_KEY = new 
PropertyDescriptor.Builder()
-            .name("Access Key")
-            .expressionLanguageSupported(false)
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
-    public static final PropertyDescriptor SECRET_KEY = new 
PropertyDescriptor.Builder()
-            .name("Secret Key")
-            .expressionLanguageSupported(false)
-            .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .sensitive(true)
-            .build();
-    public static final PropertyDescriptor REGION = new 
PropertyDescriptor.Builder()
-            .name("Region")
-            .required(true)
-            .allowableValues(getAvailableRegions())
-            
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
-            .build();
-
-    public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
-            .name("Communications Timeout")
-            .required(true)
-            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("30 secs")
-            .build();
-
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
-            .name("SSL Context Service")
-            .description("Specifies an optional SSL Context Service that, if 
provided, will be used to create connections")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-
-    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new 
PropertyDescriptor.Builder()
-            .name("Endpoint Override URL")
-            .description("Endpoint URL to use instead of the AWS default 
including scheme, host, port, and path. " +
-                    "The AWS libraries select an endpoint URL based on the AWS 
region, but this property overrides " +
-                    "the selected endpoint URL, allowing use with other 
S3-compatible endpoints.")
-            .required(false)
-            .addValidator(StandardValidators.URL_VALIDATOR)
-            .build();
-
-    private volatile ClientType client;
-    private volatile Region region;
-
-    // If protocol is changed to be a property, ensure other uses are also 
changed
-    protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
-    protected static final String DEFAULT_USER_AGENT = "NiFi";
-
-    private static AllowableValue createAllowableValue(final Regions regions) {
-        return new AllowableValue(regions.getName(), regions.getName(), 
regions.getName());
-    }
-
-    private static AllowableValue[] getAvailableRegions() {
-        final List<AllowableValue> values = new ArrayList<>();
-        for (final Regions regions : Regions.values()) {
-            values.add(createAllowableValue(regions));
-        }
-
-        return (AllowableValue[]) values.toArray(new 
AllowableValue[values.size()]);
-    }
-
-    @Override
-    public Set<Relationship> getRelationships() {
-        return relationships;
-    }
-
-    @Override
-    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
-        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(validationContext));
-
-        final boolean accessKeySet = 
validationContext.getProperty(ACCESS_KEY).isSet();
-        final boolean secretKeySet = 
validationContext.getProperty(SECRET_KEY).isSet();
-        if ((accessKeySet && !secretKeySet) || (secretKeySet && 
!accessKeySet)) {
-            problems.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("If setting Secret Key or Access Key, must set 
both").build());
-        }
-
-        final boolean credentialsFileSet = 
validationContext.getProperty(CREDENTIALS_FILE).isSet();
-        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
-            problems.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("Cannot set both Credentials File and Secret 
Key/Access Key").build());
-        }
-
-        return problems;
-    }
-
-    protected ClientConfiguration createConfiguration(final ProcessContext 
context) {
-        final ClientConfiguration config = new ClientConfiguration();
-        config.setMaxConnections(context.getMaxConcurrentTasks());
-        config.setMaxErrorRetry(0);
-        config.setUserAgent(DEFAULT_USER_AGENT);
-        // If this is changed to be a property, ensure other uses are also 
changed
-        config.setProtocol(DEFAULT_PROTOCOL);
-        final int commsTimeout = 
context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
-        config.setConnectionTimeout(commsTimeout);
-        config.setSocketTimeout(commsTimeout);
-
-        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-        if (sslContextService != null) {
-            final SSLContext sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
-            SdkTLSSocketFactory sdkTLSSocketFactory = new 
SdkTLSSocketFactory(sslContext, null);
-            
config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory);
-        }
-
-        return config;
-    }
-
-    @OnScheduled
-    public void onScheduled(final ProcessContext context) {
-        final ClientType awsClient = createClient(context, 
getCredentials(context), createConfiguration(context));
-        this.client = awsClient;
-
-        // if the processor supports REGION, get the configured region.
-        if (getSupportedPropertyDescriptors().contains(REGION)) {
-            final String region = context.getProperty(REGION).getValue();
-            if (region != null) {
-                this.region = Region.getRegion(Regions.fromName(region));
-                client.setRegion(this.region);
-            } else {
-                this.region = null;
-            }
-        }
-
-        // if the endpoint override has been configured, set the endpoint.
-        // (per Amazon docs this should only be configured at client creation)
-        final String urlstr = 
StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue());
-        if (!urlstr.isEmpty()) {
-            this.client.setEndpoint(urlstr);
-        }
-    }
-
-    protected abstract ClientType createClient(final ProcessContext context, 
final AWSCredentials credentials, final ClientConfiguration config);
-
-    protected ClientType getClient() {
-        return client;
-    }
-
-    protected Region getRegion() {
-        return region;
-    }
-
-    protected AWSCredentials getCredentials(final ProcessContext context) {
-        final String accessKey = context.getProperty(ACCESS_KEY).getValue();
-        final String secretKey = context.getProperty(SECRET_KEY).getValue();
-
-        final String credentialsFile = 
context.getProperty(CREDENTIALS_FILE).getValue();
-
-        if (credentialsFile != null) {
-            try {
-                return new PropertiesCredentials(new File(credentialsFile));
-            } catch (final IOException ioe) {
-                throw new ProcessException("Could not read Credentials File", 
ioe);
-            }
-        }
-
-        if (accessKey != null && secretKey != null) {
-            return new BasicAWSCredentials(accessKey, secretKey);
-        }
-
-        return new AnonymousAWSCredentials();
-    }
-
-    protected boolean isEmpty(final String value) {
-        return value == null || value.trim().equals("");
-    }
-
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.net.ssl.SSLContext;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.Protocol;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AnonymousAWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+
+public abstract class AbstractAWSProcessor<ClientType extends 
AmazonWebServiceClient> extends AbstractProcessor {
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
+            .description("FlowFiles are routed to success after being 
successfully copied to Amazon S3").build();
+    public static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure")
+            .description("FlowFiles are routed to failure if unable to be 
copied to Amazon S3").build();
+
+    public static final Set<Relationship> relationships = 
Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    public static final PropertyDescriptor CREDENTIALS_FILE = new 
PropertyDescriptor.Builder()
+            .name("Credentials File")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor ACCESS_KEY = new 
PropertyDescriptor.Builder()
+            .name("Access Key")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+    public static final PropertyDescriptor SECRET_KEY = new 
PropertyDescriptor.Builder()
+            .name("Secret Key")
+            .expressionLanguageSupported(false)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(true)
+            .build();
+    public static final PropertyDescriptor REGION = new 
PropertyDescriptor.Builder()
+            .name("Region")
+            .required(true)
+            .allowableValues(getAvailableRegions())
+            
.defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue())
+            .build();
+
+    public static final PropertyDescriptor TIMEOUT = new 
PropertyDescriptor.Builder()
+            .name("Communications Timeout")
+            .required(true)
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .defaultValue("30 secs")
+            .build();
+
+    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new 
PropertyDescriptor.Builder()
+            .name("SSL Context Service")
+            .description("Specifies an optional SSL Context Service that, if 
provided, will be used to create connections")
+            .required(false)
+            .identifiesControllerService(SSLContextService.class)
+            .build();
+
+    public static final PropertyDescriptor ENDPOINT_OVERRIDE = new 
PropertyDescriptor.Builder()
+            .name("Endpoint Override URL")
+            .description("Endpoint URL to use instead of the AWS default 
including scheme, host, port, and path. " +
+                    "The AWS libraries select an endpoint URL based on the AWS 
region, but this property overrides " +
+                    "the selected endpoint URL, allowing use with other 
S3-compatible endpoints.")
+            .required(false)
+            .addValidator(StandardValidators.URL_VALIDATOR)
+            .build();
+
+    private volatile ClientType client;
+    private volatile Region region;
+
+    // If protocol is changed to be a property, ensure other uses are also 
changed
+    protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
+    protected static final String DEFAULT_USER_AGENT = "NiFi";
+
+    private static AllowableValue createAllowableValue(final Regions regions) {
+        return new AllowableValue(regions.getName(), regions.getName(), 
regions.getName());
+    }
+
+    private static AllowableValue[] getAvailableRegions() {
+        final List<AllowableValue> values = new ArrayList<>();
+        for (final Regions regions : Regions.values()) {
+            values.add(createAllowableValue(regions));
+        }
+
+        return (AllowableValue[]) values.toArray(new 
AllowableValue[values.size()]);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+        final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(validationContext));
+
+        final boolean accessKeySet = 
validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = 
validationContext.getProperty(SECRET_KEY).isSet();
+        if ((accessKeySet && !secretKeySet) || (secretKeySet && 
!accessKeySet)) {
+            problems.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("If setting Secret Key or Access Key, must set 
both").build());
+        }
+
+        final boolean credentialsFileSet = 
validationContext.getProperty(CREDENTIALS_FILE).isSet();
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            problems.add(new ValidationResult.Builder().input("Access 
Key").valid(false).explanation("Cannot set both Credentials File and Secret 
Key/Access Key").build());
+        }
+
+        return problems;
+    }
+
+    protected ClientConfiguration createConfiguration(final ProcessContext 
context) {
+        final ClientConfiguration config = new ClientConfiguration();
+        config.setMaxConnections(context.getMaxConcurrentTasks());
+        config.setMaxErrorRetry(0);
+        config.setUserAgent(DEFAULT_USER_AGENT);
+        // If this is changed to be a property, ensure other uses are also 
changed
+        config.setProtocol(DEFAULT_PROTOCOL);
+        final int commsTimeout = 
context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
+        config.setConnectionTimeout(commsTimeout);
+        config.setSocketTimeout(commsTimeout);
+
+        final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+        if (sslContextService != null) {
+            final SSLContext sslContext = 
sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE);
+            SdkTLSSocketFactory sdkTLSSocketFactory = new 
SdkTLSSocketFactory(sslContext, null);
+            
config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory);
+        }
+
+        return config;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        final ClientType awsClient = createClient(context, 
getCredentials(context), createConfiguration(context));
+        this.client = awsClient;
+
+        // if the processor supports REGION, get the configured region.
+        if (getSupportedPropertyDescriptors().contains(REGION)) {
+            final String region = context.getProperty(REGION).getValue();
+            if (region != null) {
+                this.region = Region.getRegion(Regions.fromName(region));
+                client.setRegion(this.region);
+            } else {
+                this.region = null;
+            }
+        }
+
+        // if the endpoint override has been configured, set the endpoint.
+        // (per Amazon docs this should only be configured at client creation)
+        final String urlstr = 
StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue());
+        if (!urlstr.isEmpty()) {
+            this.client.setEndpoint(urlstr);
+        }
+    }
+
+    protected abstract ClientType createClient(final ProcessContext context, 
final AWSCredentials credentials, final ClientConfiguration config);
+
+    protected ClientType getClient() {
+        return client;
+    }
+
+    protected Region getRegion() {
+        return region;
+    }
+
+    protected AWSCredentials getCredentials(final ProcessContext context) {
+        final String accessKey = context.getProperty(ACCESS_KEY).getValue();
+        final String secretKey = context.getProperty(SECRET_KEY).getValue();
+
+        final String credentialsFile = 
context.getProperty(CREDENTIALS_FILE).getValue();
+
+        if (credentialsFile != null) {
+            try {
+                return new PropertiesCredentials(new File(credentialsFile));
+            } catch (final IOException ioe) {
+                throw new ProcessException("Could not read Credentials File", 
ioe);
+            }
+        }
+
+        if (accessKey != null && secretKey != null) {
+            return new BasicAWSCredentials(accessKey, secretKey);
+        }
+
+        return new AnonymousAWSCredentials();
+    }
+
+    protected boolean isEmpty(final String value) {
+        return value == null || value.trim().equals("");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index e01efcb..39ad667 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -1,192 +1,192 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.s3;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
-
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.regions.Region;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.S3ClientOptions;
-import com.amazonaws.services.s3.model.AccessControlList;
-import com.amazonaws.services.s3.model.CanonicalGrantee;
-import com.amazonaws.services.s3.model.EmailAddressGrantee;
-import com.amazonaws.services.s3.model.Grantee;
-import com.amazonaws.services.s3.model.Owner;
-import com.amazonaws.services.s3.model.Permission;
-
-public abstract class AbstractS3Processor extends 
AbstractAWSProcessor<AmazonS3Client> {
-
-    public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new 
PropertyDescriptor.Builder()
-            .name("FullControl User List")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Full Control for an object")
-            .defaultValue("${s3.permissions.full.users}")
-            .build();
-    public static final PropertyDescriptor READ_USER_LIST = new 
PropertyDescriptor.Builder()
-            .name("Read Permission User List")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Read Access for an object")
-            .defaultValue("${s3.permissions.read.users}")
-            .build();
-    public static final PropertyDescriptor WRITE_USER_LIST = new 
PropertyDescriptor.Builder()
-            .name("Write Permission User List")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Write Access for an object")
-            .defaultValue("${s3.permissions.write.users}")
-            .build();
-    public static final PropertyDescriptor READ_ACL_LIST = new 
PropertyDescriptor.Builder()
-            .name("Read ACL User List")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have permissions to read the Access Control 
List for an object")
-            .defaultValue("${s3.permissions.readacl.users}")
-            .build();
-    public static final PropertyDescriptor WRITE_ACL_LIST = new 
PropertyDescriptor.Builder()
-            .name("Write ACL User List")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have permissions to change the Access 
Control List for an object")
-            .defaultValue("${s3.permissions.writeacl.users}")
-            .build();
-    public static final PropertyDescriptor OWNER = new 
PropertyDescriptor.Builder()
-            .name("Owner")
-            .required(false)
-            .expressionLanguageSupported(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .description("The Amazon ID to use for the object's owner")
-            .defaultValue("${s3.owner}")
-            .build();
-    public static final PropertyDescriptor BUCKET = new 
PropertyDescriptor.Builder()
-            .name("Bucket")
-            .expressionLanguageSupported(true)
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .build();
-    public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
-            .name("Object Key")
-            .required(true)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .defaultValue("${filename}")
-            .build();
-
-    @Override
-    protected AmazonS3Client createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
-        final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
-
-        // if ENDPOINT_OVERRIDE is set, use PathStyleAccess
-        
if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()).isEmpty()
 == false){
-            final S3ClientOptions s3Options = new S3ClientOptions();
-            s3Options.setPathStyleAccess(true);
-            s3.setS3ClientOptions(s3Options);
-        }
-
-        return s3;
-    }
-
-    protected Grantee createGrantee(final String value) {
-        if (isEmpty(value)) {
-            return null;
-        }
-
-        if (value.contains("@")) {
-            return new EmailAddressGrantee(value);
-        } else {
-            return new CanonicalGrantee(value);
-        }
-    }
-
-    protected final List<Grantee> createGrantees(final String value) {
-        if (isEmpty(value)) {
-            return Collections.emptyList();
-        }
-
-        final List<Grantee> grantees = new ArrayList<>();
-        final String[] vals = value.split(",");
-        for (final String val : vals) {
-            final String identifier = val.trim();
-            final Grantee grantee = createGrantee(identifier);
-            if (grantee != null) {
-                grantees.add(grantee);
-            }
-        }
-        return grantees;
-    }
-
-    protected String getUrlForObject(final String bucket, final String key) {
-        Region region = getRegion();
-
-        if (region == null) {
-            return  DEFAULT_PROTOCOL.toString() + "://s3.amazonaws.com/" + 
bucket + "/" + key;
-        } else {
-            final String endpoint = region.getServiceEndpoint("s3");
-            return DEFAULT_PROTOCOL.toString() + "://" + endpoint + "/" + 
bucket + "/" + key;
-        }
-    }
-
-    protected final AccessControlList createACL(final ProcessContext context, 
final FlowFile flowFile) {
-        final AccessControlList acl = new AccessControlList();
-
-        final String ownerId = 
context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
-        if (!isEmpty(ownerId)) {
-            final Owner owner = new Owner();
-            owner.setId(ownerId);
-            acl.setOwner(owner);
-        }
-
-        for (final Grantee grantee : 
createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
-            acl.grantPermission(grantee, Permission.FullControl);
-        }
-
-        for (final Grantee grantee : 
createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
-            acl.grantPermission(grantee, Permission.Read);
-        }
-
-        for (final Grantee grantee : 
createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
-            acl.grantPermission(grantee, Permission.Write);
-        }
-
-        for (final Grantee grantee : 
createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
-            acl.grantPermission(grantee, Permission.ReadAcp);
-        }
-
-        for (final Grantee grantee : 
createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
-            acl.grantPermission(grantee, Permission.WriteAcp);
-        }
-
-        return acl;
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.CanonicalGrantee;
+import com.amazonaws.services.s3.model.EmailAddressGrantee;
+import com.amazonaws.services.s3.model.Grantee;
+import com.amazonaws.services.s3.model.Owner;
+import com.amazonaws.services.s3.model.Permission;
+
+public abstract class AbstractS3Processor extends 
AbstractAWSProcessor<AmazonS3Client> {
+
+    public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new 
PropertyDescriptor.Builder()
+            .name("FullControl User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Full Control for an object")
+            .defaultValue("${s3.permissions.full.users}")
+            .build();
+    public static final PropertyDescriptor READ_USER_LIST = new 
PropertyDescriptor.Builder()
+            .name("Read Permission User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Read Access for an object")
+            .defaultValue("${s3.permissions.read.users}")
+            .build();
+    public static final PropertyDescriptor WRITE_USER_LIST = new 
PropertyDescriptor.Builder()
+            .name("Write Permission User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have Write Access for an object")
+            .defaultValue("${s3.permissions.write.users}")
+            .build();
+    public static final PropertyDescriptor READ_ACL_LIST = new 
PropertyDescriptor.Builder()
+            .name("Read ACL User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have permissions to read the Access Control 
List for an object")
+            .defaultValue("${s3.permissions.readacl.users}")
+            .build();
+    public static final PropertyDescriptor WRITE_ACL_LIST = new 
PropertyDescriptor.Builder()
+            .name("Write ACL User List")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("A comma-separated list of Amazon User ID's or E-mail 
addresses that specifies who should have permissions to change the Access 
Control List for an object")
+            .defaultValue("${s3.permissions.writeacl.users}")
+            .build();
+    public static final PropertyDescriptor OWNER = new 
PropertyDescriptor.Builder()
+            .name("Owner")
+            .required(false)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .description("The Amazon ID to use for the object's owner")
+            .defaultValue("${s3.owner}")
+            .build();
+    public static final PropertyDescriptor BUCKET = new 
PropertyDescriptor.Builder()
+            .name("Bucket")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor KEY = new 
PropertyDescriptor.Builder()
+            .name("Object Key")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .defaultValue("${filename}")
+            .build();
+
+    @Override
+    protected AmazonS3Client createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
+        final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
+
+        // if ENDPOINT_OVERRIDE is set, use PathStyleAccess
+        
if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()).isEmpty()
 == false){
+            final S3ClientOptions s3Options = new S3ClientOptions();
+            s3Options.setPathStyleAccess(true);
+            s3.setS3ClientOptions(s3Options);
+        }
+
+        return s3;
+    }
+
+    protected Grantee createGrantee(final String value) {
+        if (isEmpty(value)) {
+            return null;
+        }
+
+        if (value.contains("@")) {
+            return new EmailAddressGrantee(value);
+        } else {
+            return new CanonicalGrantee(value);
+        }
+    }
+
+    protected final List<Grantee> createGrantees(final String value) {
+        if (isEmpty(value)) {
+            return Collections.emptyList();
+        }
+
+        final List<Grantee> grantees = new ArrayList<>();
+        final String[] vals = value.split(",");
+        for (final String val : vals) {
+            final String identifier = val.trim();
+            final Grantee grantee = createGrantee(identifier);
+            if (grantee != null) {
+                grantees.add(grantee);
+            }
+        }
+        return grantees;
+    }
+
+    protected String getUrlForObject(final String bucket, final String key) {
+        Region region = getRegion();
+
+        if (region == null) {
+            return  DEFAULT_PROTOCOL.toString() + "://s3.amazonaws.com/" + 
bucket + "/" + key;
+        } else {
+            final String endpoint = region.getServiceEndpoint("s3");
+            return DEFAULT_PROTOCOL.toString() + "://" + endpoint + "/" + 
bucket + "/" + key;
+        }
+    }
+
+    protected final AccessControlList createACL(final ProcessContext context, 
final FlowFile flowFile) {
+        final AccessControlList acl = new AccessControlList();
+
+        final String ownerId = 
context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
+        if (!isEmpty(ownerId)) {
+            final Owner owner = new Owner();
+            owner.setId(ownerId);
+            acl.setOwner(owner);
+        }
+
+        for (final Grantee grantee : 
createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
+            acl.grantPermission(grantee, Permission.FullControl);
+        }
+
+        for (final Grantee grantee : 
createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
+            acl.grantPermission(grantee, Permission.Read);
+        }
+
+        for (final Grantee grantee : 
createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
+            acl.grantPermission(grantee, Permission.Write);
+        }
+
+        for (final Grantee grantee : 
createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
+            acl.grantPermission(grantee, Permission.ReadAcp);
+        }
+
+        for (final Grantee grantee : 
createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue()))
 {
+            acl.grantPermission(grantee, Permission.WriteAcp);
+        }
+
+        return acl;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 9dc3595..ec6eea7 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -1,104 +1,104 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.s3;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.DeleteObjectRequest;
-import com.amazonaws.services.s3.model.DeleteVersionRequest;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.StandardValidators;
-
-
-@SupportsBatching
-@SeeAlso({PutS3Object.class, FetchS3Object.class})
-@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
-        "If attempting to delete a file that does not exist, FlowFile is 
routed to success.")
-public class DeleteS3Object extends AbstractS3Processor {
-
-    public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
-            .name("Version")
-            .description("The Version of the Object to delete")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .required(false)
-            .build();
-
-    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
-                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, 
READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
-                    SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final long startNanos = System.nanoTime();
-
-        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
-
-        final AmazonS3 s3 = getClient();
-
-        // Deletes a key on Amazon S3
-        try {
-            if (versionId == null) {
-                final DeleteObjectRequest r = new DeleteObjectRequest(bucket, 
key);
-                // This call returns success if object doesn't exist
-                s3.deleteObject(r);
-            } else {
-                final DeleteVersionRequest r = new 
DeleteVersionRequest(bucket, key, versionId);
-                s3.deleteVersion(r);
-            }
-        } catch (final AmazonServiceException ase) {
-            getLogger().error("Failed to delete S3 Object for {}; routing to 
failure", new Object[]{flowFile, ase});
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-
-        session.transfer(flowFile, REL_SUCCESS);
-        final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully delete S3 Object for {} in {} millis; 
routing to success", new Object[]{flowFile, transferMillis});
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.s3;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.DeleteObjectRequest;
+import com.amazonaws.services.s3.model.DeleteVersionRequest;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+
+
+@SupportsBatching
+@SeeAlso({PutS3Object.class, FetchS3Object.class})
+@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " +
+        "If attempting to delete a file that does not exist, FlowFile is 
routed to success.")
+public class DeleteS3Object extends AbstractS3Processor {
+
+    public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
+            .name("Version")
+            .description("The Version of the Object to delete")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
+            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
+                    FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, 
READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
+                    SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 s3 = getClient();
+
+        // Deletes a key on Amazon S3
+        try {
+            if (versionId == null) {
+                final DeleteObjectRequest r = new DeleteObjectRequest(bucket, 
key);
+                // This call returns success if object doesn't exist
+                s3.deleteObject(r);
+            } else {
+                final DeleteVersionRequest r = new 
DeleteVersionRequest(bucket, key, versionId);
+                s3.deleteVersion(r);
+            }
+        } catch (final AmazonServiceException ase) {
+            getLogger().error("Failed to delete S3 Object for {}; routing to 
failure", new Object[]{flowFile, ase});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+        getLogger().info("Successfully deleted S3 Object for {} in {} millis; 
routing to success", new Object[]{flowFile, transferMillis});
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 9eaf019..ae0af56 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -1,159 +1,159 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.aws.s3;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.StandardValidators;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3Object;
-
-@SupportsBatching
-@SeeAlso({PutS3Object.class, DeleteS3Object.class})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
-@CapabilityDescription("Retrieves the contents of an S3 Object and writes it 
to the content of a FlowFile")
-@WritesAttributes({
-    @WritesAttribute(attribute = "s3.bucket", description = "The name of the 
S3 bucket"),
-    @WritesAttribute(attribute = "path", description = "The path of the file"),
-    @WritesAttribute(attribute = "absolute.path", description = "The path of 
the file"),
-    @WritesAttribute(attribute = "filename", description = "The name of the 
file"),
-    @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of 
the file"),
-    @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
-    @WritesAttribute(attribute = "mime.type", description = "If S3 provides 
the content type/MIME type, this attribute will hold that file"),
-    @WritesAttribute(attribute = "s3.etag", description = "The ETag that can 
be used to see if the file has changed"),
-    @WritesAttribute(attribute = "s3.expirationTime", description = "If the 
file has an expiration date, this attribute will be set, containing the 
milliseconds since epoch in UTC time"),
-    @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The 
ID of the rule that dictates this object's expiration time"),
-    @WritesAttribute(attribute = "s3.version", description = "The version of 
the S3 object"),})
-public class FetchS3Object extends AbstractS3Processor {
-
-    public static final PropertyDescriptor VERSION_ID = new 
PropertyDescriptor.Builder()
-            .name("Version")
-            .description("The Version of the Object to download")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(true)
-            .required(false)
-            .build();
-
-    public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
-
-    @Override
-    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return properties;
-    }
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
-        final long startNanos = System.nanoTime();
-        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
-
-        final AmazonS3 client = getClient();
-        final GetObjectRequest request;
-        if (versionId == null) {
-            request = new GetObjectRequest(bucket, key);
-        } else {
-            request = new GetObjectRequest(bucket, key, versionId);
-        }
-
-        final Map<String, String> attributes = new HashMap<>();
-        try (final S3Object s3Object = client.getObject(request)) {
-            flowFile = session.importFrom(s3Object.getObjectContent(), 
flowFile);
-            attributes.put("s3.bucket", s3Object.getBucketName());
-
-            final ObjectMetadata metadata = s3Object.getObjectMetadata();
-            if (metadata.getContentDisposition() != null) {
-                final String fullyQualified = metadata.getContentDisposition();
-                final int lastSlash = fullyQualified.lastIndexOf("/");
-                if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) 
{
-                    attributes.put(CoreAttributes.PATH.key(), 
fullyQualified.substring(0, lastSlash));
-                    attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), 
fullyQualified);
-                    attributes.put(CoreAttributes.FILENAME.key(), 
fullyQualified.substring(lastSlash + 1));
-                } else {
-                    attributes.put(CoreAttributes.FILENAME.key(), 
metadata.getContentDisposition());
-                }
-            }
-            if (metadata.getContentMD5() != null) {
-                attributes.put("hash.value", metadata.getContentMD5());
-                attributes.put("hash.algorithm", "MD5");
-            }
-            if (metadata.getContentType() != null) {
-                attributes.put(CoreAttributes.MIME_TYPE.key(), 
metadata.getContentType());
-            }
-            if (metadata.getETag() != null) {
-                attributes.put("s3.etag", metadata.getETag());
-            }
-            if (metadata.getExpirationTime() != null) {
-                attributes.put("s3.expirationTime", 
String.valueOf(metadata.getExpirationTime().getTime()));
-            }
-            if (metadata.getExpirationTimeRuleId() != null) {
-                attributes.put("s3.expirationTimeRuleId", 
metadata.getExpirationTimeRuleId());
-            }
-            if (metadata.getUserMetadata() != null) {
-                attributes.putAll(metadata.getUserMetadata());
-            }
-            if (metadata.getVersionId() != null) {
-                attributes.put("s3.version", metadata.getVersionId());
-            }
-        } catch (final IOException | AmazonClientException ioe) {
-            getLogger().error("Failed to retrieve S3 Object for {}; routing to 
failure", new Object[]{flowFile, ioe});
-            flowFile = session.penalize(flowFile);
-            session.transfer(flowFile, REL_FAILURE);
-            return;
-        }
-
-        if (!attributes.isEmpty()) {
-            flowFile = session.putAllAttributes(flowFile, attributes);
-        }
-
-        session.transfer(flowFile, REL_SUCCESS);
-        final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully retrieved S3 Object for {} in {} 
millis; routing to success", new Object[]{flowFile, transferMillis});
-        session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + 
".amazonaws.com/" + key, transferMillis);
-    }
-
-}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.aws.s3;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;

/**
 * Fetches the content of an S3 Object (optionally a specific version) into the
 * incoming FlowFile's content, and copies any S3 object metadata that is
 * present into FlowFile attributes.
 */
@SupportsBatching
@SeeAlso({PutS3Object.class, DeleteS3Object.class})
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"})
@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile")
@WritesAttributes({
    @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"),
    @WritesAttribute(attribute = "path", description = "The path of the file"),
    @WritesAttribute(attribute = "absolute.path", description = "The path of the file"),
    @WritesAttribute(attribute = "filename", description = "The name of the file"),
    @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the file"),
    @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
    @WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that value"),
    @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"),
    @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"),
    @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"),
    @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),})
public class FetchS3Object extends AbstractS3Processor {

    /** Optional object version to download; when unset, the latest version is fetched. */
    public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder()
            .name("Version")
            .description("The Version of the Object to download")
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(true)
            .required(false)
            .build();

    public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(
            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return properties;
    }

    /**
     * Downloads the requested S3 object into the FlowFile's content.
     *
     * <p>On success the FlowFile, enriched with S3 metadata attributes, is
     * routed to {@code REL_SUCCESS} and a provenance FETCH event is emitted.
     * On any I/O or AWS client failure the FlowFile is penalized and routed
     * to {@code REL_FAILURE}.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        final long startNanos = System.nanoTime();
        final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
        final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();

        final AmazonS3 client = getClient();
        // Request a specific version only when one was configured; otherwise fetch the latest.
        final GetObjectRequest request;
        if (versionId == null) {
            request = new GetObjectRequest(bucket, key);
        } else {
            request = new GetObjectRequest(bucket, key, versionId);
        }

        final Map<String, String> attributes = new HashMap<>();
        // try-with-resources closes the S3Object (and its underlying HTTP stream) in all cases.
        try (final S3Object s3Object = client.getObject(request)) {
            flowFile = session.importFrom(s3Object.getObjectContent(), flowFile);
            attributes.put("s3.bucket", s3Object.getBucketName());
            populateMetadataAttributes(s3Object.getObjectMetadata(), attributes);
        } catch (final IOException | AmazonClientException ioe) {
            getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe});
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        if (!attributes.isEmpty()) {
            flowFile = session.putAllAttributes(flowFile, attributes);
        }

        session.transfer(flowFile, REL_SUCCESS);
        final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis});
        session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis);
    }

    /**
     * Copies the S3 object metadata fields that are present (non-null) into
     * the given attribute map. Fields that S3 did not supply are skipped.
     */
    private void populateMetadataAttributes(final ObjectMetadata metadata, final Map<String, String> attributes) {
        if (metadata.getContentDisposition() != null) {
            // Content-Disposition may carry a fully-qualified path; when it does,
            // split it into path / absolute.path / filename core attributes.
            final String fullyQualified = metadata.getContentDisposition();
            final int lastSlash = fullyQualified.lastIndexOf("/");
            if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) {
                attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash));
                attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified);
                attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1));
            } else {
                attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition());
            }
        }
        if (metadata.getContentMD5() != null) {
            attributes.put("hash.value", metadata.getContentMD5());
            attributes.put("hash.algorithm", "MD5");
        }
        if (metadata.getContentType() != null) {
            attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType());
        }
        if (metadata.getETag() != null) {
            attributes.put("s3.etag", metadata.getETag());
        }
        if (metadata.getExpirationTime() != null) {
            attributes.put("s3.expirationTime", String.valueOf(metadata.getExpirationTime().getTime()));
        }
        if (metadata.getExpirationTimeRuleId() != null) {
            attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId());
        }
        if (metadata.getUserMetadata() != null) {
            attributes.putAll(metadata.getUserMetadata());
        }
        if (metadata.getVersionId() != null) {
            attributes.put("s3.version", metadata.getVersionId());
        }
    }
}

Reply via email to