sjvanrossum commented on code in PR #37164:
URL: https://github.com/apache/beam/pull/37164#discussion_r2639894543
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -164,10 +194,13 @@ public Instant getWatermark() {
@Override
public UnboundedSource.CheckpointMark getCheckpointMark() {
-
- ImmutableList<BytesXMLMessage> bytesXMLMessages =
ImmutableList.copyOf(receivedMessages);
+ Queue<BytesXMLMessage> safeToAckMessages = new ConcurrentLinkedQueue<>();
+ safeToAckMessages.addAll(receivedMessages);
Review Comment:
We're not modifying `safeToAckMessages` outside the scope of this function,
right? This doesn't have to be a concurrent container if that's the case.
I can't recall if `receivedMessages` can safely be replaced by the time this
function is called. If so, maybe this can be cleaned up a bit more by
reassigning `receivedMessages` to a new instance and passing the old instance
off to `nackMessages`.
##########
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java:
##########
@@ -150,9 +157,32 @@ public boolean advance() {
@Override
public void close() {
+ try {
+ if (nackCallback != null) {
+ // wait only for last one to finish, it will mean all the previous one
are also done.
+ nackCallback.get(ackDeadlineSeconds * 2, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ LOG.error("SolaceIO.Read: Failed to wait till nack background thread is
finished");
+ }
Review Comment:
Seems like this attempts to await the same conditions as when
calling[`ExecutorService#shutdown()`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown--)
and [`ExecutorService#awaitTermination(long,
TimeUnit)`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination-long-java.util.concurrent.TimeUnit-)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]