sjvanrossum commented on code in PR #37164:
URL: https://github.com/apache/beam/pull/37164#discussion_r2639894543


##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -164,10 +194,13 @@ public Instant getWatermark() {
 
   @Override
   public UnboundedSource.CheckpointMark getCheckpointMark() {
-
-    ImmutableList<BytesXMLMessage> bytesXMLMessages = 
ImmutableList.copyOf(receivedMessages);
+    Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
+    safeToAckMessages.addAll(receivedMessages);

Review Comment:
   We're not modifying `safeToAckMessages` outside the scope of this function, 
right? This doesn't have to be a concurrent container if that's the case.
   
   I can't recall if `receivedMessages` can safely be replaced by the time this 
function is called. If so, maybe this can be cleaned up a bit more by 
reassigning `receivedMessages` to a new instance and passing the old instance 
off to `nackMessages`.



##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -150,9 +157,32 @@ public boolean advance() {
 
   @Override
   public void close() {
+    try {
+      if (nackCallback != null) {
+        // wait only for last one to finish, it will mean all the previous one 
are also done.
+        nackCallback.get(ackDeadlineSeconds * 2, TimeUnit.SECONDS);
+      }
+    } catch (InterruptedException | ExecutionException | TimeoutException e) {
+      LOG.error("SolaceIO.Read: Failed to wait till nack background thread is 
finished");
+    }

Review Comment:
   Seems like this attempts to await the same conditions as when 
calling[`ExecutorService#shutdown()`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown--)
 and [`ExecutorService#awaitTermination(long, 
TimeUnit)`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination-long-java.util.concurrent.TimeUnit-)?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to