sjvanrossum commented on code in PR #32962:
URL: https://github.com/apache/beam/pull/32962#discussion_r1838193348


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -133,14 +153,10 @@ public Instant getWatermark() {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
-    List<BytesXMLMessage> ackQueue = new ArrayList<>();
-    while (!elementsToCheckpoint.isEmpty()) {
-      BytesXMLMessage msg = elementsToCheckpoint.poll();
-      if (msg != null) {
-        ackQueue.add(msg);
-      }
-    }
-    return new SolaceCheckpointMark(active, ackQueue);
+    // It's possible for a checkpoint to be taken but never finalized.
+    // So we simply copy whatever safeToAckIds we currently have.
+    Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages);
+    return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages);

Review Comment:
   The checkpoint mark does not need to be aware of whether the reader is
active. The finalizing thread may observe the call to `close()` closing the
underlying reader before the active flag is atomically set to false, so checking
the flag does not prevent an acknowledgement against a closed reader. Instead,
put a try/catch statement around the acknowledgement loop in the checkpoint
mark's finalizer: if it trips on `IllegalStateException`, the reader has been
closed and none of the queued elements can be acknowledged.
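A minimal sketch of that finalizer, assuming a hypothetical `AckableMessage` interface standing in for Solace's `BytesXMLMessage`, and a hypothetical `FakeMessage` whose `ackMessage()` throws `IllegalStateException` once its receiver is closed (an assumption about how a closed session surfaces). The mark carries only the messages to acknowledge and no `active` flag. `finalizeCheckpoint()` returns a count purely so the sketch is easy to check; Beam's real `finalizeCheckpoint()` is `void`.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class CheckpointFinalizeSketch {

  /** Hypothetical stand-in for Solace's BytesXMLMessage; only ackMessage() matters here. */
  interface AckableMessage {
    void ackMessage();
  }

  /** Hypothetical message whose ack fails once the shared (simulated) receiver is closed. */
  static final class FakeMessage implements AckableMessage {
    private final AtomicBoolean receiverClosed;
    boolean acked = false;

    FakeMessage(AtomicBoolean receiverClosed) {
      this.receiverClosed = receiverClosed;
    }

    @Override
    public void ackMessage() {
      if (receiverClosed.get()) {
        // Assumption: acking through a closed receiver surfaces as IllegalStateException.
        throw new IllegalStateException("receiver closed");
      }
      acked = true;
    }
  }

  /** Checkpoint mark sketch: holds only the messages to ack, no reader 'active' flag. */
  static final class CheckpointMarkSketch {
    private final List<? extends AckableMessage> ackQueue;

    CheckpointMarkSketch(List<? extends AckableMessage> ackQueue) {
      this.ackQueue = ackQueue;
    }

    /** Returns how many messages were acked (sketch only; Beam's method is void). */
    int finalizeCheckpoint() {
      int acked = 0;
      try {
        for (AckableMessage msg : ackQueue) {
          msg.ackMessage();
          acked++;
        }
      } catch (IllegalStateException e) {
        // The reader has been closed, so none of the remaining queued messages
        // can be acknowledged; stop here instead of failing finalization.
      }
      return acked;
    }
  }
}
```

Because the race with `close()` is caught at the point of acknowledgement, the mark no longer needs a reference to the reader's state at all.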



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -54,11 +56,23 @@ class UnboundedSolaceReader<T> extends UnboundedReader<T> {
   AtomicBoolean active = new AtomicBoolean(true);
 
   /**
-   * Queue to place advanced messages before {@link #getCheckpointMark()} be called non-concurrent
-   * queue, should only be accessed by the reader thread A given {@link UnboundedReader} object will
-   * only be accessed by a single thread at once.
+   * List of successfully ACKed message (surrogate) ids which need to be pruned from the above.
+   * CAUTION: Accessed by both reader and checkpointing threads.
    */
-  private final java.util.Queue<BytesXMLMessage> elementsToCheckpoint = new ArrayDeque<>();
+  private final Queue<Long> ackedMessageIds;
+
+  /**
+   * Map to place advanced messages before {@link #getCheckpointMark()} is called. This is a
+   * non-concurrent object, should only be accessed by the reader thread.
+   */
+  private final Map<Long, BytesXMLMessage> safeToAckMessages;
+
+  /**
+   * Surrogate id used as a key in Collections storing messages that are waiting to be acknowledged
+   * ({@link UnboundedSolaceReader#safeToAckMessages}) and already acknowledged ({@link
+   * UnboundedSolaceReader#ackedMessageIds}).
+   */
+  private Long surrogateId = 0L;

Review Comment:
   It seems like you could use the queue directly instead of this map.
   The reader can offer messages to the queue, while your checkpoint mark can
poll from it and then call `ackMessage()` on each message. The elements observed
by this reader should be finalized by this reader's checkpoint marks.
Alternatively, a deserialized checkpoint mark could be handed the current
reader's queue to perform cleanup. The reference to this reader's queue may
need to be marked as volatile, but I'd have to double-check when the checkpoint
mark is handed off to a different thread.
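A minimal sketch of the shared-queue idea, again assuming a hypothetical `AckableMessage` in place of `BytesXMLMessage`; `ReaderSketch`, `onAdvance()`, and `CheckpointMarkSketch` are illustrative names, not classes from this PR. The reader offers into a `ConcurrentLinkedQueue`, and every checkpoint mark it creates polls from that same queue instead of copying it, so a mark that is never finalized simply leaves its messages for a later mark to acknowledge.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class SharedQueueSketch {

  /** Hypothetical stand-in for Solace's BytesXMLMessage. */
  interface AckableMessage {
    void ackMessage();
  }

  /** Reader side: offers every message it advances over to a shared, thread-safe queue. */
  static final class ReaderSketch {
    private final Queue<AckableMessage> safeToAck = new ConcurrentLinkedQueue<>();

    /** Hypothetical hook called when the reader advances past a message. */
    void onAdvance(AckableMessage msg) {
      safeToAck.offer(msg);
    }

    CheckpointMarkSketch getCheckpointMark() {
      // No snapshot copy: the mark holds a reference to the reader's own queue.
      return new CheckpointMarkSketch(safeToAck);
    }
  }

  /** Checkpoint mark side: polls the shared queue and acknowledges what it finds. */
  static final class CheckpointMarkSketch {
    // Final, so the reference is safely visible to the finalizing thread. A deserialized
    // mark that is handed a reader's queue after construction would need a volatile
    // (or otherwise safely published) field instead.
    private final Queue<AckableMessage> safeToAck;

    CheckpointMarkSketch(Queue<AckableMessage> safeToAck) {
      this.safeToAck = safeToAck;
    }

    /** Returns how many messages were acked (sketch only; Beam's method is void). */
    int finalizeCheckpoint() {
      int acked = 0;
      AckableMessage msg;
      // poll() removes each message at most once, even if several marks finalize concurrently.
      while ((msg = safeToAck.poll()) != null) {
        msg.ackMessage();
        acked++;
      }
      return acked;
    }
  }
}
```

Draining until the queue is empty also acknowledges messages offered after the mark was taken; whether that is acceptable depends on when the runner calls `finalizeCheckpoint()`, which is worth confirming before adopting this shape.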



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
