Abacn commented on code in PR #30218:
URL: https://github.com/apache/beam/pull/30218#discussion_r1480495756


##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -39,87 +40,66 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
 
   private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
 
-  private Instant oldestMessageTimestamp = Instant.now();
-  private transient List<Message> messages = new ArrayList<>();
+  private Instant oldestMessageTimestamp;
+  private transient @Nullable Message lastMessage;
+  private transient @Nullable MessageConsumer consumer;
+  private transient @Nullable Session session;
 
-  @VisibleForTesting transient boolean discarded = false;
-
-  @VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
-  JmsCheckpointMark() {}
+  private JmsCheckpointMark(
+      Instant oldestMessageTimestamp,
+      @Nullable Message lastMessage,
+      @Nullable MessageConsumer consumer,
+      @Nullable Session session) {
+    this.oldestMessageTimestamp = oldestMessageTimestamp;
+    this.lastMessage = lastMessage;
+    this.consumer = consumer;
+    this.session = session;
+  }
 
-  void add(Message message) throws Exception {
-    lock.writeLock().lock();
+  /** Acknowledge all outstanding message. */
+  @Override
+  public void finalizeCheckpoint() {
     try {
-      if (discarded) {
-        throw new IllegalStateException(
-            String.format(
-                "Attempting to add message %s to checkpoint that is discarded.", message));
-      }
-      Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
-      if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
-        oldestMessageTimestamp = currentMessageTimestamp;
+      // Jms spec will implicitly acknowledge _all_ messaged already received by the same
+      // session if one message in this session is being acknowledged.
+      if (lastMessage != null) {
+        lastMessage.acknowledge();
       }
-      messages.add(message);
-    } finally {
-      lock.writeLock().unlock();
+    } catch (JMSException e) {
+      // The effect of this is message not get acknowledged and thus will be redilivered. It is
+      // not fatal so we just raise error log. Similar below.
+      LOG.error("Exception while finalizing message: ", e);

Review Comment:
   Once we lose a healthy session, there is no way to acknowledge these 
messages, and there won't be a retry. That is why I log this at error level.
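
   To illustrate the point, here is a minimal, self-contained sketch. It does not use the real JMS API: `FakeSession` and `FakeMessage` are hypothetical stand-ins that only mimic client-acknowledge semantics (acknowledging one message acknowledges everything the session has received so far), and `java.lang.IllegalStateException` stands in for the `JMSException` a real provider would raise. Redelivery by the broker is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionAckSketch {

  /** Hypothetical stand-in for a session in client-acknowledge mode. */
  static class FakeSession {
    private final List<String> unacked = new ArrayList<>();
    private boolean closed = false;

    /** Records a received message as outstanding and returns a handle to it. */
    FakeMessage receive(String body) {
      unacked.add(body);
      return new FakeMessage(this);
    }

    void close() {
      closed = true;
    }

    int unackedCount() {
      return unacked.size();
    }
  }

  /** Hypothetical stand-in for a message tied to the session that received it. */
  static class FakeMessage {
    private final FakeSession session;

    FakeMessage(FakeSession session) {
      this.session = session;
    }

    /** Acknowledges every message the owning session has received so far. */
    void acknowledge() {
      if (session.closed) {
        throw new IllegalStateException("session is closed");
      }
      session.unacked.clear();
    }
  }

  /** Same shape as finalizeCheckpoint in the diff; returns whether the ack succeeded. */
  static boolean finalizeCheckpoint(FakeMessage lastMessage) {
    try {
      if (lastMessage != null) {
        lastMessage.acknowledge();
      }
      return true;
    } catch (IllegalStateException e) {
      // Nothing can acknowledge these messages through this session anymore,
      // so the failure is surfaced loudly instead of being retried.
      System.err.println("Exception while finalizing message: " + e);
      return false;
    }
  }

  public static void main(String[] args) {
    FakeSession healthy = new FakeSession();
    healthy.receive("a");
    FakeMessage last = healthy.receive("b");
    System.out.println(finalizeCheckpoint(last) + " " + healthy.unackedCount());

    FakeSession lost = new FakeSession();
    FakeMessage orphan = lost.receive("c");
    lost.close();
    System.out.println(finalizeCheckpoint(orphan) + " " + lost.unackedCount());
  }
}
```

   In the second case the message stays outstanding from the point of view of the lost session, and a later call on the same checkpoint would fail the same way, which is the reason for logging at error level rather than attempting a retry.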



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

