scwhittle commented on code in PR #38603:
URL: https://github.com/apache/beam/pull/38603#discussion_r3310337997


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -158,23 +163,30 @@ public boolean advance() {
 
   @Override
   public void close() {
-    finalizeReadyMessages();
     sessionServiceCache.invalidate(readerUuid);
+    ActiveReadersRegistry.unregister(readerUuid);
   }
 
-  public void finalizeReadyMessages() {
-    BytesXMLMessage msg;
-    while ((msg = safeToAckMessages.poll()) != null) {
+  void finalizeCheckpoint(long checkpointId) {
+    List<BytesXMLMessage> messagesToAck = new ArrayList<>();
+
+    synchronized (lock) {
+      SortedMap<Long, List<BytesXMLMessage>> toAck = pendingCheckpoints.headMap(checkpointId, true);

Review Comment:
   Acking previous messages seems safe if Solace itself returns things in
order. However, if this is stored only in memory in this process, the worker
may crash, and the previous messages that were already committed as processed
to the backend would no longer be in memory to ack.
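
   To make the first concern concrete, here is a minimal sketch, assuming the
pending checkpoints live only in an in-process `TreeMap`. The names
`PendingCheckpointsSketch`, `register`, `drainUpTo`, and `pendingCount` are
hypothetical, and strings stand in for `BytesXMLMessage`; this is not the PR's
code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical stand-in for the reader's in-memory checkpoint bookkeeping. */
final class PendingCheckpointsSketch {
  // Checkpoint id -> messages read under that checkpoint (strings replace BytesXMLMessage).
  private final NavigableMap<Long, List<String>> pendingCheckpoints = new TreeMap<>();
  private final Object lock = new Object();

  void register(long checkpointId, List<String> messages) {
    synchronized (lock) {
      pendingCheckpoints.put(checkpointId, new ArrayList<>(messages));
    }
  }

  /** Removes and returns every message from checkpoints with id <= checkpointId. */
  List<String> drainUpTo(long checkpointId) {
    List<String> messagesToAck = new ArrayList<>();
    synchronized (lock) {
      NavigableMap<Long, List<String>> toAck = pendingCheckpoints.headMap(checkpointId, true);
      toAck.values().forEach(messagesToAck::addAll);
      // headMap returns a view, so clearing it also removes the entries from pendingCheckpoints.
      toAck.clear();
    }
    return messagesToAck;
  }

  int pendingCount() {
    synchronized (lock) {
      return pendingCheckpoints.size();
    }
  }
}
```

   A fresh instance, as after a worker restart, returns an empty list from
`drainUpTo(3)` even if checkpoints 1 and 2 were committed earlier, so those
messages would never be acked through this path.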
   
   Or, if ranges are reassigned so that this process no longer has the source
assigned to it, we might never get a more recent finalization. We may need some
timeout on the cache so that we nack in that case (perhaps the existing reader
cache could be sufficient).
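
   As a rough illustration of the timeout idea, here is a minimal sketch,
assuming each pending checkpoint records when it was registered and that a nack
callback is available. `ExpiringCheckpointsSketch`, `finalizeUpTo`,
`expireStale`, the injected clock, and the `nack` callback are all hypothetical
and are not Beam or Solace APIs; strings stand in for `BytesXMLMessage`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch: pending checkpoints are nacked if never finalized in time. */
final class ExpiringCheckpointsSketch {
  private static final class Entry {
    final List<String> messages;
    final long registeredAtMillis;

    Entry(List<String> messages, long registeredAtMillis) {
      this.messages = messages;
      this.registeredAtMillis = registeredAtMillis;
    }
  }

  private final NavigableMap<Long, Entry> pending = new TreeMap<>();
  private final long timeoutMillis;
  private final LongSupplier clockMillis; // injected so the timeout can be tested
  private final Consumer<String> nack; // assumed callback that hands a message back to the broker

  ExpiringCheckpointsSketch(long timeoutMillis, LongSupplier clockMillis, Consumer<String> nack) {
    this.timeoutMillis = timeoutMillis;
    this.clockMillis = clockMillis;
    this.nack = nack;
  }

  synchronized void register(long checkpointId, List<String> messages) {
    pending.put(checkpointId, new Entry(new ArrayList<>(messages), clockMillis.getAsLong()));
  }

  /** Removes and returns the messages of all checkpoints with id <= checkpointId. */
  synchronized List<String> finalizeUpTo(long checkpointId) {
    List<String> toAck = new ArrayList<>();
    NavigableMap<Long, Entry> head = pending.headMap(checkpointId, true);
    for (Entry e : head.values()) {
      toAck.addAll(e.messages);
    }
    head.clear(); // view of pending, so the finalized entries are removed
    return toAck;
  }

  /** Nacks and drops checkpoints that waited at least timeoutMillis; returns messages nacked. */
  synchronized int expireStale() {
    long now = clockMillis.getAsLong();
    int nacked = 0;
    Iterator<Entry> it = pending.values().iterator();
    while (it.hasNext()) {
      Entry e = it.next();
      if (now - e.registeredAtMillis >= timeoutMillis) {
        e.messages.forEach(nack);
        nacked += e.messages.size();
        it.remove();
      }
    }
    return nacked;
  }
}
```

   Where `expireStale` would be called (a periodic task, or expiry of the
existing reader cache) is left open here; the sketch only shows that an entry
that never sees a finalization can still be nacked.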



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
