liferoad commented on code in PR #30218:
URL: https://github.com/apache/beam/pull/30218#discussion_r1480478395
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -39,87 +40,66 @@ class JmsCheckpointMark implements
UnboundedSource.CheckpointMark, Serializable
private static final Logger LOG =
LoggerFactory.getLogger(JmsCheckpointMark.class);
- private Instant oldestMessageTimestamp = Instant.now();
- private transient List<Message> messages = new ArrayList<>();
+ private Instant oldestMessageTimestamp;
+ private transient @Nullable Message lastMessage;
+ private transient @Nullable MessageConsumer consumer;
+ private transient @Nullable Session session;
- @VisibleForTesting transient boolean discarded = false;
-
- @VisibleForTesting final ReentrantReadWriteLock lock = new
ReentrantReadWriteLock();
-
- JmsCheckpointMark() {}
+ private JmsCheckpointMark(
+ Instant oldestMessageTimestamp,
+ @Nullable Message lastMessage,
+ @Nullable MessageConsumer consumer,
+ @Nullable Session session) {
+ this.oldestMessageTimestamp = oldestMessageTimestamp;
+ this.lastMessage = lastMessage;
+ this.consumer = consumer;
+ this.session = session;
+ }
- void add(Message message) throws Exception {
- lock.writeLock().lock();
+ /** Acknowledge all outstanding messages. */
+ @Override
+ public void finalizeCheckpoint() {
try {
- if (discarded) {
- throw new IllegalStateException(
- String.format(
- "Attempting to add message %s to checkpoint that is
discarded.", message));
- }
- Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
- if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
- oldestMessageTimestamp = currentMessageTimestamp;
+ // The JMS spec implicitly acknowledges _all_ messages already received by the same
+ // session when any one message in that session is acknowledged.
+ if (lastMessage != null) {
+ lastMessage.acknowledge();
}
- messages.add(message);
- } finally {
- lock.writeLock().unlock();
+ } catch (JMSException e) {
+ // The effect is that the message is not acknowledged and will therefore be redelivered.
+ // This is not fatal, so we only log an error. The same applies below.
+ LOG.error("Exception while finalizing message: ", e);
}
- }
- Instant getOldestMessageTimestamp() {
- lock.readLock().lock();
- try {
- return this.oldestMessageTimestamp;
- } finally {
- lock.readLock().unlock();
+ // The session is closed only after the message is acknowledged; otherwise other consumers
+ // may receive duplicate messages.
+ if (consumer != null) {
+ try {
+ consumer.close();
+ consumer = null;
+ } catch (JMSException e) {
+ LOG.info("Error closing JMS consumer. It may have already been
closed.");
+ }
}
- }
- void discard() {
- lock.writeLock().lock();
- try {
- this.discarded = true;
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
- * Acknowledge all outstanding message. Since we believe that messages will
be delivered in
- * timestamp order, and acknowledged messages will not be retried, the
newest message in this
- * batch is a good bound for future messages.
- */
- @Override
- public void finalizeCheckpoint() {
- lock.writeLock().lock();
- try {
- if (discarded) {
- messages.clear();
- return;
+ // The session must be closed after the message is acknowledged, because acknowledgement
+ // requires the session to remain active.
+ if (session != null) {
+ try {
+ session.close();
+ session = null;
+ } catch (JMSException e) {
+ LOG.info("Error closing JMS session. It may have already been
closed.");
}
- for (Message message : messages) {
- try {
- message.acknowledge();
- Instant currentMessageTimestamp = new
Instant(message.getJMSTimestamp());
- if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
- oldestMessageTimestamp = currentMessageTimestamp;
- }
- } catch (Exception e) {
- LOG.error("Exception while finalizing message: ", e);
- }
- }
- messages.clear();
- } finally {
- lock.writeLock().unlock();
}
}
// set an empty list to messages when deserialize
Review Comment:
update the comment? It still refers to the `messages` list, which this change removes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]