[ https://issues.apache.org/jira/browse/NIFI-515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074695#comment-16074695 ]
ASF GitHub Bot commented on NIFI-515:
-------------------------------------
GitHub user jzonthemtn commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1977#discussion_r125632070
--- Diff:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
---
@@ -108,43 +108,57 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
final String queueUrl =
context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
request.setQueueUrl(queueUrl);
+ final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ final List<FlowFile> flowFiles = session.get(new
UrlFlowFileFilter(batchSize, queueUrl, context));
+ flowFiles.add(flowFile);
+
final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
- final SendMessageBatchRequestEntry entry = new
SendMessageBatchRequestEntry();
- entry.setId(flowFile.getAttribute("uuid"));
- final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- session.exportTo(flowFile, baos);
- final String flowFileContent = baos.toString();
- entry.setMessageBody(flowFileContent);
+ for(FlowFile flowFileItem : flowFiles) {
- final Map<String, MessageAttributeValue> messageAttributes = new
HashMap<>();
+ final SendMessageBatchRequestEntry entry = new
SendMessageBatchRequestEntry();
+ entry.setId(flowFileItem.getAttribute("uuid"));
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ session.exportTo(flowFileItem, baos);
+ final String flowFileContent = baos.toString();
+ entry.setMessageBody(flowFileContent);
- for (final PropertyDescriptor descriptor : userDefinedProperties) {
- final MessageAttributeValue mav = new MessageAttributeValue();
- mav.setDataType("String");
-
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
- messageAttributes.put(descriptor.getName(), mav);
- }
+ final Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>();
- entry.setMessageAttributes(messageAttributes);
-
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
- entries.add(entry);
+ for (final PropertyDescriptor descriptor :
userDefinedProperties) {
+ final MessageAttributeValue mav = new
MessageAttributeValue();
+ mav.setDataType("String");
+
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFileItem).getValue());
+ messageAttributes.put(descriptor.getName(), mav);
+ }
+
+ entry.setMessageAttributes(messageAttributes);
+
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
+ entries.add(entry);
+
+ }
request.setEntries(entries);
try {
client.sendMessageBatch(request);
} catch (final Exception e) {
- getLogger().error("Failed to send messages to Amazon SQS due
to {}; routing to failure", new Object[]{e});
- flowFile = session.penalize(flowFile);
- session.transfer(flowFile, REL_FAILURE);
+ getLogger().error("Failed to send {} messages to Amazon SQS
due to {}; routing to failure", new Object[]{flowFiles.size(), e});
--- End diff --
There could be a mix of successful and failed messages in the
`SendMessageBatchResult`, even when `sendMessageBatch` does not throw. Does
that impact how the flow files in the session are transferred?
> DeleteSQS and PutSQS should offer batch processing
> --------------------------------------------------
>
> Key: NIFI-515
> URL: https://issues.apache.org/jira/browse/NIFI-515
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Pierre Villard
> Priority: Minor
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)