lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r984038907


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +58,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {

Review Comment:
   There is still a race here where reader.active.get() is true right now but 
becomes false while processing. I think you'll want to use a lock or 
synchronize on a common object.
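
A minimal sketch of the locking idea, assuming a simplified reader: `GuardedReader`, `ackAll`, and the `String` message ids are hypothetical stand-ins, not the actual JmsIO classes. The point is only that the `active` check and the acknowledgement happen under the same monitor that `close()` takes, so the flag cannot flip in between.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the JMS reader; not the actual JmsIO reader. */
class GuardedReader {
  private final Object lock = new Object(); // common monitor shared by close() and ackAll()
  private boolean active = true; // guarded by lock
  private final List<String> acked = new ArrayList<>(); // guarded by lock

  /** Records the batch as acknowledged only if the reader is active; returns whether it did. */
  boolean ackAll(List<String> messageIds) {
    synchronized (lock) {
      if (!active) {
        return false; // reader already closed, nothing is acknowledged
      }
      // close() cannot run until this block exits, so "active" stays true here.
      acked.addAll(messageIds);
      return true;
    }
  }

  /** Marks the reader inactive; blocks while an ackAll() call is in progress. */
  void close() {
    synchronized (lock) {
      active = false;
    }
  }

  int ackedCount() {
    synchronized (lock) {
      return acked.size();
    }
  }
}
```

With an `AtomicBoolean` alone, the check and the work that follows are two separate steps; holding one lock across both is what closes the window.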
   



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +608,85 @@ public long getTotalBacklogBytes() {
     }
 
     @Override
-    public void close() throws IOException {
+    public void close() {
+      doClose();
+    }
+
+    @SuppressWarnings("FutureReturnValueIgnored")
+    private void doClose() {
+      if (active.get()) {
+        try {
+          closeAutoscaler();
+          closeConsumer();
+          ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

Review Comment:
   Once https://github.com/apache/beam/pull/23234/files is merged, you can 
use the `ScheduledExecutorService` from `ExecutorOptions` instead of 
creating one.
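
A rough sketch of what using a shared executor could look like, assuming the executor is handed to the reader by its owner. `DelayedCloser`, `closeAfter`, and `awaitClosed` are hypothetical names, and how the executor is obtained from `ExecutorOptions` is deliberately not shown here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical reader fragment: the executor is supplied by the caller, not created in doClose(). */
class DelayedCloser {
  private final ScheduledExecutorService sharedExecutor; // owned by the caller, never shut down here
  private final CountDownLatch closed = new CountDownLatch(1);

  DelayedCloser(ScheduledExecutorService sharedExecutor) {
    this.sharedExecutor = sharedExecutor;
  }

  /** Schedules the final close step after the timeout so late acknowledgements can still arrive. */
  void closeAfter(long timeoutMillis) {
    Runnable finish = closed::countDown; // placeholder for closing the session and connection
    sharedExecutor.schedule(finish, timeoutMillis, TimeUnit.MILLISECONDS);
  }

  /** Waits up to maxWaitMillis for the scheduled close; returns whether it ran. */
  boolean awaitClosed(long maxWaitMillis) {
    try {
      return closed.await(maxWaitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

Because the executor is shared, the reader only schedules work on it; shutting it down stays the responsibility of whoever created it.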



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -123,11 +131,13 @@
 public class JmsIO {
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
+  private static final long DEFAULT_CLOSE_TIMEOUT = 1000L;

Review Comment:
   Is one second appropriate?



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +58,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {

Review Comment:
   Instead of having access to internal reader details, consider exposing a 
package-private method like this on the reader:
   ```
   void ackMessages(List<Message> messages) {
   ...
   }
   ```
   
   This way the internal details of how the watermark advances and how 
thread safety is handled can stay private within the reader.
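
One way the delegation could look, as a sketch under assumptions: `AckableMessage` is a hypothetical stand-in for `javax.jms.Message` (with an unchecked `acknowledge()` to keep the example dependency-free), and `SketchReader` and `SketchCheckpointMark` are simplified illustrations rather than the real classes. Watermark handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for javax.jms.Message so the sketch compiles without a JMS dependency. */
interface AckableMessage {
  void acknowledge();
}

/** Hypothetical reader; only the ackMessages entry point mirrors the suggestion. */
class SketchReader {
  private boolean active = true; // guarded by "this"

  /** Package-private: only classes in the same package, such as the checkpoint mark, can call it. */
  synchronized void ackMessages(List<? extends AckableMessage> messages) {
    if (!active) {
      return; // already closed, so nothing can be acknowledged
    }
    for (AckableMessage message : messages) {
      message.acknowledge();
    }
  }

  synchronized void close() {
    active = false;
  }
}

/** Hypothetical checkpoint mark that no longer inspects the reader's state itself. */
class SketchCheckpointMark {
  private final SketchReader reader;
  private final List<AckableMessage> messagesToAck = new ArrayList<>();

  SketchCheckpointMark(SketchReader reader) {
    this.reader = reader;
  }

  void add(AckableMessage message) {
    messagesToAck.add(message);
  }

  void finalizeCheckpoint() {
    if (reader != null) {
      reader.ackMessages(messagesToAck); // the reader decides whether acking is still valid
    }
  }
}
```

The checkpoint mark only needs a reference to the reader and the list of messages; the active flag and its synchronization are no longer visible outside the reader.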



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +381,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
       return builder().setAutoScaler(autoScaler).build();
     }
 
+    public Read<T> withCloseTimeout(long closeTimeout) {

Review Comment:
   Do you want to go with a wait on how long we keep sessions around?
   ```suggestion
       /** The amount of time to wait before a message that was read has its acknowledgement expire and be returned back to the JMS broker. */
       public Read<T> withMessageAckExpiry(Duration ackExpiry) {
   ```
   
   If not then I would suggest updating this to:
   ```suggestion
       /** The amount of time to wait for bundle finalization callbacks allowing for messages to be acknowledged after the reader is closed. */
       public Read<T> withConnectionCloseTimeout(Duration closeTimeout) {
   ```
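
For illustration, a `Duration`-typed setter might be shaped roughly as follows. This is a sketch with assumptions: `ReadSketch` is a hypothetical, simplified stand-in for `JmsIO.Read`, the one-second default mirrors `DEFAULT_CLOSE_TIMEOUT` from the diff, and `java.time.Duration` is swapped in for the Joda-Time `Duration` that Beam APIs generally use so that the example compiles with the JDK alone.

```java
import java.time.Duration;

/** Hypothetical, simplified stand-in for JmsIO.Read; immutable, each "with" call returns a copy. */
final class ReadSketch {
  // Mirrors DEFAULT_CLOSE_TIMEOUT = 1000L from the diff, expressed as a Duration.
  private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(1);

  private final Duration closeTimeout;

  private ReadSketch(Duration closeTimeout) {
    this.closeTimeout = closeTimeout;
  }

  static ReadSketch create() {
    return new ReadSketch(DEFAULT_CLOSE_TIMEOUT);
  }

  /** Time to wait for bundle finalization callbacks after the reader is closed. */
  ReadSketch withConnectionCloseTimeout(Duration closeTimeout) {
    if (closeTimeout == null || closeTimeout.isNegative()) {
      throw new IllegalArgumentException("closeTimeout must be non-negative");
    }
    return new ReadSketch(closeTimeout);
  }

  Duration getCloseTimeout() {
    return closeTimeout;
  }
}
```

Compared with a bare `long`, the `Duration` parameter carries its unit in the type, so callers cannot confuse seconds with milliseconds.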


