[ 
https://issues.apache.org/jira/browse/NIFI-515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16074700#comment-16074700
 ] 

ASF GitHub Bot commented on NIFI-515:
-------------------------------------

Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1977#discussion_r125633660
  
    --- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
 ---
    @@ -108,43 +108,57 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
             final String queueUrl = 
context.getProperty(QUEUE_URL).evaluateAttributeExpressions(flowFile).getValue();
             request.setQueueUrl(queueUrl);
     
    +        final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
    +        final List<FlowFile> flowFiles = session.get(new 
UrlFlowFileFilter(batchSize, queueUrl, context));
    +        flowFiles.add(flowFile);
    +
             final Set<SendMessageBatchRequestEntry> entries = new HashSet<>();
     
    -        final SendMessageBatchRequestEntry entry = new 
SendMessageBatchRequestEntry();
    -        entry.setId(flowFile.getAttribute("uuid"));
    -        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    -        session.exportTo(flowFile, baos);
    -        final String flowFileContent = baos.toString();
    -        entry.setMessageBody(flowFileContent);
    +        for(FlowFile flowFileItem : flowFiles) {
     
    -        final Map<String, MessageAttributeValue> messageAttributes = new 
HashMap<>();
    +            final SendMessageBatchRequestEntry entry = new 
SendMessageBatchRequestEntry();
    +            entry.setId(flowFileItem.getAttribute("uuid"));
    +            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +            session.exportTo(flowFileItem, baos);
    +            final String flowFileContent = baos.toString();
    +            entry.setMessageBody(flowFileContent);
     
    -        for (final PropertyDescriptor descriptor : userDefinedProperties) {
    -            final MessageAttributeValue mav = new MessageAttributeValue();
    -            mav.setDataType("String");
    -            
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue());
    -            messageAttributes.put(descriptor.getName(), mav);
    -        }
    +            final Map<String, MessageAttributeValue> messageAttributes = 
new HashMap<>();
     
    -        entry.setMessageAttributes(messageAttributes);
    -        
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
    -        entries.add(entry);
    +            for (final PropertyDescriptor descriptor : 
userDefinedProperties) {
    +                final MessageAttributeValue mav = new 
MessageAttributeValue();
    +                mav.setDataType("String");
    +                
mav.setStringValue(context.getProperty(descriptor).evaluateAttributeExpressions(flowFileItem).getValue());
    +                messageAttributes.put(descriptor.getName(), mav);
    +            }
    +
    +            entry.setMessageAttributes(messageAttributes);
    +            
entry.setDelaySeconds(context.getProperty(DELAY).asTimePeriod(TimeUnit.SECONDS).intValue());
    +            entries.add(entry);
    +
    +        }
     
             request.setEntries(entries);
     
             try {
                 client.sendMessageBatch(request);
             } catch (final Exception e) {
    -            getLogger().error("Failed to send messages to Amazon SQS due 
to {}; routing to failure", new Object[]{e});
    -            flowFile = session.penalize(flowFile);
    -            session.transfer(flowFile, REL_FAILURE);
    +            getLogger().error("Failed to send {} messages to Amazon SQS 
due to {}; routing to failure", new Object[]{flowFiles.size(), e});
    --- End diff --
    
    You're right, I forgot that we have individual responses for each entry 
part of the request. Will update the PR. Thanks @jzonthemtn 


> DeleteSQS and PutSQS should offer batch processing
> --------------------------------------------------
>
>                 Key: NIFI-515
>                 URL: https://issues.apache.org/jira/browse/NIFI-515
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Pierre Villard
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to