lukecwik commented on code in PR #22932:
URL: https://github.com/apache/beam/pull/22932#discussion_r998509173
##########
sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java:
##########
@@ -364,6 +376,18 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
return builder().setAutoScaler(autoScaler).build();
}
+ /**
+ * Sets the amount of time to wait for callbacks from the runner stating that the output has
+ * been durably persisted before closing the connection to the JMS broker. Any callbacks that do
+ * not occur will cause any unacknowledged messages to be returned to the JMS broker and
+ * redelivered to other clients.
Review Comment:
```suggestion
* not occur will cause unacknowledged messages to be returned to the JMS broker and redelivered
* to other clients.
```
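For readers following the Javadoc above, the close-timeout behaviour it describes can be sketched with plain JDK classes. This is only an illustration under stated assumptions, not the JmsIO implementation: `DelayedCloseSketch`, its `discarded` flag, and `isDiscarded()` are hypothetical names, and the scheduled task stands in for whatever actually releases the JMS connection.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a reader that defers releasing its broker connection. */
public class DelayedCloseSketch {
  private final ScheduledExecutorService executor;
  private final long closeTimeoutMillis;
  // Assumption: a simple flag models "connection closed, unacknowledged messages returned".
  private final AtomicBoolean discarded = new AtomicBoolean(false);

  public DelayedCloseSketch(ScheduledExecutorService executor, long closeTimeoutMillis) {
    this.executor = executor;
    this.closeTimeoutMillis = closeTimeoutMillis;
  }

  /** Returns immediately; the resources are discarded only once the timeout has elapsed. */
  public void close() {
    executor.schedule(
        () -> discarded.set(true), closeTimeoutMillis, TimeUnit.MILLISECONDS);
  }

  public boolean isDiscarded() {
    return discarded.get();
  }

  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    DelayedCloseSketch reader = new DelayedCloseSketch(executor, 200L);

    reader.close();
    System.out.println("discarded right after close: " + reader.isDiscarded());

    // With the JDK's default policy, tasks that were already scheduled still run after shutdown().
    executor.shutdown();
    executor.awaitTermination(1, TimeUnit.SECONDS);
    System.out.println("discarded after the timeout: " + reader.isDiscarded());
  }
}
```

In this sketch the window between `close()` and the scheduled task is the period in which late callbacks could still acknowledge messages; anything left unacknowledged when the task runs would go back to the broker.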
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -92,7 +98,8 @@ public class JmsIOTest {
private ConnectionFactory connectionFactory;
private ConnectionFactory connectionFactoryWithSyncAcksAndWithoutPrefetch;
- @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @Rule
+ public final transient TestPipeline pipeline = TestPipeline.fromOptions(createExecutorOptions());
Review Comment:
We want to use the default for the existing tests:
```suggestion
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +561,111 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout() throws IOException {
+
Review Comment:
```suggestion
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +561,111 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout() throws IOException {
+
+ Duration closeTimeout = Duration.millis(2000L);
+ long waitTimeout = closeTimeout.getMillis() + 1000L;
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE)
+ .withCloseTimeout(closeTimeout);
+
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+
+ ExecutorOptions options = createExecutorOptions();
+
+ JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
+
+ reader.start();
+ reader.close();
+
+ boolean discarded = getDiscardedValue(reader);
+ assertFalse(discarded);
Review Comment:
```suggestion
assertFalse(getDiscardedValue(reader));
```
##########
sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java:
##########
@@ -555,6 +561,111 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}
+ @Test
+ public void testCloseWithTimeout() throws IOException {
+
+ Duration closeTimeout = Duration.millis(2000L);
+ long waitTimeout = closeTimeout.getMillis() + 1000L;
+ JmsIO.Read spec =
+ JmsIO.read()
+ .withConnectionFactory(connectionFactory)
+ .withUsername(USERNAME)
+ .withPassword(PASSWORD)
+ .withQueue(QUEUE)
+ .withCloseTimeout(closeTimeout);
+
+ JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
+
+ ExecutorOptions options = createExecutorOptions();
+
+ JmsIO.UnboundedJmsReader reader = source.createReader(options, null);
+
+ reader.start();
+ reader.close();
+
+ boolean discarded = getDiscardedValue(reader);
+ assertFalse(discarded);
+ try {
+ options.getScheduledExecutorService().awaitTermination(waitTimeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignored) {
+ }
+ discarded = getDiscardedValue(reader);
+ assertTrue(discarded);
Review Comment:
```suggestion
assertTrue(getDiscardedValue(reader));
```