mosche commented on a change in pull request #16478:
URL: https://github.com/apache/beam/pull/16478#discussion_r791462436



##########
File path: 
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
         // We'll try to track that on our side, but note the deadlines won't 
necessarily agree.
         long extensionMs = (int) ((visibilityTimeoutMs * 
VISIBILITY_EXTENSION_PCT) / 100L);
         long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+        List<KV<String, String>> messages = new 
ArrayList<>(toBeExtended.size());
+
         for (String messageId : toBeExtended) {
           // Maintain increasing ack deadline order.
-          String receiptHandle = inFlight.get(messageId).receiptHandle;
           InFlightState state = inFlight.remove(messageId);
-          inFlight.put(
-              messageId,
-              new InFlightState(
-                  receiptHandle, state.requestTimeMsSinceEpoch, 
newDeadlineMsSinceEpoch));
+          state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+          inFlight.put(messageId, state);
+          messages.add(KV.of(messageId, state.receiptHandle));
         }
-        List<String> receiptHandles =
-            toBeExtended.stream()
-                .map(inFlight::get)
-                .filter(Objects::nonNull) // get rid of null values
-                .map(m -> m.receiptHandle)
-                .collect(Collectors.toList());
+
         // BLOCKs until extended.
-        extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs / 
1000));
+        extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
       }
     }
   }
 
   /**
-   * BLOCKING Extend the visibility timeout for messages from SQS with the 
given {@code
-   * receiptHandles}.
+   * BLOCKING Set the SQS visibility timeout for messages in {@code 
receiptHandles} to zero for
+   * immediate redelivery.
+   */
+  void expireBatchForRedelivery(List<String> receiptHandles) throws 
IOException {

Review comment:
       The visibility timeout defines the point in time when an SQS message is 
available for re-delivery again unless it was deleted or extended in the 
meanwhile. These messages will be randomly delivered to any reader that polls 
for messages next. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to