lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r983013968


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
    * batch is a good bound for future messages.
    */
   @Override
-  public void finalizeCheckpoint() {
-    lock.writeLock().lock();
+  public void finalizeCheckpoint() throws IOException {
     try {
-      for (Message message : messages) {
-        try {
+      LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
+      if (reader.active.get() && reader != null) {
+        for (Message message : messagesToAck) {
           message.acknowledge();
           Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-          if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
-            oldestMessageTimestamp = currentMessageTimestamp;
-          }
-        } catch (Exception e) {
-          LOG.error("Exception while finalizing message: ", e);
+          reader.watermark.updateAndGet(
+              prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
         }
       }
-      messages.clear();
+    } catch (JMSException e) {
+      throw new IOException("Exception while finalizing message ", e);
     } finally {
-      lock.writeLock().unlock();
+      reader = null;
     }
   }
 
-  // set an empty list to messages when deserialize
-  private void readObject(java.io.ObjectInputStream stream)
-      throws IOException, ClassNotFoundException {
-    stream.defaultReadObject();
-    messages = new ArrayList<>();
-  }
-
   @Override
-  public boolean equals(@Nullable Object o) {
+  public boolean equals(Object o) {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof JmsCheckpointMark)) {
       return false;
     }
     JmsCheckpointMark that = (JmsCheckpointMark) o;
-    return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
+    return readerHash == that.readerHash;
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(oldestMessageTimestamp);
+    return readerHash;

Review Comment:
   Changing the fields stored in this object will change its serialVersionUID,
making it incompatible with pipeline update.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to