lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r995107311
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +594,83 @@ public long getTotalBacklogBytes() {
}
@Override
- public void close() throws IOException {
+ public void close() {
+ doClose();
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void doClose() {
+
try {
- if (consumer != null) {
- consumer.close();
- consumer = null;
+ closeAutoscaler();
+ closeConsumer();
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
Review Comment:
Please use the scheduled executor service from ExecutorOptions once it is
merged from https://github.com/apache/beam/pull/23234
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
return builder().setAutoScaler(autoScaler).build();
}
+ public Read<T> withCloseTimeout(long closeTimeout) {
Review Comment:
Can we update this to use a duration object?
```suggestion
public Read<T> withCloseTimeout(Duration closeTimeout) {
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -582,29 +594,83 @@ public long getTotalBacklogBytes() {
}
@Override
- public void close() throws IOException {
+ public void close() {
+ doClose();
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void doClose() {
+
try {
- if (consumer != null) {
- consumer.close();
- consumer = null;
+ closeAutoscaler();
+ closeConsumer();
+ ScheduledExecutorService executorService =
Executors.newSingleThreadScheduledExecutor();
+ executorService.schedule(
+ () -> {
+ LOG.debug(
+ "Closing session and connection after delay {}",
source.spec.getCloseTimeout());
+ // Discard the checkpoints and set the reader as inactive
+ checkpointMark.discard();
+ closeSession();
+ closeConnection();
+ },
+ source.spec.getCloseTimeout(),
+ TimeUnit.MILLISECONDS);
Review Comment:
Note to self: this should match the unit of the closeTimeout Duration object.
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
return builder().setAutoScaler(autoScaler).build();
}
+ public Read<T> withCloseTimeout(long closeTimeout) {
+ return builder().setCloseTimeout(closeTimeout).build();
Review Comment:
```suggestion
if (closeTimeout.isNegative()) { throw new
IllegalArgumentException("Close timeout must be non-negative."); }
return builder().setCloseTimeout(closeTimeout).build();
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -76,15 +86,17 @@ Instant getOldestMessageTimestamp() {
public void finalizeCheckpoint() {
lock.writeLock().lock();
try {
- for (Message message : messages) {
- try {
- message.acknowledge();
- Instant currentMessageTimestamp = new
Instant(message.getJMSTimestamp());
- if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
- oldestMessageTimestamp = currentMessageTimestamp;
+ if (!discarded) {
+ for (Message message : messages) {
+ try {
+ message.acknowledge();
+ Instant currentMessageTimestamp = new
Instant(message.getJMSTimestamp());
+ if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
+ oldestMessageTimestamp = currentMessageTimestamp;
+ }
+ } catch (Exception e) {
+ LOG.error("Exception while finalizing message: ", e);
Review Comment:
consider using a guard statement instead:
```suggestion
if (discarded) {
messages.clear();
return;
}
for (Message message : messages) {
try {
message.acknowledge();
Instant currentMessageTimestamp = new
Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
}
} catch (Exception e) {
LOG.error("Exception while finalizing message: ", e);
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+
+ int closeTimeout = 2000;
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE)
+ .withCloseTimeout(closeTimeout);
+
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ reader.start();
+ reader.close();
+
+ boolean discarded = getDiscardedValue(reader);
+ assertFalse(discarded);
+ try {
+ Thread.sleep(closeTimeout + 1000);
+ } catch (InterruptedException ignored) {
+ }
+ discarded = getDiscardedValue(reader);
+ assertTrue(discarded);
+ }
+
+ @Test
+ public void testDiscardCheckpointMark() throws Exception {
+
+ Connection connection =
+
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE));
+ for (int i = 0; i < 10; i++) {
+ producer.send(session.createTextMessage("test " + i));
+ }
+ producer.close();
+ session.close();
+ connection.close();
+
+ JmsIO.Read spec =
+ JmsIO.read()
+
.withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE);
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ // start the reader and move to the first record
+ assertTrue(reader.start());
+
+ // consume 3 messages (NB: start already consumed the first message)
+ for (int i = 0; i < 3; i++) {
+ assertTrue(reader.advance());
+ }
+
+ // the messages are still pending in the queue (no ACK yet)
+ assertEquals(10, count(QUEUE));
+
+ // we finalize the checkpoint
+ reader.getCheckpointMark().finalizeCheckpoint();
+
+ // the checkpoint finalize ack the messages, and so they are not pending
in the queue anymore
+ assertEquals(6, count(QUEUE));
+
+ // we read the 6 pending messages
+ for (int i = 0; i < 6; i++) {
+ assertTrue(reader.advance());
+ }
+
+ // still 6 pending messages as we didn't finalize the checkpoint
+ assertEquals(6, count(QUEUE));
+
+ // But here we discard the checkpoint
+ ((JmsCheckpointMark) reader.getCheckpointMark()).discard();
+ // we finalize the checkpoint: no more message in the queue
+ reader.getCheckpointMark().finalizeCheckpoint();
+
+ assertEquals(6, count(QUEUE));
+ }
+
+ private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader)
+ throws NoSuchFieldException, IllegalAccessException {
+ JmsCheckpointMark checkpoint = (JmsCheckpointMark)
reader.getCheckpointMark();
+ Field privateField = JmsCheckpointMark.class.getDeclaredField("discarded");
+ privateField.setAccessible(true);
+ boolean discarded = (boolean) privateField.get(checkpoint);
+ return discarded;
Review Comment:
There is no guarantee that the latest value of the `discarded` field will be
visible to this thread, since we don't acquire the lock, and acquiring the lock
is what provides the memory barrier.
This will work if you take my suggestion about making the fields package
private:
```suggestion
JmsCheckpointMark checkpoint = (JmsCheckpointMark)
reader.getCheckpointMark();
checkpoint.lock.readLock().lock();
try {
return checkpoint.discarded;
} finally {
checkpoint.lock.readLock().unlock();
}
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -40,6 +40,7 @@ class JmsCheckpointMark implements
UnboundedSource.CheckpointMark, Serializable
private Instant oldestMessageTimestamp = Instant.now();
private transient List<Message> messages = new ArrayList<>();
+ private transient boolean discarded = false;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
Review Comment:
It is common to make certain fields/classes package-private and annotate them
with `@VisibleForTesting`; then you don't need to use reflection to access the field.
```suggestion
@VisibleForTesting
transient boolean discarded = false;
@VisibleForTesting
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsCheckpointMark.java:
##########
@@ -67,6 +68,15 @@ Instant getOldestMessageTimestamp() {
}
Review Comment:
In `add(Message message)`, add the check right after acquiring the lock:
```
if (discarded) {
throw new IllegalStateException(String.format("Attempting to add message
%s to checkpoint that is discarded.", message));
}
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+
+ int closeTimeout = 2000;
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE)
+ .withCloseTimeout(closeTimeout);
+
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ reader.start();
+ reader.close();
Review Comment:
I would suggest using a mock ScheduledExecutorService that you set on the
PipelineOptions object when creating the reader. This way you can inject it here
in the test and capture the runnable/callable directly, without needing a test
that relies on Thread.sleep.
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+
+ int closeTimeout = 2000;
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE)
+ .withCloseTimeout(closeTimeout);
+
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ reader.start();
+ reader.close();
+
+ boolean discarded = getDiscardedValue(reader);
+ assertFalse(discarded);
+ try {
+ Thread.sleep(closeTimeout + 1000);
+ } catch (InterruptedException ignored) {
+ }
+ discarded = getDiscardedValue(reader);
+ assertTrue(discarded);
+ }
+
+ @Test
+ public void testDiscardCheckpointMark() throws Exception {
+
+ Connection connection =
+
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE));
+ for (int i = 0; i < 10; i++) {
+ producer.send(session.createTextMessage("test " + i));
+ }
+ producer.close();
+ session.close();
+ connection.close();
+
+ JmsIO.Read spec =
+ JmsIO.read()
+
.withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE);
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ // start the reader and move to the first record
+ assertTrue(reader.start());
+
+ // consume 3 messages (NB: start already consumed the first message)
Review Comment:
```suggestion
// consume 3 more messages (NB: start already consumed the first message)
```
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,10 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
return builder().setAutoScaler(autoScaler).build();
}
+ public Read<T> withCloseTimeout(long closeTimeout) {
Review Comment:
/**
* Sets the amount of time to wait for callbacks from the runner stating
that the output has been durably persisted before closing the connection to the
JMS broker. Any callbacks that do not occur will cause any unacknowledged
messages to be returned to the JMS broker and redelivered to other clients.
*/
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout()
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+
+ int closeTimeout = 2000;
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE)
+ .withCloseTimeout(closeTimeout);
+
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ reader.start();
+ reader.close();
+
+ boolean discarded = getDiscardedValue(reader);
+ assertFalse(discarded);
+ try {
+ Thread.sleep(closeTimeout + 1000);
+ } catch (InterruptedException ignored) {
+ }
+ discarded = getDiscardedValue(reader);
+ assertTrue(discarded);
+ }
+
+ @Test
+ public void testDiscardCheckpointMark() throws Exception {
+
+ Connection connection =
+
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME,
PASSWORD);
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE));
+ for (int i = 0; i < 10; i++) {
+ producer.send(session.createTextMessage("test " + i));
+ }
+ producer.close();
+ session.close();
+ connection.close();
+
+ JmsIO.Read spec =
+ JmsIO.read()
+
.withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE);
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+ JmsIO.UnboundedJmsReader reader = source.createReader(null, null);
+
+ // start the reader and move to the first record
+ assertTrue(reader.start());
+
+ // consume 3 messages (NB: start already consumed the first message)
+ for (int i = 0; i < 3; i++) {
+ assertTrue(reader.advance());
+ }
+
+ // the messages are still pending in the queue (no ACK yet)
+ assertEquals(10, count(QUEUE));
+
+ // we finalize the checkpoint
+ reader.getCheckpointMark().finalizeCheckpoint();
+
+ // the checkpoint finalize ack the messages, and so they are not pending
in the queue anymore
+ assertEquals(6, count(QUEUE));
+
+ // we read the 6 pending messages
+ for (int i = 0; i < 6; i++) {
+ assertTrue(reader.advance());
+ }
+
+ // still 6 pending messages as we didn't finalize the checkpoint
+ assertEquals(6, count(QUEUE));
+
+ // But here we discard the checkpoint
+ ((JmsCheckpointMark) reader.getCheckpointMark()).discard();
+ // we finalize the checkpoint: no more message in the queue
Review Comment:
```suggestion
// we finalize the checkpoint: no messages should be acked
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]