mosche commented on a change in pull request #16478:
URL: https://github.com/apache/beam/pull/16478#discussion_r791458947



##########
File path: sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -509,70 +524,87 @@ void maybeCloseClient() throws IOException {
     }
   }
 
-  /** delete the provided {@code messageIds} from SQS. */
+  /**
+   * Delete the provided {@code messageIds} from SQS in multiple batches. Each batch except the last
+   * one is of size {@code DELETE_BATCH_SIZE}. Message ids that already got removed from {@code
+   * inFlight} messages are ignored.
+   *
+   * <p>CAUTION: May be invoked from a separate thread.
+   */
   void delete(List<String> messageIds) throws IOException {
-    AtomicInteger counter = new AtomicInteger();
-    for (List<String> messageList :
-        messageIds.stream()
-            .collect(groupingBy(x -> counter.getAndIncrement() / DELETE_BATCH_SIZE))
-            .values()) {
-      deleteBatch(messageList);
+    ArrayList<String> receiptHandles = new ArrayList<>(DELETE_BATCH_SIZE);
+    for (String msgId : messageIds) {
+      InFlightState state = inFlight.get(msgId);
+      if (state == null) {
+        continue;
+      }
+      receiptHandles.add(state.receiptHandle);
+      if (receiptHandles.size() == DELETE_BATCH_SIZE) {
+        deleteBatch(receiptHandles);
+        receiptHandles.clear();
+      }
+    }
+    if (!receiptHandles.isEmpty()) {
+      deleteBatch(receiptHandles);
     }
+    deletedIds.add(messageIds);
   }
 
   /**
-   * delete the provided {@code messageIds} from SQS, blocking until all of the messages are
-   * deleted.
+   * Delete the provided {@code receiptHandles} from SQS. Blocking until all messages are deleted.
    *
    * <p>CAUTION: May be invoked from a separate thread.
-   *
-   * <p>CAUTION: Retains {@code messageIds}.
    */
-  private void deleteBatch(List<String> messageIds) throws IOException {
+  private void deleteBatch(List<String> receiptHandles) throws IOException {
     int retries = 0;
-    Map<String, String> pendingReceipts =
-        IntStream.range(0, messageIds.size())
-            .boxed()
-            .filter(i -> inFlight.containsKey(messageIds.get(i)))
-            .collect(toMap(Object::toString, i -> inFlight.get(messageIds.get(i)).receiptHandle));
 
-    while (!pendingReceipts.isEmpty()) {
+    FunctionWithIndex<String, DeleteMessageBatchRequestEntry> buildEntry =
+        (handle, id) ->
+            DeleteMessageBatchRequestEntry.builder()
+                .id(Long.toString(id))
+                .receiptHandle(handle)
+                .build();
+
+    Map<String, DeleteMessageBatchRequestEntry> pendingDeletes =
+        mapWithIndex(receiptHandles.stream(), buildEntry).collect(toMap(e -> e.id(), identity()));
+
+    while (!pendingDeletes.isEmpty()) {
 
       if (retries >= BATCH_OPERATION_MAX_RETIRES) {
         throw new IOException(
-            "Failed to extend visibility timeout for "
-                + pendingReceipts.size()
+            "Failed to delete "
+                + pendingDeletes.size()
                 + " messages after "
                 + retries
                 + " retries");
       }
 
-      List<DeleteMessageBatchRequestEntry> entries =
-          pendingReceipts.entrySet().stream()
-              .map(
-                  r ->
-                      DeleteMessageBatchRequestEntry.builder()
-                          .id(r.getKey())
-                          .receiptHandle(r.getValue())
-                          .build())
-              .collect(Collectors.toList());
-
       DeleteMessageBatchResponse result =
           sqsClient.deleteMessageBatch(
               DeleteMessageBatchRequest.builder()
-                  .queueUrl(source.getRead().queueUrl())
-                  .entries(entries)
+                  .queueUrl(queueUrl())
+                  .entries(pendingDeletes.values())
                   .build());
 
-      // Reflect failed message IDs to map
-      pendingReceipts
-          .keySet()
-          .retainAll(
-              result.failed().stream().map(BatchResultErrorEntry::id).collect(Collectors.toSet()));
+      Map<Boolean, Set<String>> failures =
+          result.failed().stream()
+              .collect(partitioningBy(this::isHandleInvalid, mapping(e -> e.id(), toSet())));
+
+      // Keep failed IDs only, but discard invalid receipt handles
+      pendingDeletes.keySet().retainAll(failures.getOrDefault(FALSE, ImmutableSet.of()));

Review comment:
       Exactly, they might have been redelivered to and deleted by another reader in the meantime ...
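
       For illustration only, here is a minimal standalone sketch of that filtering step. It does not use the AWS SDK: `Failure` is a hypothetical stand-in for `BatchResultErrorEntry`, and the `ReceiptHandleIsInvalid` error code is an assumption about what `isHandleInvalid` checks, not something confirmed by this diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class RetryFilterSketch {
  /** Hypothetical stand-in for the SDK's BatchResultErrorEntry. */
  static final class Failure {
    final String id;
    final String code;

    Failure(String id, String code) {
      this.id = id;
      this.code = code;
    }
  }

  // Assumed error code; the real isHandleInvalid in the PR may check something else.
  static final String INVALID_HANDLE = "ReceiptHandleIsInvalid";

  static boolean isHandleInvalid(Failure f) {
    return INVALID_HANDLE.equals(f.code);
  }

  /**
   * Keeps only the pending entries whose delete failed for a reason other than an
   * invalid receipt handle. Successful deletes and invalid handles are both dropped.
   */
  static void retainRetryable(Map<String, String> pendingDeletes, List<Failure> failed) {
    // true -> ids with invalid handles, false -> ids worth retrying
    Map<Boolean, Set<String>> failures =
        failed.stream()
            .collect(
                Collectors.partitioningBy(
                    RetryFilterSketch::isHandleInvalid,
                    Collectors.mapping(f -> f.id, Collectors.toSet())));

    pendingDeletes
        .keySet()
        .retainAll(failures.getOrDefault(Boolean.FALSE, Collections.emptySet()));
  }

  public static void main(String[] args) {
    Map<String, String> pending = new HashMap<>();
    pending.put("0", "handle-a"); // deleted successfully
    pending.put("1", "handle-b"); // handle no longer valid, e.g. message was redelivered
    pending.put("2", "handle-c"); // transient failure, should be retried

    retainRetryable(
        pending,
        Arrays.asList(
            new Failure("1", INVALID_HANDLE), new Failure("2", "InternalError")));

    System.out.println(pending.keySet());
  }
}
```

       With these inputs only entry `2` stays pending: entry `0` succeeded, and entry `1` is dropped because retrying with a stale receipt handle cannot succeed.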




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
