echauchot commented on a change in pull request #16478:
URL: https://github.com/apache/beam/pull/16478#discussion_r791483523
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
Review comment:
ok makes sense, thx
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]