[ 
https://issues.apache.org/jira/browse/BEAM-13510?focusedWorklogId=714272&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-714272
 ]

ASF GitHub Bot logged work on BEAM-13510:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jan/22 08:21
            Start Date: 25/Jan/22 08:21
    Worklog Time Spent: 10m 
      Work Description: mosche commented on a change in pull request #16478:
URL: https://github.com/apache/beam/pull/16478#discussion_r791458947



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -509,70 +524,87 @@ void maybeCloseClient() throws IOException {
     }
   }
 
-  /** delete the provided {@code messageIds} from SQS. */
+  /**
+   * Delete the provided {@code messageIds} from SQS in multiple batches. Each batch except the last
+   * one is of size {@code DELETE_BATCH_SIZE}. Message ids that already got removed from {@code
+   * inFlight} messages are ignored.
+   *
+   * <p>CAUTION: May be invoked from a separate thread.
+   */
   void delete(List<String> messageIds) throws IOException {
-    AtomicInteger counter = new AtomicInteger();
-    for (List<String> messageList :
-        messageIds.stream()
-            .collect(groupingBy(x -> counter.getAndIncrement() / DELETE_BATCH_SIZE))
-            .values()) {
-      deleteBatch(messageList);
+    ArrayList<String> receiptHandles = new ArrayList<>(DELETE_BATCH_SIZE);
+    for (String msgId : messageIds) {
+      InFlightState state = inFlight.get(msgId);
+      if (state == null) {
+        continue;
+      }
+      receiptHandles.add(state.receiptHandle);
+      if (receiptHandles.size() == DELETE_BATCH_SIZE) {
+        deleteBatch(receiptHandles);
+        receiptHandles.clear();
+      }
+    }
+    if (!receiptHandles.isEmpty()) {
+      deleteBatch(receiptHandles);
     }
+    deletedIds.add(messageIds);
   }
 
   /**
-   * delete the provided {@code messageIds} from SQS, blocking until all of the messages are
-   * deleted.
+   * Delete the provided {@code receiptHandles} from SQS. Blocking until all messages are deleted.
    *
    * <p>CAUTION: May be invoked from a separate thread.
-   *
-   * <p>CAUTION: Retains {@code messageIds}.
    */
-  private void deleteBatch(List<String> messageIds) throws IOException {
+  private void deleteBatch(List<String> receiptHandles) throws IOException {
     int retries = 0;
-    Map<String, String> pendingReceipts =
-        IntStream.range(0, messageIds.size())
-            .boxed()
-            .filter(i -> inFlight.containsKey(messageIds.get(i)))
-            .collect(toMap(Object::toString, i -> inFlight.get(messageIds.get(i)).receiptHandle));
 
-    while (!pendingReceipts.isEmpty()) {
+    FunctionWithIndex<String, DeleteMessageBatchRequestEntry> buildEntry =
+        (handle, id) ->
+            DeleteMessageBatchRequestEntry.builder()
+                .id(Long.toString(id))
+                .receiptHandle(handle)
+                .build();
+
+    Map<String, DeleteMessageBatchRequestEntry> pendingDeletes =
+        mapWithIndex(receiptHandles.stream(), buildEntry).collect(toMap(e -> e.id(), identity()));
+
+    while (!pendingDeletes.isEmpty()) {
 
       if (retries >= BATCH_OPERATION_MAX_RETIRES) {
         throw new IOException(
-            "Failed to extend visibility timeout for "
-                + pendingReceipts.size()
+            "Failed to delete "
+                + pendingDeletes.size()
                 + " messages after "
                 + retries
                 + " retries");
       }
 
-      List<DeleteMessageBatchRequestEntry> entries =
-          pendingReceipts.entrySet().stream()
-              .map(
-                  r ->
-                      DeleteMessageBatchRequestEntry.builder()
-                          .id(r.getKey())
-                          .receiptHandle(r.getValue())
-                          .build())
-              .collect(Collectors.toList());
-
       DeleteMessageBatchResponse result =
           sqsClient.deleteMessageBatch(
               DeleteMessageBatchRequest.builder()
-                  .queueUrl(source.getRead().queueUrl())
-                  .entries(entries)
+                  .queueUrl(queueUrl())
+                  .entries(pendingDeletes.values())
                   .build());
 
-      // Reflect failed message IDs to map
-      pendingReceipts
-          .keySet()
-          .retainAll(
-              result.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toSet()));
+      Map<Boolean, Set<String>> failures =
+          result.failed().stream()
+              .collect(partitioningBy(this::isHandleInvalid, mapping(e -> e.id(), toSet())));
+
+      // Keep failed IDs only, but discard invalid receipt handles
+      pendingDeletes.keySet().retainAll(failures.getOrDefault(FALSE, ImmutableSet.of()));

Review comment:
       Exactly, they might have been redelivered to and deleted by another reader
in the meantime ...
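
For readers following the thread, here is a minimal, self-contained sketch of the filtering step discussed above: failed batch entries are partitioned by whether their receipt handle is invalid, and only the remaining failures stay pending for a retry. It is not the code from the pull request. `FailedEntry` is a hypothetical stand-in for the SDK's `BatchResultErrorEntry`, the error code string `ReceiptHandleIsInvalid` is an assumption about what `isHandleInvalid` checks, and `InternalError` is an arbitrary placeholder for a retryable failure.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class RetryFilterSketch {
  /** Hypothetical stand-in for BatchResultErrorEntry: only the entry id and error code. */
  static final class FailedEntry {
    final String id;
    final String code;

    FailedEntry(String id, String code) {
      this.id = id;
      this.code = code;
    }
  }

  // Assumed error code for an expired or otherwise invalid receipt handle.
  static final String INVALID_HANDLE = "ReceiptHandleIsInvalid";

  static boolean isHandleInvalid(FailedEntry entry) {
    return INVALID_HANDLE.equals(entry.code);
  }

  /** Keeps only entries that failed for a retryable reason; mutates {@code pending}. */
  static void retainRetryable(Map<String, String> pending, List<FailedEntry> failed) {
    // partitioningBy always yields both keys, so get(false) is never null.
    Map<Boolean, Set<String>> failures =
        failed.stream()
            .collect(
                Collectors.partitioningBy(
                    RetryFilterSketch::isHandleInvalid,
                    Collectors.mapping((FailedEntry e) -> e.id, Collectors.toSet())));
    // Successful entries and entries with invalid handles both drop out here.
    pending.keySet().retainAll(failures.get(false));
  }

  public static void main(String[] args) {
    Map<String, String> pending = new HashMap<>();
    pending.put("0", "handle-a"); // deleted successfully
    pending.put("1", "handle-b"); // failed with a retryable error
    pending.put("2", "handle-c"); // handle expired, e.g. redelivered to another reader

    retainRetryable(
        pending,
        Arrays.asList(
            new FailedEntry("1", "InternalError"),
            new FailedEntry("2", INVALID_HANDLE)));

    System.out.println(pending); // prints {1=handle-b}
  }
}
```

Dropping the invalid handles rather than retrying them matches the reasoning in the comment: a message whose handle expired may already have been redelivered to and deleted by another reader, so a retry can never succeed.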




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 714272)
    Time Spent: 2h  (was: 1h 50m)

> SQS reader retries on invalid (expired) receipt handles (AWS SDK v2)
> --------------------------------------------------------------------
>
>                 Key: BEAM-13510
>                 URL: https://issues.apache.org/jira/browse/BEAM-13510
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Assignee: Moritz Mack
>            Priority: P2
>              Labels: SQS, aws-sdk-v2
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Extending visibility timeouts or deleting messages with invalid (expired)
> receipt handles will never succeed and therefore should not be retried.
> Investigate if this is also an issue in SDK v1.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
