http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 3f88a74..6755c13 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -1,214 +1,214 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.s3; - -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.ReadsAttribute; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; - -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.AccessControlList; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.PutObjectResult; -import com.amazonaws.services.s3.model.StorageClass; - -@SupportsBatching -@SeeAlso({FetchS3Object.class, DeleteS3Object.class}) -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) -@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") -@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object", - description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true) -@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") -@WritesAttributes({ - @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"), - @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"), - @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), - @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), - @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"), - @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), - @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set") -}) -public class PutS3Object extends AbstractS3Processor { - - public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() - .name("Expiration Time Rule") - .required(false) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() - .name("Storage Class") - .required(true) - .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) - .defaultValue(StorageClass.Standard.name()) - .build(); - - public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, - FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); - - final static String S3_BUCKET_KEY = "s3.bucket"; - final static String S3_OBJECT_KEY = "s3.key"; - final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; - final static String S3_VERSION_ATTR_KEY = "s3.version"; - final static String S3_ETAG_ATTR_KEY = "s3.etag"; - final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration"; - final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass"; - final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class"; - final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata"; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .dynamic(true) - .build(); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final long startNanos = System.nanoTime(); - - final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); - final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - - final AmazonS3Client s3 = getClient(); - final FlowFile ff = flowFile; - final Map<String, String> attributes = new HashMap<>(); - attributes.put(S3_BUCKET_KEY, bucket); - attributes.put(S3_OBJECT_KEY, key); - - try { - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - try (final InputStream in = new BufferedInputStream(rawIn)) { - final ObjectMetadata objectMetadata = new ObjectMetadata(); - objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key())); - objectMetadata.setContentLength(ff.getSize()); - - final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue(); - if (expirationRule != null) { - objectMetadata.setExpirationTimeRuleId(expirationRule); - } - - final Map<String, String> userMetadata = new HashMap<>(); - for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - if (entry.getKey().isDynamic()) { - final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue(); - userMetadata.put(entry.getKey().getName(), value); - } - } - - if (!userMetadata.isEmpty()) { - objectMetadata.setUserMetadata(userMetadata); - } - - final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); - request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); - final AccessControlList acl = createACL(context, ff); - if (acl != null) { - request.setAccessControlList(acl); - } - - final PutObjectResult result = s3.putObject(request); - if (result.getVersionId() != null) { - attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId()); - } - - attributes.put(S3_ETAG_ATTR_KEY, result.getETag()); - - final Date expiration = result.getExpirationTime(); - if (expiration != null) { - attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString()); - } - if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) { - attributes.put(S3_STORAGECLASS_ATTR_KEY, - result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString()); - } - if (userMetadata.size() > 0) { - List<String> pairs = new ArrayList<String>(); - for (String userKey : userMetadata.keySet()) { - pairs.add(userKey + "=" + userMetadata.get(userKey)); - } - attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", ")); - } - } - } - }); - - if (!attributes.isEmpty()) { - flowFile = session.putAllAttributes(flowFile, attributes); - } - session.transfer(flowFile, REL_SUCCESS); - - final String url = s3.getResourceUrl(bucket, key); - final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().send(flowFile, url, millis); - - getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis}); - } catch (final ProcessException | AmazonClientException pe) { - getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.s3; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.StorageClass; + +@SupportsBatching +@SeeAlso({FetchS3Object.class, DeleteS3Object.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"Amazon", "S3", "AWS", "Archive", "Put"}) +@CapabilityDescription("Puts FlowFiles to an Amazon S3 Bucket") +@DynamicProperty(name = "The name of a User-Defined Metadata field to add to the S3 Object", value = "The value of a User-Defined Metadata field to add to the S3 Object", + description = "Allows user-defined metadata to be added to the S3 object as key/value pairs", supportsExpressionLanguage = true) +@ReadsAttribute(attribute = "filename", description = "Uses the FlowFile's filename as the filename for the S3 object") +@WritesAttributes({ + @WritesAttribute(attribute = "s3.bucket", description = "The S3 bucket where the Object was put in S3"), + @WritesAttribute(attribute = "s3.key", description = "The S3 key within where the Object was put in S3"), + @WritesAttribute(attribute = "s3.version", description = "The version of the S3 Object that was put to S3"), + @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 Object"), + @WritesAttribute(attribute = "s3.expiration", description = "A human-readable form of the expiration date of the S3 object, if one is set"), + @WritesAttribute(attribute = "s3.uploadId", description = "The uploadId used to upload the Object to S3"), + @WritesAttribute(attribute = "s3.usermetadata", description = "A human-readable form of the User Metadata of the S3 object, if any was set") +}) +public class PutS3Object extends AbstractS3Processor { + + public static final PropertyDescriptor EXPIRATION_RULE_ID = new PropertyDescriptor.Builder() + .name("Expiration Time Rule") + .required(false) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor STORAGE_CLASS = new PropertyDescriptor.Builder() + .name("Storage Class") + .required(true) + .allowableValues(StorageClass.Standard.name(), StorageClass.ReducedRedundancy.name()) + .defaultValue(StorageClass.Standard.name()) + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, + FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); + + final static String S3_BUCKET_KEY = "s3.bucket"; + final static String S3_OBJECT_KEY = "s3.key"; + final static String S3_UPLOAD_ID_ATTR_KEY = "s3.uploadId"; + final static String S3_VERSION_ATTR_KEY = "s3.version"; + final static String S3_ETAG_ATTR_KEY = "s3.etag"; + final static String S3_EXPIRATION_ATTR_KEY = "s3.expiration"; + final static String S3_STORAGECLASS_ATTR_KEY = "s3.storeClass"; + final static String S3_STORAGECLASS_META_KEY = "x-amz-storage-class"; + final static String S3_USERMETA_ATTR_KEY = "s3.usermetadata"; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final long startNanos = System.nanoTime(); + + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + + final AmazonS3Client s3 = getClient(); + final FlowFile ff = flowFile; + final Map<String, String> attributes = new HashMap<>(); + attributes.put(S3_BUCKET_KEY, bucket); + attributes.put(S3_OBJECT_KEY, key); + + try { + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream rawIn) throws IOException { + try (final InputStream in = new BufferedInputStream(rawIn)) { + final ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentDisposition(ff.getAttribute(CoreAttributes.FILENAME.key())); + objectMetadata.setContentLength(ff.getSize()); + + final String expirationRule = context.getProperty(EXPIRATION_RULE_ID).evaluateAttributeExpressions(ff).getValue(); + if (expirationRule != null) { + objectMetadata.setExpirationTimeRuleId(expirationRule); + } + + final Map<String, String> userMetadata = new HashMap<>(); + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + final String value = context.getProperty(entry.getKey()).evaluateAttributeExpressions(ff).getValue(); + userMetadata.put(entry.getKey().getName(), value); + } + } + + if (!userMetadata.isEmpty()) { + objectMetadata.setUserMetadata(userMetadata); + } + + final PutObjectRequest request = new PutObjectRequest(bucket, key, in, objectMetadata); + request.setStorageClass(StorageClass.valueOf(context.getProperty(STORAGE_CLASS).getValue())); + final AccessControlList acl = createACL(context, ff); + if (acl != null) { + request.setAccessControlList(acl); + } + + final PutObjectResult result = s3.putObject(request); + if (result.getVersionId() != null) { + attributes.put(S3_VERSION_ATTR_KEY, result.getVersionId()); + } + + attributes.put(S3_ETAG_ATTR_KEY, result.getETag()); + + final Date expiration = result.getExpirationTime(); + if (expiration != null) { + attributes.put(S3_EXPIRATION_ATTR_KEY, expiration.toString()); + } + if (result.getMetadata().getRawMetadata().keySet().contains(S3_STORAGECLASS_META_KEY)) { + attributes.put(S3_STORAGECLASS_ATTR_KEY, + result.getMetadata().getRawMetadataValue(S3_STORAGECLASS_META_KEY).toString()); + } + if (userMetadata.size() > 0) { + List<String> pairs = new ArrayList<String>(); + for (String userKey : userMetadata.keySet()) { + pairs.add(userKey + "=" + userMetadata.get(userKey)); + } + attributes.put(S3_USERMETA_ATTR_KEY, StringUtils.join(pairs, ", ")); + } + } + } + }); + + if (!attributes.isEmpty()) { + flowFile = session.putAllAttributes(flowFile, attributes); + } + session.transfer(flowFile, REL_SUCCESS); + + final String url = s3.getResourceUrl(bucket, key); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + session.getProvenanceReporter().send(flowFile, url, millis); + + getLogger().info("Successfully put {} to Amazon S3 in {} milliseconds", new Object[] {ff, millis}); + } catch (final ProcessException | AmazonClientException pe) { + getLogger().error("Failed to put {} to Amazon S3 due to {}", new Object[] {flowFile, pe}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java index 5b57647..109c92f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java @@ -1,58 +1,58 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.sns; - -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.services.sns.AmazonSNSClient; - -public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> { - - protected static final AllowableValue ARN_TYPE_TOPIC - = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic"); - protected static final AllowableValue ARN_TYPE_TARGET - = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber"); - - public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder() - .name("Amazon Resource Name (ARN)") - .description("The name of the resource to which notifications should be published") - .expressionLanguageSupported(true) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder() - .name("ARN Type") - .description("The type of Amazon Resource Name that is being used.") - .expressionLanguageSupported(false) - .required(true) - .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET) - .defaultValue(ARN_TYPE_TOPIC.getValue()) - .build(); - - @Override - protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { - return new AmazonSNSClient(credentials, config); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sns; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.sns.AmazonSNSClient; + +public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> { + + protected static final AllowableValue ARN_TYPE_TOPIC + = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic"); + protected static final AllowableValue ARN_TYPE_TARGET + = new AllowableValue("Target ARN", "Target ARN", "The ARN is the name of a particular Target, used to notify a specific subscriber"); + + public static final PropertyDescriptor ARN = new PropertyDescriptor.Builder() + .name("Amazon Resource Name (ARN)") + .description("The name of the resource to which notifications should be published") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor ARN_TYPE = new PropertyDescriptor.Builder() + .name("ARN Type") + .description("The type of Amazon Resource Name that is being used.") + .expressionLanguageSupported(false) + .required(true) + .allowableValues(ARN_TYPE_TOPIC, ARN_TYPE_TARGET) + .defaultValue(ARN_TYPE_TOPIC.getValue()) + .build(); + + @Override + protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + return new AmazonSNSClient(credentials, config); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index b8c5830..f58babc 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -1,159 +1,159 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.sns; - -import java.io.ByteArrayOutputStream; -import java.nio.charset.Charset; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.sqs.GetSQS; -import org.apache.nifi.processors.aws.sqs.PutSQS; - -import com.amazonaws.services.sns.AmazonSNSClient; -import com.amazonaws.services.sns.model.MessageAttributeValue; -import com.amazonaws.services.sns.model.PublishRequest; - -@SupportsBatching -@SeeAlso({GetSQS.class, PutSQS.class}) -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"}) -@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service") -public class PutSNS extends AbstractSNSProcessor { - - public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder() - .name("Character Set") - .description("The character set in which the FlowFile's content is encoded") - .defaultValue("UTF-8") - .expressionLanguageSupported(true) - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .required(true) - .build(); - public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder() - .name("Use JSON Structure") - .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'." - + " Additional elements can be used to send different messages to different protocols. See the Amazon" - + " SNS Documentation for more information.") - .defaultValue("false") - .allowableValues("true", "false") - .required(true) - .build(); - public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder() - .name("E-mail Subject") - .description("The optional subject to use for any subscribers that are subscribed via E-mail") - .expressionLanguageSupported(true) - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, - USE_JSON_STRUCTURE, CHARACTER_ENCODING)); - - public static final int MAX_SIZE = 256 * 1024; - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .required(false) - .dynamic(true) - .build(); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - if (flowFile.getSize() > MAX_SIZE) { - getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue()); - - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - session.exportTo(flowFile, baos); - final String message = new String(baos.toByteArray(), charset); - - final AmazonSNSClient client = getClient(); - final PublishRequest request = new PublishRequest(); - request.setMessage(message); - - if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) { - request.setMessageStructure("json"); - } - - final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue(); - final String arnType = context.getProperty(ARN_TYPE).getValue(); - if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) { - request.setTopicArn(arn); - } else { - request.setTargetArn(arn); - } - - final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue(); - if (subject != null) { - request.setSubject(subject); - } - - for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) { - final MessageAttributeValue value = new MessageAttributeValue(); - value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue()); - value.setDataType("String"); - request.addMessageAttributesEntry(entry.getKey().getName(), value); - } - } - - try { - client.publish(request); - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().send(flowFile, arn); - getLogger().info("Successfully published notification for {}", new Object[]{flowFile}); - } catch (final Exception e) { - getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e}); - flowFile = session.penalize(flowFile); - session.transfer(flowFile, REL_FAILURE); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sns; + +import java.io.ByteArrayOutputStream; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.sqs.GetSQS; +import org.apache.nifi.processors.aws.sqs.PutSQS; + +import com.amazonaws.services.sns.AmazonSNSClient; +import com.amazonaws.services.sns.model.MessageAttributeValue; +import com.amazonaws.services.sns.model.PublishRequest; + +@SupportsBatching +@SeeAlso({GetSQS.class, PutSQS.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "sns", "topic", "put", "publish", "pubsub"}) +@CapabilityDescription("Sends the content of a FlowFile as a notification to the Amazon Simple Notification Service") +public class PutSNS extends AbstractSNSProcessor { + + public static final PropertyDescriptor CHARACTER_ENCODING = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The character set in which the FlowFile's content is encoded") + .defaultValue("UTF-8") + .expressionLanguageSupported(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .required(true) + .build(); + public static final PropertyDescriptor USE_JSON_STRUCTURE = new PropertyDescriptor.Builder() + .name("Use JSON Structure") + .description("If true, the contents of the FlowFile must be JSON with a top-level element named 'default'." + + " Additional elements can be used to send different messages to different protocols. See the Amazon" + + " SNS Documentation for more information.") + .defaultValue("false") + .allowableValues("true", "false") + .required(true) + .build(); + public static final PropertyDescriptor SUBJECT = new PropertyDescriptor.Builder() + .name("E-mail Subject") + .description("The optional subject to use for any subscribers that are subscribed via E-mail") + .expressionLanguageSupported(true) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, + USE_JSON_STRUCTURE, CHARACTER_ENCODING)); + + public static final int MAX_SIZE = 256 * 1024; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .dynamic(true) + .build(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + if (flowFile.getSize() > MAX_SIZE) { + getLogger().error("Cannot publish {} to SNS because its size exceeds Amazon SNS's limit of 256KB; routing to failure", new Object[]{flowFile}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_ENCODING).evaluateAttributeExpressions(flowFile).getValue()); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + final String message = new String(baos.toByteArray(), charset); + + final AmazonSNSClient client = getClient(); + final PublishRequest request = new PublishRequest(); + request.setMessage(message); + + if (context.getProperty(USE_JSON_STRUCTURE).asBoolean()) { + request.setMessageStructure("json"); + } + + final String arn = context.getProperty(ARN).evaluateAttributeExpressions(flowFile).getValue(); + final String arnType = context.getProperty(ARN_TYPE).getValue(); + if (arnType.equalsIgnoreCase(ARN_TYPE_TOPIC.getValue())) { + request.setTopicArn(arn); + } else { + request.setTargetArn(arn); + } + + final String subject = context.getProperty(SUBJECT).evaluateAttributeExpressions(flowFile).getValue(); + if (subject != null) { + request.setSubject(subject); + } + + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) { + final MessageAttributeValue value = new MessageAttributeValue(); + value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue()); + value.setDataType("String"); + request.addMessageAttributesEntry(entry.getKey().getName(), value); + } + } + + try { + client.publish(request); + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().send(flowFile, arn); + getLogger().info("Successfully published notification for {}", new Object[]{flowFile}); + } catch (final Exception e) { + getLogger().error("Failed to publish Amazon SNS message for {} due to {}", new Object[]{flowFile, e}); + flowFile = session.penalize(flowFile); + session.transfer(flowFile, REL_FAILURE); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java index 3cee02d..bf8058f 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java @@ -1,51 +1,51 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.sqs; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; - -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.services.sqs.AmazonSQSClient; - -public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> { - - public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The maximum number of messages to send in a single network request") - .required(true) - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("25") - .build(); - - public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder() - .name("Queue URL") - .description("The URL of the queue to act upon") - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .expressionLanguageSupported(true) - .required(true) - .build(); - - @Override - protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { - return new AmazonSQSClient(credentials, config); - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.sqs.AmazonSQSClient; + +public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> { + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of messages to send in a single network request") + .required(true) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("25") + .build(); + + public static final PropertyDescriptor QUEUE_URL = new PropertyDescriptor.Builder() + .name("Queue URL") + .description("The URL of the queue to act upon") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + + @Override + protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + return new AmazonSQSClient(credentials, config); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java index bd0dfa5..73e3715 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ -1,102 +1,102 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.sqs; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.util.StandardValidators; - -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; - -@SupportsBatching -@SeeAlso({GetSQS.class, PutSQS.class}) -@InputRequirement(Requirement.INPUT_REQUIRED) -@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"}) -@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue") -public class DeleteSQS extends AbstractSQSProcessor { - - public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder() - .name("Receipt Handle") - .description("The identifier that specifies the receipt of the message") - .expressionLanguageSupported(true) - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("${sqs.receipt.handle}") - .build(); - - public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT)); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - List<FlowFile> flowFiles = session.get(1); - if (flowFiles.isEmpty()) { - return; - } - - final FlowFile firstFlowFile = flowFiles.get(0); - final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue(); - - final AmazonSQSClient client = getClient(); - final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(); - request.setQueueUrl(queueUrl); - - final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size()); - - for (final FlowFile flowFile : flowFiles) { - final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); - entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue()); - entries.add(entry); - } - - request.setEntries(entries); - - try { - client.deleteMessageBatch(request); - getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()}); - session.transfer(flowFiles, REL_SUCCESS); - } catch (final Exception e) { - getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e}); - final List<FlowFile> penalizedFlowFiles = new ArrayList<>(); - for (final FlowFile flowFile : flowFiles) { - penalizedFlowFiles.add(session.penalize(flowFile)); - } - session.transfer(penalizedFlowFiles, REL_FAILURE); - } - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; + +@SupportsBatching +@SeeAlso({GetSQS.class, PutSQS.class}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"Amazon", "AWS", "SQS", "Queue", "Delete"}) +@CapabilityDescription("Deletes a message from an Amazon Simple Queuing Service Queue") +public class DeleteSQS extends AbstractSQSProcessor { + + public static final PropertyDescriptor RECEIPT_HANDLE = new PropertyDescriptor.Builder() + .name("Receipt Handle") + .description("The identifier that specifies the receipt of the message") + .expressionLanguageSupported(true) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("${sqs.receipt.handle}") + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT)); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + List<FlowFile> flowFiles = session.get(1); + if (flowFiles.isEmpty()) { + return; + } + + final FlowFile firstFlowFile = flowFiles.get(0); + final String queueUrl = context.getProperty(QUEUE_URL).evaluateAttributeExpressions(firstFlowFile).getValue(); + + final AmazonSQSClient client = getClient(); + final DeleteMessageBatchRequest request = new DeleteMessageBatchRequest(); + request.setQueueUrl(queueUrl); + + final List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>(flowFiles.size()); + + for (final FlowFile flowFile : flowFiles) { + final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); + entry.setReceiptHandle(context.getProperty(RECEIPT_HANDLE).evaluateAttributeExpressions(flowFile).getValue()); + entries.add(entry); + } + + request.setEntries(entries); + + try { + client.deleteMessageBatch(request); + getLogger().info("Successfully deleted {} objects from SQS", new Object[]{flowFiles.size()}); + session.transfer(flowFiles, REL_SUCCESS); + } catch (final Exception e) { + getLogger().error("Failed to delete {} objects from SQS due to {}", new Object[]{flowFiles.size(), e}); + final List<FlowFile> penalizedFlowFiles = new ArrayList<>(); + for (final FlowFile flowFile : flowFiles) { + penalizedFlowFiles.add(session.penalize(flowFile)); + } + session.transfer(penalizedFlowFiles, REL_FAILURE); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/3a7ddc6a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java index f395e67..91166a2 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -1,219 +1,219 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.aws.sqs; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.apache.nifi.annotation.behavior.InputRequirement; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; -import org.apache.nifi.annotation.behavior.SupportsBatching; -import org.apache.nifi.annotation.behavior.WritesAttribute; -import org.apache.nifi.annotation.behavior.WritesAttributes; -import org.apache.nifi.annotation.documentation.CapabilityDescription; -import org.apache.nifi.annotation.documentation.SeeAlso; -import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.processor.util.StandardValidators; - -import com.amazonaws.services.sqs.AmazonSQSClient; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; -import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.MessageAttributeValue; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; - -@SupportsBatching -@SeeAlso({ PutSQS.class, DeleteSQS.class }) -@InputRequirement(Requirement.INPUT_FORBIDDEN) -@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"}) -@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue") -@WritesAttributes({ - @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"), - @WritesAttribute(attribute = "hash.algorithm", description = "MD5"), - @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"), - @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue") -}) -public class GetSQS extends AbstractSQSProcessor { - - public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() - .name("Character Set") - .description("The Character Set that should be used to encode the textual content of the SQS message") - .required(true) - .defaultValue("UTF-8") - .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0])) - .build(); - - public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() - .name("Auto Delete Messages") - .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.") - .required(true) - .allowableValues("true", "false") - .defaultValue("true") - .build(); - - public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder() - .name("Visibility Timeout") - .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers") - .expressionLanguageSupported(false) - .required(true) - .defaultValue("15 mins") - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - - public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() - .name("Batch Size") - .description("The maximum number of messages to send in a single network request") - .required(true) - .addValidator(StandardValidators.createLongValidator(1L, 10L, true)) - .defaultValue("10") - .build(); - - public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(QUEUE_URL) - .expressionLanguageSupported(false) - .build(); - - public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder() - .name("Receive Message Wait Time") - .description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will " - + "reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.") - .expressionLanguageSupported(false) - .required(true) - .defaultValue("0 sec") - .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS)) // 20 seconds is the maximum allowed by SQS - .build(); - - public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME)); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; - } - - @Override - public Set<Relationship> getRelationships() { - return Collections.singleton(REL_SUCCESS); - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue(); - - final AmazonSQSClient client = getClient(); - - final ReceiveMessageRequest request = new ReceiveMessageRequest(); - request.setAttributeNames(Collections.singleton("All")); - request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()); - request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue()); - request.setQueueUrl(queueUrl); - request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue()); - - final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); - - final ReceiveMessageResult result; - try { - result = client.receiveMessage(request); - } catch (final Exception e) { - getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e}); - context.yield(); - return; - } - - final List<Message> messages = result.getMessages(); - if (messages.isEmpty()) { - context.yield(); - return; - } - - final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean(); - - for (final Message message : messages) { - FlowFile flowFile = session.create(); - - final Map<String, String> attributes = new HashMap<>(); - for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) { - attributes.put("sqs." + entry.getKey(), entry.getValue()); - } - - for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) { - attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue()); - } - - attributes.put("hash.value", message.getMD5OfBody()); - attributes.put("hash.algorithm", "md5"); - attributes.put("sqs.message.id", message.getMessageId()); - attributes.put("sqs.receipt.handle", message.getReceiptHandle()); - - flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - out.write(message.getBody().getBytes(charset)); - } - }); - - session.transfer(flowFile, REL_SUCCESS); - session.getProvenanceReporter().receive(flowFile, queueUrl); - - getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile}); - } - - if (autoDelete) { - // If we want to auto-delete messages, we must fist commit the session to ensure that the data - // is persisted in NiFi's repositories. - session.commit(); - - final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest(); - deleteRequest.setQueueUrl(queueUrl); - final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>(); - for (final Message message : messages) { - final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); - entry.setId(message.getMessageId()); - entry.setReceiptHandle(message.getReceiptHandle()); - deleteRequestEntries.add(entry); - } - - deleteRequest.setEntries(deleteRequestEntries); - - try { - client.deleteMessageBatch(deleteRequest); - } catch (final Exception e) { - getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages" - + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e}); - } - } - - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.sqs; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.services.sqs.AmazonSQSClient; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; + +@SupportsBatching +@SeeAlso({ PutSQS.class, DeleteSQS.class }) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"Amazon", "AWS", "SQS", "Queue", "Get", "Fetch", "Poll"}) +@CapabilityDescription("Fetches messages from an Amazon Simple Queuing Service Queue") +@WritesAttributes({ + @WritesAttribute(attribute = "hash.value", description = "The MD5 sum of the message"), + @WritesAttribute(attribute = "hash.algorithm", description = "MD5"), + @WritesAttribute(attribute = "sqs.message.id", description = "The unique identifier of the SQS message"), + @WritesAttribute(attribute = "sqs.receipt.handle", description = "The SQS Receipt Handle that is to be used to delete the message from the queue") +}) +public class GetSQS extends AbstractSQSProcessor { + + public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The Character Set that should be used to encode the textual content of the SQS message") + .required(true) + .defaultValue("UTF-8") + .allowableValues(Charset.availableCharsets().keySet().toArray(new String[0])) + .build(); + + public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() + .name("Auto Delete Messages") + .description("Specifies whether the messages should be automatically deleted by the processors once they have been received.") + .required(true) + .allowableValues("true", "false") + .defaultValue("true") + .build(); + + public static final PropertyDescriptor VISIBILITY_TIMEOUT = new PropertyDescriptor.Builder() + .name("Visibility Timeout") + .description("The amount of time after a message is received but not deleted that the message is hidden from other consumers") + .expressionLanguageSupported(false) + .required(true) + .defaultValue("15 mins") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("The maximum number of messages to send in a single network request") + .required(true) + .addValidator(StandardValidators.createLongValidator(1L, 10L, true)) + .defaultValue("10") + .build(); + + public static final PropertyDescriptor STATIC_QUEUE_URL = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(QUEUE_URL) + .expressionLanguageSupported(false) + .build(); + + public static final PropertyDescriptor RECEIVE_MSG_WAIT_TIME = new PropertyDescriptor.Builder() + .name("Receive Message Wait Time") + .description("The maximum amount of time to wait on a long polling receive call. Setting this to a value of 1 second or greater will " + + "reduce the number of SQS requests and decrease fetch latency at the cost of a constantly active thread.") + .expressionLanguageSupported(false) + .required(true) + .defaultValue("0 sec") + .addValidator(StandardValidators.createTimePeriodValidator(0, TimeUnit.SECONDS, 20, TimeUnit.SECONDS)) // 20 seconds is the maximum allowed by SQS + .build(); + + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( + Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME)); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final String queueUrl = context.getProperty(STATIC_QUEUE_URL).getValue(); + + final AmazonSQSClient client = getClient(); + + final ReceiveMessageRequest request = new ReceiveMessageRequest(); + request.setAttributeNames(Collections.singleton("All")); + request.setMaxNumberOfMessages(context.getProperty(BATCH_SIZE).asInteger()); + request.setVisibilityTimeout(context.getProperty(VISIBILITY_TIMEOUT).asTimePeriod(TimeUnit.SECONDS).intValue()); + request.setQueueUrl(queueUrl); + request.setWaitTimeSeconds(context.getProperty(RECEIVE_MSG_WAIT_TIME).asTimePeriod(TimeUnit.SECONDS).intValue()); + + final Charset charset = Charset.forName(context.getProperty(CHARSET).getValue()); + + final ReceiveMessageResult result; + try { + result = client.receiveMessage(request); + } catch (final Exception e) { + getLogger().error("Failed to receive messages from Amazon SQS due to {}", new Object[]{e}); + context.yield(); + return; + } + + final List<Message> messages = result.getMessages(); + if (messages.isEmpty()) { + context.yield(); + return; + } + + final boolean autoDelete = context.getProperty(AUTO_DELETE).asBoolean(); + + for (final Message message : messages) { + FlowFile flowFile = session.create(); + + final Map<String, String> attributes = new HashMap<>(); + for (final Map.Entry<String, String> entry : message.getAttributes().entrySet()) { + attributes.put("sqs." + entry.getKey(), entry.getValue()); + } + + for (final Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) { + attributes.put("sqs." + entry.getKey(), entry.getValue().getStringValue()); + } + + attributes.put("hash.value", message.getMD5OfBody()); + attributes.put("hash.algorithm", "md5"); + attributes.put("sqs.message.id", message.getMessageId()); + attributes.put("sqs.receipt.handle", message.getReceiptHandle()); + + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + out.write(message.getBody().getBytes(charset)); + } + }); + + session.transfer(flowFile, REL_SUCCESS); + session.getProvenanceReporter().receive(flowFile, queueUrl); + + getLogger().info("Successfully received {} from Amazon SQS", new Object[]{flowFile}); + } + + if (autoDelete) { + // If we want to auto-delete messages, we must fist commit the session to ensure that the data + // is persisted in NiFi's repositories. + session.commit(); + + final DeleteMessageBatchRequest deleteRequest = new DeleteMessageBatchRequest(); + deleteRequest.setQueueUrl(queueUrl); + final List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>(); + for (final Message message : messages) { + final DeleteMessageBatchRequestEntry entry = new DeleteMessageBatchRequestEntry(); + entry.setId(message.getMessageId()); + entry.setReceiptHandle(message.getReceiptHandle()); + deleteRequestEntries.add(entry); + } + + deleteRequest.setEntries(deleteRequestEntries); + + try { + client.deleteMessageBatch(deleteRequest); + } catch (final Exception e) { + getLogger().error("Received {} messages from Amazon SQS but failed to delete the messages; these messages" + + " may be duplicated. Reason for deletion failure: {}", new Object[]{messages.size(), e}); + } + } + + } + +}
