lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r957532345


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   Should this be `return System.identityHashCode(reader);`?



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -38,33 +37,21 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
 
-  private Instant oldestMessageTimestamp = Instant.now();
-  private transient List<Message> messages = new ArrayList<>();
+  private transient JmsIO.UnboundedJmsReader<?> reader;
+  private transient List<Message> messagesToAck;
+  private final int readerHash;

Review Comment:
   Try not to add or remove fields, as this will impact pipeline update: the
JmsCheckpointMark instances are saved in the runner, and the SerializableCoder
that is used to encode them will complain about a [local class
incompatible](https://www.baeldung.com/java-serial-version-uid) error.
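
To illustrate the concern, here is a minimal sketch in plain Java serialization, not Beam code. `CheckpointSerializationSketch` and `MarkSketch` are hypothetical names, and the field layout is an assumption made for illustration. It shows a declared `serialVersionUID`, a round trip through `ObjectOutputStream`/`ObjectInputStream`, and the fact that `transient` fields come back as `null`. When no `serialVersionUID` is declared, the JVM derives one from the class shape (including non-transient fields), so changing those fields typically changes the UID and previously written bytes are rejected with an `InvalidClassException` ("local class incompatible").

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class CheckpointSerializationSketch {

  /** Hypothetical stand-in for a checkpoint mark; not the Beam class. */
  public static class MarkSketch implements Serializable {
    // Declaring the UID pins the stream identifier instead of letting the
    // JVM compute one from the class's fields and methods.
    private static final long serialVersionUID = 1L;

    private final long oldestTimestampMillis;
    // Transient state is skipped by serialization and is null after reading.
    private transient List<String> pending = new ArrayList<>();

    public MarkSketch(long oldestTimestampMillis) {
      this.oldestTimestampMillis = oldestTimestampMillis;
    }

    public long getOldestTimestampMillis() {
      return oldestTimestampMillis;
    }

    public List<String> getPending() {
      return pending;
    }
  }

  /** Serializes and deserializes the mark in memory. */
  public static MarkSketch roundTrip(MarkSketch mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (MarkSketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Round trip failed", e);
    }
  }

  /** Returns the UID that the serialization runtime associates with MarkSketch. */
  public static long streamUid() {
    return ObjectStreamClass.lookup(MarkSketch.class).getSerialVersionUID();
  }

  public static void main(String[] args) {
    MarkSketch copy = roundTrip(new MarkSketch(42L));
    System.out.println(copy.getOldestTimestampMillis()); // 42
    System.out.println(copy.getPending());               // null
    System.out.println(streamUid());                     // 1
  }
}
```

Whether a pinned UID is appropriate for the real class depends on how the runner persists checkpoints; the sketch only shows the mechanism behind the error message.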



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;

Review Comment:
   Wouldn't you want to ensure it is the same reader instance?
   
   (It is unlikely, but hashes can collide.)
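
One way to read this suggestion, as a hedged sketch rather than the PR's code: compare the reader by reference in `equals` and derive `hashCode` from the same reference. `ReaderIdentitySketch` and `Mark` are hypothetical names, and a plain `Object` stands in for the reader.

```java
public class ReaderIdentitySketch {

  /** Hypothetical mark whose equality is tied to one live reader object. */
  public static final class Mark {
    // Stand-in for the reader reference; an assumption for illustration.
    private final Object reader;

    public Mark(Object reader) {
      this.reader = reader;
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (!(o instanceof Mark)) {
        return false;
      }
      // Reference comparison: true only for the very same reader instance,
      // so two readers whose hashes happen to collide are still unequal.
      return reader == ((Mark) o).reader;
    }

    @Override
    public int hashCode() {
      // Consistent with equals: the same instance always yields the same value.
      return System.identityHashCode(reader);
    }
  }

  public static void main(String[] args) {
    Object readerA = new Object();
    Object readerB = new Object();
    System.out.println(new Mark(readerA).equals(new Mark(readerA))); // true
    System.out.println(new Mark(readerA).equals(new Mark(readerB))); // false
  }
}
```

One caveat: if the reference is `null` (for example after deserialization of a transient field), `System.identityHashCode(null)` is 0 and every such mark would compare equal under this scheme.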



##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -463,23 +469,24 @@ public Coder<T> getOutputCoder() {
   static class UnboundedJmsReader<T> extends UnboundedReader<T> {
 
     private UnboundedJmsSource<T> source;
-    private JmsCheckpointMark checkpointMark;
     private Connection connection;
     private Session session;
     private MessageConsumer consumer;
     private AutoScaler autoScaler;
 
     private T currentMessage;
+    private Message currentJmsMessage;
     private Instant currentTimestamp;
 
-    public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
+    Set<Message> messagesToAck;

Review Comment:
   I agree that this will continue to work as the previous solution did.
   
   Storing the message IDs and acking them would be an improvement that could
be considered in the future, though.


