sjvanrossum commented on code in PR #32962:
URL: https://github.com/apache/beam/pull/32962#discussion_r1838211897
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java:
##########
@@ -38,36 +38,43 @@
@Internal
@VisibleForTesting
public class SolaceCheckpointMark implements UnboundedSource.CheckpointMark {
+ private transient Map<Long, BytesXMLMessage> safeToAck;
+ private transient Consumer<Long> confirmAckCallback;
private transient AtomicBoolean activeReader;
- // BytesXMLMessage is not serializable so if a job restarts from the checkpoint, we cannot retry
- // these messages here. We relay on Solace's retry mechanism.
- private transient ArrayDeque<BytesXMLMessage> ackQueue;
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private SolaceCheckpointMark() {}
/**
* Creates a new {@link SolaceCheckpointMark}.
*
+ * @param markAsAckedFn {@link Consumer<Long>} a reference to a method in the {@link
+ * UnboundedSolaceReader} that will mark the message as acknowledged.
* @param activeReader {@link AtomicBoolean} indicating if the related reader is active. The
* reader creating the messages has to be active to acknowledge the messages.
- * @param ackQueue {@link List} of {@link BytesXMLMessage} to be acknowledged.
+ * @param safeToAck {@link Map<Long, BytesXMLMessage>} of {@link BytesXMLMessage} to be
+ * acknowledged.
*/
- SolaceCheckpointMark(AtomicBoolean activeReader, List<BytesXMLMessage> ackQueue) {
+ SolaceCheckpointMark(
+ Consumer<Long> markAsAckedFn,
+ AtomicBoolean activeReader,
+ Map<Long, BytesXMLMessage> safeToAck) {
+ this.confirmAckCallback = markAsAckedFn;
this.activeReader = activeReader;
- this.ackQueue = new ArrayDeque<>(ackQueue);
+ this.safeToAck = safeToAck;
}
@Override
public void finalizeCheckpoint() {
- if (activeReader == null || !activeReader.get() || ackQueue == null) {
+ if (activeReader == null || !activeReader.get() || safeToAck == null) {
return;
}
- while (!ackQueue.isEmpty()) {
- BytesXMLMessage msg = ackQueue.poll();
+ for (Entry<Long, BytesXMLMessage> entry : safeToAck.entrySet()) {
+ BytesXMLMessage msg = entry.getValue();
if (msg != null) {
msg.ackMessage();
+ confirmAckCallback.accept(entry.getKey());
}
}
}
Review Comment:
Add a try/catch around the `msg.ackMessage()` call here; see
https://docs.solace.com/API-Developer-Online-Ref-Documentation/java/index.html
for the exceptions that `ackMessage` can throw. Log a meaningful message in the
catch block so that failed acks can be debugged.
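A rough sketch of the shape I have in mind, not a drop-in patch. `Ackable` is a hypothetical stand-in for `BytesXMLMessage` so the snippet compiles on its own, `AckSketch` and `ackAll` are made-up names, and `java.util.logging` is used only to avoid extra dependencies (the real class would use whatever logger the module already uses). Catching `RuntimeException` is an assumption on my part; narrow it to whatever the Solace Javadoc lists for `ackMessage`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class AckSketch {
  private static final Logger LOG = Logger.getLogger(AckSketch.class.getName());

  /** Hypothetical stand-in for BytesXMLMessage; only the ack call matters here. */
  interface Ackable {
    void ackMessage();
  }

  /**
   * Acks every non-null message. The callback runs only after a successful ack,
   * so a message whose ack failed is never reported as acknowledged.
   */
  static void ackAll(Map<Long, Ackable> safeToAck, Consumer<Long> confirmAckCallback) {
    for (Map.Entry<Long, Ackable> entry : safeToAck.entrySet()) {
      Ackable msg = entry.getValue();
      if (msg == null) {
        continue;
      }
      try {
        msg.ackMessage();
        confirmAckCallback.accept(entry.getKey());
      } catch (RuntimeException e) { // assumption: replace with the documented exception types
        LOG.log(
            Level.WARNING,
            "Failed to ack message with key " + entry.getKey()
                + "; it was not marked as acknowledged.",
            e);
      }
    }
  }

  public static void main(String[] args) {
    Map<Long, Ackable> messages = new LinkedHashMap<>();
    messages.put(1L, () -> {});
    messages.put(2L, () -> {
      throw new IllegalStateException("simulated ack failure");
    });
    messages.put(3L, () -> {});

    List<Long> confirmed = new ArrayList<>();
    ackAll(messages, confirmed::add);
    System.out.println("Confirmed keys: " + confirmed);
  }
}
```

The point is that one failing ack should not abort the loop or escape `finalizeCheckpoint()`, and the log line should carry enough context (at least the key) to trace which message was affected.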