mosche commented on a change in pull request #16478:
URL: https://github.com/apache/beam/pull/16478#discussion_r791465121
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/sqs/SqsUnboundedReader.java
##########
@@ -725,79 +757,104 @@ private void extend() throws IOException {
// We'll try to track that on our side, but note the deadlines won't
necessarily agree.
long extensionMs = (int) ((visibilityTimeoutMs *
VISIBILITY_EXTENSION_PCT) / 100L);
long newDeadlineMsSinceEpoch = nowMsSinceEpoch + extensionMs;
+ List<KV<String, String>> messages = new
ArrayList<>(toBeExtended.size());
+
for (String messageId : toBeExtended) {
// Maintain increasing ack deadline order.
- String receiptHandle = inFlight.get(messageId).receiptHandle;
InFlightState state = inFlight.remove(messageId);
- inFlight.put(
- messageId,
- new InFlightState(
- receiptHandle, state.requestTimeMsSinceEpoch,
newDeadlineMsSinceEpoch));
+ state.visibilityDeadlineMsSinceEpoch = newDeadlineMsSinceEpoch;
+ inFlight.put(messageId, state);
+ messages.add(KV.of(messageId, state.receiptHandle));
}
- List<String> receiptHandles =
- toBeExtended.stream()
- .map(inFlight::get)
- .filter(Objects::nonNull) // get rid of null values
- .map(m -> m.receiptHandle)
- .collect(Collectors.toList());
+
// BLOCKs until extended.
- extendBatch(nowMsSinceEpoch, receiptHandles, (int) (extensionMs /
1000));
+ extendBatch(nowMsSinceEpoch, messages, (int) (extensionMs / 1000));
}
}
}
/**
- * BLOCKING Extend the visibility timeout for messages from SQS with the
given {@code
- * receiptHandles}.
+ * BLOCKING Set the SQS visibility timeout for messages in {@code
receiptHandles} to zero for
+ * immediate redelivery.
+ */
+ void expireBatchForRedelivery(List<String> receiptHandles) throws
IOException {
+ List<KV<String, String>> messages =
+ mapWithIndex(receiptHandles.stream(), (handle, idx) ->
KV.of(Long.toString(idx), handle))
+ .collect(toList());
+
+ long nowMsSinceEpoch = now();
+ extendBatch(nowMsSinceEpoch, messages, 0);
+ numReleased.add(nowMsSinceEpoch, receiptHandles.size());
+ }
+
+ /**
+ * BLOCKING Extend the SQS visibility timeout for messages in {@code
messages} as {@link KV} of
+ * message id, receipt handle.
*/
- void extendBatch(long nowMsSinceEpoch, List<String> receiptHandles, int
extensionSec)
+ void extendBatch(long nowMsSinceEpoch, List<KV<String, String>> messages,
int extensionSec)
Review comment:
The way it works is that the visibility timeout is added onto the
current time (by SQS) to update the point in time when a message becomes
available for redelivery. It's not a general timeout, but really an extension
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]