Abacn commented on code in PR #30218:
URL: https://github.com/apache/beam/pull/30218#discussion_r1480497997
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -39,87 +40,66 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);
- private Instant oldestMessageTimestamp = Instant.now();
- private transient List<Message> messages = new ArrayList<>();
+ private Instant oldestMessageTimestamp;
+ private transient @Nullable Message lastMessage;
+ private transient @Nullable MessageConsumer consumer;
+ private transient @Nullable Session session;
- @VisibleForTesting transient boolean discarded = false;
-
- @VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-
- JmsCheckpointMark() {}
+ private JmsCheckpointMark(
+ Instant oldestMessageTimestamp,
+ @Nullable Message lastMessage,
+ @Nullable MessageConsumer consumer,
+ @Nullable Session session) {
+ this.oldestMessageTimestamp = oldestMessageTimestamp;
+ this.lastMessage = lastMessage;
+ this.consumer = consumer;
+ this.session = session;
+ }
- void add(Message message) throws Exception {
- lock.writeLock().lock();
+ /** Acknowledge all outstanding message. */
+ @Override
+ public void finalizeCheckpoint() {
try {
- if (discarded) {
- throw new IllegalStateException(
- String.format(
- "Attempting to add message %s to checkpoint that is discarded.", message));
- }
- Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
- if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
- oldestMessageTimestamp = currentMessageTimestamp;
+ // Jms spec will implicitly acknowledge _all_ messaged already received by the same
+ // session if one message in this session is being acknowledged.
+ if (lastMessage != null) {
+ lastMessage.acknowledge();
}
- messages.add(message);
- } finally {
- lock.writeLock().unlock();
+ } catch (JMSException e) {
+ // The effect of this is message not get acknowledged and thus will be redilivered. It is
+ // not fatal so we just raise error log. Similar below.
+ LOG.error("Exception while finalizing message: ", e);
}
- }
- Instant getOldestMessageTimestamp() {
- lock.readLock().lock();
- try {
- return this.oldestMessageTimestamp;
- } finally {
- lock.readLock().unlock();
+ // session is closed after message acknowledged otherwise other consumer may receive duplicate
+ // messages.
+ if (consumer != null) {
+ try {
+ consumer.close();
+ consumer = null;
+ } catch (JMSException e) {
+ LOG.info("Error closing JMS consumer. It may have already been closed.");
Review Comment:
For the latter two (closing the consumer and closing the session), as long as
the messages were acknowledged successfully, there won't be duplicates for the
messages in this checkpoint. An error while closing the consumer or session is
therefore less harmful than a failed acknowledgement, so I log it at info level
here.
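
To make the intended severity split concrete, here is a minimal, dependency-free sketch of the ordering described above: acknowledge first and log a failure at error level, then close the consumer and session and log failures only at info level. It is an illustration, not the code in this PR. `FinalizeSketch`, `Acknowledgeable`, and `closeQuietly` are hypothetical names; `Acknowledgeable` stands in for `javax.jms.Message`, `AutoCloseable` stands in for `MessageConsumer` and `Session`, and `java.util.logging` replaces SLF4J (so `Level.SEVERE` plays the role of `LOG.error`).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

final class FinalizeSketch {
  private static final Logger LOG = Logger.getLogger(FinalizeSketch.class.getName());

  /** Hypothetical stand-in for javax.jms.Message; only acknowledge() matters here. */
  interface Acknowledgeable {
    void acknowledge() throws Exception;
  }

  /**
   * Acknowledges first, then closes consumer and session.
   * Returns the log levels that were used, so the behaviour can be inspected.
   */
  static List<Level> finalizeCheckpoint(
      Acknowledgeable lastMessage, AutoCloseable consumer, AutoCloseable session) {
    List<Level> levels = new ArrayList<>();
    try {
      if (lastMessage != null) {
        lastMessage.acknowledge();
      }
    } catch (Exception e) {
      // A failed acknowledgement means redelivery, so it is reported loudly.
      LOG.log(Level.SEVERE, "Exception while finalizing message: ", e);
      levels.add(Level.SEVERE);
    }
    // Closing happens after the acknowledgement attempt; failures here are
    // considered less harmful and are only logged at info level.
    closeQuietly(consumer, "consumer", levels);
    closeQuietly(session, "session", levels);
    return levels;
  }

  private static void closeQuietly(AutoCloseable resource, String name, List<Level> levels) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      LOG.log(Level.INFO, "Error closing JMS " + name + ". It may have already been closed.");
      levels.add(Level.INFO);
    }
  }

  public static void main(String[] args) {
    // Acknowledgement succeeds, closing the consumer fails, session closes cleanly.
    List<Level> levels =
        finalizeCheckpoint(() -> {}, () -> { throw new Exception("already closed"); }, () -> {});
    System.out.println(levels);
  }
}
```

One point the sketch leaves open, as does the comment above: if the acknowledgement itself fails, the consumer and session are still closed, and the unacknowledged messages are left for the broker to redeliver.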