Repository: beam Updated Branches: refs/heads/master 8ee3572b4 -> cef31093f
[BEAM-1127] Create an unique source when using a JMS topic to avoid elements duplication Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eed4efc5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eed4efc5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eed4efc5 Branch: refs/heads/master Commit: eed4efc537e5214d037665561c0e901931929c29 Parents: 8ee3572 Author: Jean-Baptiste Onofré <[email protected]> Authored: Thu Feb 2 16:17:25 2017 +0100 Committer: Dan Halperin <[email protected]> Committed: Fri Feb 3 03:28:33 2017 -0800 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/io/jms/JmsIO.java | 15 +++++++- .../org/apache/beam/sdk/io/jms/JmsIOTest.java | 39 ++++++++++++++++++-- 2 files changed, 49 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/eed4efc5/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java index c1f1cb4..270fe31 100644 --- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java +++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java @@ -325,7 +325,11 @@ public class JmsIO { private JmsIO() {} - private static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> { + /** + * An unbounded JMS source. 
+ */ + @VisibleForTesting + protected static class UnboundedJmsSource extends UnboundedSource<JmsRecord, JmsCheckpointMark> { private final Read spec; @@ -337,8 +341,15 @@ public class JmsIO { public List<UnboundedJmsSource> generateInitialSplits( int desiredNumSplits, PipelineOptions options) throws Exception { List<UnboundedJmsSource> sources = new ArrayList<>(); - for (int i = 0; i < desiredNumSplits; i++) { + if (spec.getTopic() != null) { + // in the case of a topic, we create a single source, so an unique subscriber, to avoid + // element duplication sources.add(new UnboundedJmsSource(spec)); + } else { + // in the case of a queue, we allow concurrent consumers + for (int i = 0; i < desiredNumSplits; i++) { + sources.add(new UnboundedJmsSource(spec)); + } } return sources; } http://git-wip-us.apache.org/repos/asf/beam/blob/eed4efc5/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java index c756cd0..a06bba3 100644 --- a/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java +++ b/sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsIOTest.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.io.jms; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + import java.util.ArrayList; import java.util.List; @@ -33,6 +36,8 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.security.AuthenticationUser; import org.apache.activemq.security.SimpleAuthenticationPlugin; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.NeedsRunner; import 
org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -40,7 +45,6 @@ import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PCollection; import org.junit.After; -import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -60,6 +64,7 @@ public class JmsIOTest { private static final String USERNAME = "test_user"; private static final String PASSWORD = "test_password"; private static final String QUEUE = "test_queue"; + private static final String TOPIC = "test_topic"; private BrokerService broker; private ConnectionFactory connectionFactory; @@ -167,7 +172,7 @@ public class JmsIOTest { session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE)); Message msg = consumer.receiveNoWait(); - Assert.assertNull(msg); + assertNull(msg); } @Test @@ -195,7 +200,35 @@ public class JmsIOTest { while (consumer.receive(1000) != null) { count++; } - Assert.assertEquals(100, count); + assertEquals(100, count); + } + + @Test + public void testSplitForQueue() throws Exception { + JmsIO.Read read = JmsIO.read().withQueue(QUEUE); + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + int desiredNumSplits = 5; + JmsIO.UnboundedJmsSource initialSource = new JmsIO.UnboundedJmsSource(read); + List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits, + pipelineOptions); + // in the case of a queue, we have concurrent consumers by default, so the initial number + // splits is equal to the desired number of splits + assertEquals(desiredNumSplits, splits.size()); + } + + @Test + public void testSplitForTopic() throws Exception { + JmsIO.Read read = JmsIO.read().withTopic(TOPIC); + PipelineOptions pipelineOptions = PipelineOptionsFactory.create(); + int desiredNumSplits = 5; + JmsIO.UnboundedJmsSource initialSource = 
new JmsIO.UnboundedJmsSource(read); + List<JmsIO.UnboundedJmsSource> splits = initialSource.generateInitialSplits(desiredNumSplits, + pipelineOptions); + // in the case of a topic, we can have only a unique subscriber on the topic per pipeline + // else it means we can have duplicate messages (all subscribers on the topic receive every + // message). + // So, whatever the desiredNumSplits is, the actual number of splits should be 1. + assertEquals(1, splits.size()); } }
