http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java index f8e74bc..fbdea94 100644 --- a/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java +++ b/nifi-mock/src/test/java/org/apache/nifi/util/TestMockProcessContext.java @@ -1,126 +1,126 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.util; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.junit.Test; - -public class TestMockProcessContext { - - @Test - public void testRemoveProperty() { - final DummyProcessor proc = new DummyProcessor(); - final MockProcessContext context = new MockProcessContext(proc); - context.setProperty(DummyProcessor.REQUIRED_PROP, "req-value"); - context.setProperty(DummyProcessor.OPTIONAL_PROP, "opt-value"); - context.setProperty(DummyProcessor.DEFAULTED_PROP, "custom-value"); - - assertEquals(1, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP)); - assertEquals(1, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP)); - assertEquals(1, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); - - assertTrue(context.removeProperty(DummyProcessor.OPTIONAL_PROP)); - assertNull(context.getProperty(DummyProcessor.OPTIONAL_PROP).getValue()); - assertFalse(context.removeProperty(DummyProcessor.OPTIONAL_PROP)); - assertEquals(2, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP)); - - assertTrue(context.removeProperty(DummyProcessor.REQUIRED_PROP)); - assertNull(context.getProperty(DummyProcessor.REQUIRED_PROP).getValue()); - assertFalse(context.removeProperty(DummyProcessor.REQUIRED_PROP)); - assertEquals(2, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP)); - - assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP)); - assertEquals("default-value", context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue()); - assertFalse(context.removeProperty(DummyProcessor.DEFAULTED_PROP)); - assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); - - // Since value is already the default, this shouldn't trigger onPropertyModified to be called. - context.setProperty(DummyProcessor.DEFAULTED_PROP, DummyProcessor.DEFAULTED_PROP.getDefaultValue()); - assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); - - assertEquals("default-value", context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue()); - assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP)); - - // since we are calling remove on a property that has a default value, this shouldn't - // trigger the onPropertyModified method to be called. - assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); - } - - private static class DummyProcessor extends AbstractProcessor { - static final PropertyDescriptor REQUIRED_PROP = new PropertyDescriptor.Builder() - .name("required") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - static final PropertyDescriptor DEFAULTED_PROP = new PropertyDescriptor.Builder() - .name("defaulted") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("default-value") - .build(); - static final PropertyDescriptor OPTIONAL_PROP = new PropertyDescriptor.Builder() - .name("optional") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - private final Map<PropertyDescriptor, Integer> propertyModifiedCount = new HashMap<>(); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(REQUIRED_PROP); - properties.add(DEFAULTED_PROP); - properties.add(OPTIONAL_PROP); - return properties; - } - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - Integer updateCount = propertyModifiedCount.get(descriptor); - if (updateCount == null) { - updateCount = 0; - } - - propertyModifiedCount.put(descriptor, updateCount + 1); - } - - public int getUpdateCount(final PropertyDescriptor descriptor) { - Integer updateCount = propertyModifiedCount.get(descriptor); - return (updateCount == null) ? 0 : updateCount; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.junit.Test; + +public class TestMockProcessContext { + + @Test + public void testRemoveProperty() { + final DummyProcessor proc = new DummyProcessor(); + final MockProcessContext context = new MockProcessContext(proc); + context.setProperty(DummyProcessor.REQUIRED_PROP, "req-value"); + context.setProperty(DummyProcessor.OPTIONAL_PROP, "opt-value"); + context.setProperty(DummyProcessor.DEFAULTED_PROP, "custom-value"); + + assertEquals(1, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP)); + assertEquals(1, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP)); + assertEquals(1, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); + + assertTrue(context.removeProperty(DummyProcessor.OPTIONAL_PROP)); + assertNull(context.getProperty(DummyProcessor.OPTIONAL_PROP).getValue()); + assertFalse(context.removeProperty(DummyProcessor.OPTIONAL_PROP)); + assertEquals(2, proc.getUpdateCount(DummyProcessor.OPTIONAL_PROP)); + + assertTrue(context.removeProperty(DummyProcessor.REQUIRED_PROP)); + assertNull(context.getProperty(DummyProcessor.REQUIRED_PROP).getValue()); + assertFalse(context.removeProperty(DummyProcessor.REQUIRED_PROP)); + assertEquals(2, proc.getUpdateCount(DummyProcessor.REQUIRED_PROP)); + + assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP)); + assertEquals("default-value", context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue()); + assertFalse(context.removeProperty(DummyProcessor.DEFAULTED_PROP)); + assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); + + // Since value is already the default, this shouldn't trigger onPropertyModified to be called. + context.setProperty(DummyProcessor.DEFAULTED_PROP, DummyProcessor.DEFAULTED_PROP.getDefaultValue()); + assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); + + assertEquals("default-value", context.getProperty(DummyProcessor.DEFAULTED_PROP).getValue()); + assertTrue(context.removeProperty(DummyProcessor.DEFAULTED_PROP)); + + // since we are calling remove on a property that has a default value, this shouldn't + // trigger the onPropertyModified method to be called. + assertEquals(2, proc.getUpdateCount(DummyProcessor.DEFAULTED_PROP)); + } + + private static class DummyProcessor extends AbstractProcessor { + static final PropertyDescriptor REQUIRED_PROP = new PropertyDescriptor.Builder() + .name("required") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + static final PropertyDescriptor DEFAULTED_PROP = new PropertyDescriptor.Builder() + .name("defaulted") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("default-value") + .build(); + static final PropertyDescriptor OPTIONAL_PROP = new PropertyDescriptor.Builder() + .name("optional") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private final Map<PropertyDescriptor, Integer> propertyModifiedCount = new HashMap<>(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(REQUIRED_PROP); + properties.add(DEFAULTED_PROP); + properties.add(OPTIONAL_PROP); + return properties; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + Integer updateCount = propertyModifiedCount.get(descriptor); + if (updateCount == null) { + updateCount = 0; + } + + propertyModifiedCount.put(descriptor, updateCount + 1); + } + + public int getUpdateCount(final PropertyDescriptor descriptor) { + Integer updateCount = propertyModifiedCount.get(descriptor); + return (updateCount == null) ? 0 : updateCount; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index 8c1919e..8d4ef21 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -1,239 +1,239 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import javax.net.ssl.SSLContext; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.lifecycle.OnScheduled; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.processor.AbstractProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.ssl.SSLContextService; - -import com.amazonaws.AmazonWebServiceClient; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.Protocol; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AnonymousAWSCredentials; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.auth.PropertiesCredentials; -import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory; -import com.amazonaws.regions.Region; -import com.amazonaws.regions.Regions; - -public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor { - - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") - .description("FlowFiles are routed to success after being successfully copied to Amazon S3").build(); - public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") - .description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build(); - - public static final Set<Relationship> relationships = Collections.unmodifiableSet( - new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); - - public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder() - .name("Credentials File") - .expressionLanguageSupported(false) - .required(false) - .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) - .build(); - public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder() - .name("Access Key") - .expressionLanguageSupported(false) - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder() - .name("Secret Key") - .expressionLanguageSupported(false) - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .sensitive(true) - .build(); - public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() - .name("Region") - .required(true) - .allowableValues(getAvailableRegions()) - .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) - .build(); - - public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() - .name("Communications Timeout") - .required(true) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .defaultValue("30 secs") - .build(); - - public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); - - public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder() - .name("Endpoint Override URL") - .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " + - "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " + - "the selected endpoint URL, allowing use with other S3-compatible endpoints.") - .required(false) - .addValidator(StandardValidators.URL_VALIDATOR) - .build(); - - private volatile ClientType client; - private volatile Region region; - - // If protocol is changed to be a property, ensure other uses are also changed - protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS; - protected static final String DEFAULT_USER_AGENT = "NiFi"; - - private static AllowableValue createAllowableValue(final Regions regions) { - return new AllowableValue(regions.getName(), regions.getName(), regions.getName()); - } - - private static AllowableValue[] getAvailableRegions() { - final List<AllowableValue> values = new ArrayList<>(); - for (final Regions regions : Regions.values()) { - values.add(createAllowableValue(regions)); - } - - return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]); - } - - @Override - public Set<Relationship> getRelationships() { - return relationships; - } - - @Override - protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { - final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext)); - - final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet(); - final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet(); - if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) { - problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build()); - } - - final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet(); - if ((secretKeySet || accessKeySet) && credentialsFileSet) { - problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build()); - } - - return problems; - } - - protected ClientConfiguration createConfiguration(final ProcessContext context) { - final ClientConfiguration config = new ClientConfiguration(); - config.setMaxConnections(context.getMaxConcurrentTasks()); - config.setMaxErrorRetry(0); - config.setUserAgent(DEFAULT_USER_AGENT); - // If this is changed to be a property, ensure other uses are also changed - config.setProtocol(DEFAULT_PROTOCOL); - final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); - config.setConnectionTimeout(commsTimeout); - config.setSocketTimeout(commsTimeout); - - final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - if (sslContextService != null) { - final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE); - SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, null); - config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory); - } - - return config; - } - - @OnScheduled - public void onScheduled(final ProcessContext context) { - final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context)); - this.client = awsClient; - - // if the processor supports REGION, get the configured region. - if (getSupportedPropertyDescriptors().contains(REGION)) { - final String region = context.getProperty(REGION).getValue(); - if (region != null) { - this.region = Region.getRegion(Regions.fromName(region)); - client.setRegion(this.region); - } else { - this.region = null; - } - } - - // if the endpoint override has been configured, set the endpoint. - // (per Amazon docs this should only be configured at client creation) - final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()); - if (!urlstr.isEmpty()) { - this.client.setEndpoint(urlstr); - } - } - - protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config); - - protected ClientType getClient() { - return client; - } - - protected Region getRegion() { - return region; - } - - protected AWSCredentials getCredentials(final ProcessContext context) { - final String accessKey = context.getProperty(ACCESS_KEY).getValue(); - final String secretKey = context.getProperty(SECRET_KEY).getValue(); - - final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue(); - - if (credentialsFile != null) { - try { - return new PropertiesCredentials(new File(credentialsFile)); - } catch (final IOException ioe) { - throw new ProcessException("Could not read Credentials File", ioe); - } - } - - if (accessKey != null && secretKey != null) { - return new BasicAWSCredentials(accessKey, secretKey); - } - - return new AnonymousAWSCredentials(); - } - - protected boolean isEmpty(final String value) { - return value == null || value.trim().equals(""); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.Protocol; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AnonymousAWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.PropertiesCredentials; +import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; + +public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("FlowFiles are routed to success after being successfully copied to Amazon S3").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("FlowFiles are routed to failure if unable to be copied to Amazon S3").build(); + + public static final Set<Relationship> relationships = Collections.unmodifiableSet( + new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + + public static final PropertyDescriptor CREDENTIALS_FILE = new PropertyDescriptor.Builder() + .name("Credentials File") + .expressionLanguageSupported(false) + .required(false) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .build(); + public static final PropertyDescriptor ACCESS_KEY = new PropertyDescriptor.Builder() + .name("Access Key") + .expressionLanguageSupported(false) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor SECRET_KEY = new PropertyDescriptor.Builder() + .name("Secret Key") + .expressionLanguageSupported(false) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(true) + .build(); + public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder() + .name("Region") + .required(true) + .allowableValues(getAvailableRegions()) + .defaultValue(createAllowableValue(Regions.DEFAULT_REGION).getValue()) + .build(); + + public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() + .name("Communications Timeout") + .required(true) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("30 secs") + .build(); + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("Specifies an optional SSL Context Service that, if provided, will be used to create connections") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder() + .name("Endpoint Override URL") + .description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " + + "The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " + + "the selected endpoint URL, allowing use with other S3-compatible endpoints.") + .required(false) + .addValidator(StandardValidators.URL_VALIDATOR) + .build(); + + private volatile ClientType client; + private volatile Region region; + + // If protocol is changed to be a property, ensure other uses are also changed + protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS; + protected static final String DEFAULT_USER_AGENT = "NiFi"; + + private static AllowableValue createAllowableValue(final Regions regions) { + return new AllowableValue(regions.getName(), regions.getName(), regions.getName()); + } + + private static AllowableValue[] getAvailableRegions() { + final List<AllowableValue> values = new ArrayList<>(); + for (final Regions regions : Regions.values()) { + values.add(createAllowableValue(regions)); + } + + return (AllowableValue[]) values.toArray(new AllowableValue[values.size()]); + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext)); + + final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet(); + final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet(); + if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) { + problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build()); + } + + final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet(); + if ((secretKeySet || accessKeySet) && credentialsFileSet) { + problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build()); + } + + return problems; + } + + protected ClientConfiguration createConfiguration(final ProcessContext context) { + final ClientConfiguration config = new ClientConfiguration(); + config.setMaxConnections(context.getMaxConcurrentTasks()); + config.setMaxErrorRetry(0); + config.setUserAgent(DEFAULT_USER_AGENT); + // If this is changed to be a property, ensure other uses are also changed + config.setProtocol(DEFAULT_PROTOCOL); + final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue(); + config.setConnectionTimeout(commsTimeout); + config.setSocketTimeout(commsTimeout); + + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null) { + final SSLContext sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.NONE); + SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, null); + config.getApacheHttpClientConfig().setSslSocketFactory(sdkTLSSocketFactory); + } + + return config; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context)); + this.client = awsClient; + + // if the processor supports REGION, get the configured region. + if (getSupportedPropertyDescriptors().contains(REGION)) { + final String region = context.getProperty(REGION).getValue(); + if (region != null) { + this.region = Region.getRegion(Regions.fromName(region)); + client.setRegion(this.region); + } else { + this.region = null; + } + } + + // if the endpoint override has been configured, set the endpoint. + // (per Amazon docs this should only be configured at client creation) + final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()); + if (!urlstr.isEmpty()) { + this.client.setEndpoint(urlstr); + } + } + + protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config); + + protected ClientType getClient() { + return client; + } + + protected Region getRegion() { + return region; + } + + protected AWSCredentials getCredentials(final ProcessContext context) { + final String accessKey = context.getProperty(ACCESS_KEY).getValue(); + final String secretKey = context.getProperty(SECRET_KEY).getValue(); + + final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue(); + + if (credentialsFile != null) { + try { + return new PropertiesCredentials(new File(credentialsFile)); + } catch (final IOException ioe) { + throw new ProcessException("Could not read Credentials File", ioe); + } + } + + if (accessKey != null && secretKey != null) { + return new BasicAWSCredentials(accessKey, secretKey); + } + + return new AnonymousAWSCredentials(); + } + + protected boolean isEmpty(final String value) { + return value == null || value.trim().equals(""); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index e01efcb..39ad667 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -1,192 +1,192 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.s3; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.regions.Region; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.S3ClientOptions; -import com.amazonaws.services.s3.model.AccessControlList; -import com.amazonaws.services.s3.model.CanonicalGrantee; -import com.amazonaws.services.s3.model.EmailAddressGrantee; -import com.amazonaws.services.s3.model.Grantee; -import com.amazonaws.services.s3.model.Owner; -import com.amazonaws.services.s3.model.Permission; - -public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3Client> { - - public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder() - .name("FullControl User List") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object") - .defaultValue("${s3.permissions.full.users}") - .build(); - public static final PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder() - .name("Read Permission User List") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object") - .defaultValue("${s3.permissions.read.users}") - .build(); - public static final PropertyDescriptor WRITE_USER_LIST = new PropertyDescriptor.Builder() - .name("Write Permission User List") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Write Access for an object") - .defaultValue("${s3.permissions.write.users}") - .build(); - public static final PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder() - .name("Read ACL User List") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object") - .defaultValue("${s3.permissions.readacl.users}") - .build(); - public static final PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder() - .name("Write ACL User List") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object") - .defaultValue("${s3.permissions.writeacl.users}") - .build(); - public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder() - .name("Owner") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .description("The Amazon ID to use for the object's owner") - .defaultValue("${s3.owner}") - .build(); - public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder() - .name("Bucket") - .expressionLanguageSupported(true) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() - .name("Object Key") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .defaultValue("${filename}") - .build(); - - @Override - protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { - final AmazonS3Client s3 = new AmazonS3Client(credentials, config); - - // if ENDPOINT_OVERRIDE is set, use PathStyleAccess - if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()).isEmpty() == false){ - final S3ClientOptions s3Options = new S3ClientOptions(); - s3Options.setPathStyleAccess(true); - s3.setS3ClientOptions(s3Options); - } - - return s3; - } - - protected Grantee createGrantee(final String value) { - if (isEmpty(value)) { - return null; - } - - if (value.contains("@")) { - return new EmailAddressGrantee(value); - } else { - return new CanonicalGrantee(value); - } - } - - protected final List<Grantee> createGrantees(final String value) { - if (isEmpty(value)) { - return Collections.emptyList(); - } - - final List<Grantee> grantees = new ArrayList<>(); - final String[] vals = value.split(","); - for (final String val : vals) { - final String identifier = val.trim(); - final Grantee grantee = createGrantee(identifier); - if (grantee != null) { - grantees.add(grantee); - } - } - return grantees; - } - - protected String getUrlForObject(final String bucket, final String key) { - Region region = getRegion(); - - if (region == null) { - return DEFAULT_PROTOCOL.toString() + "://s3.amazonaws.com/" + bucket + "/" + key; - } else { - final String endpoint = region.getServiceEndpoint("s3"); - return DEFAULT_PROTOCOL.toString() + "://" + endpoint + "/" + bucket + "/" + key; - } - } - - protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) { - final AccessControlList acl = new AccessControlList(); - - final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue(); - if (!isEmpty(ownerId)) { - final Owner owner = new Owner(); - owner.setId(ownerId); - acl.setOwner(owner); - } - - for (final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) { - acl.grantPermission(grantee, Permission.FullControl); - } - - for (final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) { - acl.grantPermission(grantee, Permission.Read); - } - - for (final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) { - acl.grantPermission(grantee, Permission.Write); - } - - for (final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) { - acl.grantPermission(grantee, Permission.ReadAcp); - } - - for (final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) { - acl.grantPermission(grantee, Permission.WriteAcp); - } - - return acl; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.regions.Region; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CanonicalGrantee; +import com.amazonaws.services.s3.model.EmailAddressGrantee; +import com.amazonaws.services.s3.model.Grantee; +import com.amazonaws.services.s3.model.Owner; +import com.amazonaws.services.s3.model.Permission; + +public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3Client> { + + public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder() + .name("FullControl User List") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Full Control for an object") + .defaultValue("${s3.permissions.full.users}") + .build(); + public static final PropertyDescriptor READ_USER_LIST = new PropertyDescriptor.Builder() + .name("Read Permission User List") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Read Access for an object") + .defaultValue("${s3.permissions.read.users}") + .build(); + public static final PropertyDescriptor WRITE_USER_LIST = new PropertyDescriptor.Builder() + .name("Write Permission User List") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have Write Access for an object") + .defaultValue("${s3.permissions.write.users}") + .build(); + public static final PropertyDescriptor READ_ACL_LIST = new PropertyDescriptor.Builder() + .name("Read ACL User List") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to read the Access Control List for an object") + .defaultValue("${s3.permissions.readacl.users}") + .build(); + public static final PropertyDescriptor WRITE_ACL_LIST = new PropertyDescriptor.Builder() + .name("Write ACL User List") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("A comma-separated list of Amazon User ID's or E-mail addresses that specifies who should have permissions to change the Access Control List for an object") + .defaultValue("${s3.permissions.writeacl.users}") + .build(); + public static final PropertyDescriptor OWNER = new PropertyDescriptor.Builder() + .name("Owner") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .description("The Amazon ID to use for the object's owner") + .defaultValue("${s3.owner}") + .build(); + public static final PropertyDescriptor BUCKET = new PropertyDescriptor.Builder() + .name("Bucket") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() + .name("Object Key") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .defaultValue("${filename}") + .build(); + + @Override + protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + final AmazonS3Client s3 = new AmazonS3Client(credentials, config); + + // if ENDPOINT_OVERRIDE is set, use PathStyleAccess + if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()).isEmpty() == false){ + final S3ClientOptions s3Options = new S3ClientOptions(); + s3Options.setPathStyleAccess(true); + s3.setS3ClientOptions(s3Options); + } + + return s3; + } + + protected Grantee createGrantee(final String value) { + if (isEmpty(value)) { + return null; + } + + if (value.contains("@")) { + return new EmailAddressGrantee(value); + } else { + return new CanonicalGrantee(value); + } + } + + protected final List<Grantee> createGrantees(final String value) { + if (isEmpty(value)) { + return Collections.emptyList(); + } + + final List<Grantee> grantees = new ArrayList<>(); + final String[] vals = value.split(","); + for (final String val : vals) { + final String identifier = val.trim(); + final Grantee grantee = createGrantee(identifier); + if (grantee != null) { + grantees.add(grantee); + } + } + return grantees; + } + + protected String getUrlForObject(final String bucket, final String key) { + Region region = getRegion(); + + if (region == null) { + return DEFAULT_PROTOCOL.toString() + "://s3.amazonaws.com/" + bucket + "/" + key; + } else { + final String endpoint = region.getServiceEndpoint("s3"); + return DEFAULT_PROTOCOL.toString() + "://" + endpoint + "/" + bucket + "/" + key; + } + } + + protected final AccessControlList createACL(final ProcessContext context, final FlowFile flowFile) { + final AccessControlList acl = new AccessControlList(); + + final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue(); + if (!isEmpty(ownerId)) { + final Owner owner = new Owner(); + owner.setId(ownerId); + acl.setOwner(owner); + } + + for (final Grantee grantee : createGrantees(context.getProperty(FULL_CONTROL_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) { + acl.grantPermission(grantee, Permission.FullControl); + } + + for (final Grantee grantee : createGrantees(context.getProperty(READ_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) { + acl.grantPermission(grantee, Permission.Read); + } + + for (final Grantee grantee : createGrantees(context.getProperty(WRITE_USER_LIST).evaluateAttributeExpressions(flowFile).getValue())) { + acl.grantPermission(grantee, Permission.Write); + } + + for (final Grantee grantee : createGrantees(context.getProperty(READ_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) { + acl.grantPermission(grantee, Permission.ReadAcp); + } + + for (final Grantee grantee : createGrantees(context.getProperty(WRITE_ACL_LIST).evaluateAttributeExpressions(flowFile).getValue())) { + acl.grantPermission(grantee, Permission.WriteAcp); + } + + return acl; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java index 9dc3595..ec6eea7 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -1,104 +1,104 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.s3; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.DeleteObjectRequest; -import com.amazonaws.services.s3.model.DeleteVersionRequest; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.util.StandardValidators; - - -@SupportsBatching -@SeeAlso({PutS3Object.class, FetchS3Object.class}) -@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"}) -@InputRequirement(Requirement.INPUT_REQUIRED) -@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " + - "If attempting to delete a file that does not exist, FlowFile is routed to success.") -public class DeleteS3Object extends AbstractS3Processor { - - public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() - .name("Version") - .description("The Version of the Object to delete") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .required(false) - .build(); - - public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, - SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final long startNanos = System.nanoTime(); - - final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); - final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); - - final AmazonS3 s3 = getClient(); - - // Deletes a key on Amazon S3 - try { - if (versionId == null) { - final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key); - // This call returns success if object doesn't exist - s3.deleteObject(r); - } else { - final DeleteVersionRequest r = new DeleteVersionRequest(bucket, key, versionId); - s3.deleteVersion(r); - } - } catch (final AmazonServiceException ase) { - getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - return; - } - - session.transfer(flowFile, REL_SUCCESS); - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.DeleteVersionRequest; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + + +@SupportsBatching +@SeeAlso({PutS3Object.class, FetchS3Object.class}) +@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Deletes FlowFiles on an Amazon S3 Bucket. " + + "If attempting to delete a file that does not exist, FlowFile is routed to success.") +public class DeleteS3Object extends AbstractS3Processor { + + public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() + .name("Version") + .description("The Version of the Object to delete") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID, + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, + SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); + + final AmazonS3 s3 = getClient(); + + // Deletes a key on Amazon S3 + try { + if (versionId == null) { + final DeleteObjectRequest r = new DeleteObjectRequest(bucket, key); + // This call returns success if object doesn't exist + s3.deleteObject(r); + } else { + final DeleteVersionRequest r = new DeleteVersionRequest(bucket, key, versionId); + s3.deleteVersion(r); + } + } catch (final AmazonServiceException ase) { + getLogger().error("Failed to delete S3 Object for {}; routing to failure", new Object[]{flowFile, ase}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + getLogger().info("Successfully delete S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 9eaf019..ae0af56 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -1,159 +1,159 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.s3; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.util.StandardValidators; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; - -@SupportsBatching -@SeeAlso({PutS3Object.class, DeleteS3Object.class}) -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"}) -@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile") -@WritesAttributes({ - @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), - @WritesAttribute(attribute = "path", description = "The path of the file"), - @WritesAttribute(attribute = "absolute.path", description = "The path of the file"), - @WritesAttribute(attribute = "filename", description = "The name of the file"), - @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the file"), - @WritesAttribute(attribute = "hash.algorithm", description = "MD5"), - @WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that file"), - @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), - @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"), - @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"), - @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),}) -public class FetchS3Object extends AbstractS3Processor { - - public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() - .name("Version") - .description("The Version of the Object to download") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .required(false) - .build(); - - public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final long startNanos = System.nanoTime(); - final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); - final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); - - final AmazonS3 client = getClient(); - final GetObjectRequest request; - if (versionId == null) { - request = new GetObjectRequest(bucket, key); - } else { - request = new GetObjectRequest(bucket, key, versionId); - } - - final Map<String, String> attributes = new HashMap<>(); - try (final S3Object s3Object = client.getObject(request)) { - flowFile = session.importFrom(s3Object.getObjectContent(), flowFile); - attributes.put("s3.bucket", s3Object.getBucketName()); - - final ObjectMetadata metadata = s3Object.getObjectMetadata(); - if (metadata.getContentDisposition() != null) { - final String fullyQualified = metadata.getContentDisposition(); - final int lastSlash = fullyQualified.lastIndexOf("/"); - if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) { - attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash)); - attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified); - attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1)); - } else { - attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition()); - } - } - if (metadata.getContentMD5() != null) { - attributes.put("hash.value", metadata.getContentMD5()); - attributes.put("hash.algorithm", "MD5"); - } - if (metadata.getContentType() != null) { - attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType()); - } - if (metadata.getETag() != null) { - attributes.put("s3.etag", metadata.getETag()); - } - if (metadata.getExpirationTime() != null) { - attributes.put("s3.expirationTime", String.valueOf(metadata.getExpirationTime().getTime())); - } - if (metadata.getExpirationTimeRuleId() != null) { - attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId()); - } - if (metadata.getUserMetadata() != null) { - attributes.putAll(metadata.getUserMetadata()); - } - if (metadata.getVersionId() != null) { - attributes.put("s3.version", metadata.getVersionId()); - } - } catch (final IOException | AmazonClientException ioe) { - getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - return; - } - - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } - - session.transfer(flowFile, REL_SUCCESS); - final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); - session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; + +@SupportsBatching +@SeeAlso({PutS3Object.class, DeleteS3Object.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"Amazon", "S3", "AWS", "Get", "Fetch"}) +@CapabilityDescription("Retrieves the contents of an S3 Object and writes it to the content of a FlowFile") +@WritesAttributes({ + @WritesAttribute(attribute = "s3.bucket", description = "The name of the S3 bucket"), + @WritesAttribute(attribute = "path", description = "The path of the file"), + @WritesAttribute(attribute = "absolute.path", description = "The path of the file"), + @WritesAttribute(attribute = "filename", description = "The name of the file"), + @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the file"), + @WritesAttribute(attribute = "hash.algorithm", description = "MD5"), + @WritesAttribute(attribute = "mime.type", description = "If S3 provides the content type/MIME type, this attribute will hold that file"), + @WritesAttribute(attribute = "s3.etag", description = "The ETag that can be used to see if the file has changed"), + @WritesAttribute(attribute = "s3.expirationTime", description = "If the file has an expiration date, this attribute will be set, containing the milliseconds since epoch in UTC time"), + @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The ID of the rule that dictates this object's expiration time"), + @WritesAttribute(attribute = "s3.version", description = "The version of the S3 object"),}) +public class FetchS3Object extends AbstractS3Processor { + + public static final PropertyDescriptor VERSION_ID = new PropertyDescriptor.Builder() + .name("Version") + .description("The Version of the Object to download") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); + + final AmazonS3 client = getClient(); + final GetObjectRequest request; + if (versionId == null) { + request = new GetObjectRequest(bucket, key); + } else { + request = new GetObjectRequest(bucket, key, versionId); + } + + final Map<String, String> attributes = new HashMap<>(); + try (final S3Object s3Object = client.getObject(request)) { + flowFile = session.importFrom(s3Object.getObjectContent(), flowFile); + attributes.put("s3.bucket", s3Object.getBucketName()); + + final ObjectMetadata metadata = s3Object.getObjectMetadata(); + if (metadata.getContentDisposition() != null) { + final String fullyQualified = metadata.getContentDisposition(); + final int lastSlash = fullyQualified.lastIndexOf("/"); + if (lastSlash > -1 && lastSlash < fullyQualified.length() - 1) { + attributes.put(CoreAttributes.PATH.key(), fullyQualified.substring(0, lastSlash)); + attributes.put(CoreAttributes.ABSOLUTE_PATH.key(), fullyQualified); + attributes.put(CoreAttributes.FILENAME.key(), fullyQualified.substring(lastSlash + 1)); + } else { + attributes.put(CoreAttributes.FILENAME.key(), metadata.getContentDisposition()); + } + } + if (metadata.getContentMD5() != null) { + attributes.put("hash.value", metadata.getContentMD5()); + attributes.put("hash.algorithm", "MD5"); + } + if (metadata.getContentType() != null) { + attributes.put(CoreAttributes.MIME_TYPE.key(), metadata.getContentType()); + } + if (metadata.getETag() != null) { + attributes.put("s3.etag", metadata.getETag()); + } + if (metadata.getExpirationTime() != null) { + attributes.put("s3.expirationTime", String.valueOf(metadata.getExpirationTime().getTime())); + } + if (metadata.getExpirationTimeRuleId() != null) { + attributes.put("s3.expirationTimeRuleId", metadata.getExpirationTimeRuleId()); + } + if (metadata.getUserMetadata() != null) { + attributes.putAll(metadata.getUserMetadata()); + } + if (metadata.getVersionId() != null) { + attributes.put("s3.version", metadata.getVersionId()); + } + } catch (final IOException | AmazonClientException ioe) { + getLogger().error("Failed to retrieve S3 Object for {}; routing to failure", new Object[]{flowFile, ioe}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + return; + } + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + + session.transfer(flowFile, REL_SUCCESS); + final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + getLogger().info("Successfully retrieved S3 Object for {} in {} millis; routing to success", new Object[]{flowFile, transferMillis}); + session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); + } + +}
