Repository: beam Updated Branches: refs/heads/master 4884d4867 -> 2df9dbd24
[BEAM-2246] Use CLIENT_ACK instead of AUTO_ACK in JmsIO Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a158fc17 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a158fc17 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a158fc17 Branch: refs/heads/master Commit: a158fc178e1297f04f4f18975383ec1dc69bc0d8 Parents: 4884d48 Author: Jean-Baptiste Onofré <[email protected]> Authored: Wed May 10 07:39:56 2017 +0200 Committer: Jean-Baptiste Onofré <[email protected]> Committed: Wed May 31 21:24:22 2017 +0200 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 5 +- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 78 ++++++++++++++++++++ 2 files changed, 81 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a158fc17/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index b8355ad..c5e5150 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -379,7 +379,8 @@ public class JmsIO { } - private static class UnboundedJmsReader extends UnboundedReader<JmsRecord> { + @VisibleForTesting + static class UnboundedJmsReader extends UnboundedReader<JmsRecord> { private UnboundedJmsSource source; private JmsCheckpointMark checkpointMark; @@ -421,7 +422,7 @@ public class JmsIO { } try { - this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + this.session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); } catch (Exception e) { throw new IOException("Error creating JMS session", e); } http://git-wip-us.apache.org/repos/asf/beam/blob/a158fc17/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index 7edda1a..43c050e 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -23,10 +23,12 @@ import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.util.ArrayList; +import java.util.Enumeration; import java.util.List; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -34,6 +36,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; @@ -71,6 +74,7 @@ public class JmsIOTest { private BrokerService broker; private ConnectionFactory connectionFactory; + private ConnectionFactory connectionFactoryWithoutPrefetch; @Rule public final transient TestPipeline pipeline = TestPipeline.create(); @@ -98,6 +102,8 @@ public class JmsIOTest { // create JMS connection factory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL); + connectionFactoryWithoutPrefetch = + new ActiveMQConnectionFactory(BROKER_URL + "?jms.prefetchPolicy.all=0"); } @After @@ -236,4 +242,76 @@ public class JmsIOTest { assertEquals(1, splits.size()); } + @Test + public void testCheckpointMark() throws Exception { + // we are using no prefetch here + // prefetch is an ActiveMQ feature: to make efficient use of network resources the broker + // utilizes a 'push' model to dispatch messages to consumers. However, in the case of our + // test, it means that we can have some latency between the receiveNoWait() method used by + // the consumer and the prefetch buffer populated by the broker. Using a prefetch to 0 means + // that the consumer will poll for message, which is exactly what we want for the test. + Connection connection = connectionFactoryWithoutPrefetch.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE)); + for (int i = 0; i < 10; i++) { + producer.send(session.createTextMessage("test " + i)); + } + producer.close(); + session.close(); + connection.close(); + + JmsIO.Read spec = JmsIO.read() + .withConnectionFactory(connectionFactoryWithoutPrefetch) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withQueue(QUEUE); + JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec); + JmsIO.UnboundedJmsReader reader = source.createReader(null, null); + + // start the reader and move to the first record + assertTrue(reader.start()); + + // consume 3 messages (NB: start already consumed the first message) + for (int i = 0; i < 3; i++) { + assertTrue(reader.advance()); + } + + // the messages are still pending in the queue (no ACK yet) + assertEquals(10, count(QUEUE)); + + // we finalize the checkpoint + reader.getCheckpointMark().finalizeCheckpoint(); + + // the checkpoint finalize ack the messages, and so they are not pending in the queue anymore + assertEquals(6, count(QUEUE)); + + // we read the 6 pending messages + for (int i = 0; i < 6; i++) { + assertTrue(reader.advance()); + } + + // still 6 pending messages as we didn't finalize the checkpoint + assertEquals(6, count(QUEUE)); + + // we finalize the checkpoint: no more message in the queue + reader.getCheckpointMark().finalizeCheckpoint(); + + assertEquals(0, count(QUEUE)); + } + + private int count(String queue) throws Exception { + Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + QueueBrowser browser = session.createBrowser(session.createQueue(queue)); + Enumeration<Message> messages = browser.getEnumeration(); + int count = 0; + while (messages.hasMoreElements()) { + messages.nextElement(); + count++; + } + return count; + } + }
