SAMZA-789: avoid re-register coordinator stream partition after the CoordinatorStreamSystemConsumer has started
RB=583289 G=samza-reviewers A=jmaes Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9e1906af Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9e1906af Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9e1906af Branch: refs/heads/samza-sql Commit: 9e1906af377b6631842894cfcdfd8c90a18676ca Parents: b14da28 Author: Yi Pan (Data Infrastructure) <[email protected]> Authored: Tue Oct 6 18:13:44 2015 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Oct 8 23:22:04 2015 -0700 ---------------------------------------------------------------------- .../stream/CoordinatorStreamSystemConsumer.java | 4 +++ .../TestCoordinatorStreamSystemConsumer.java | 29 +++++++++++++++----- 2 files changed, 26 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/9e1906af/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java index 3113f09..e1a7626 100644 --- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java +++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java @@ -81,6 +81,10 @@ public class CoordinatorStreamSystemConsumer { * coordinator stream with the SystemConsumer using the earliest offset. */ public void register() { + if (isStarted) { + log.info("Coordinator stream partition {} has already been registered. Skipping.", coordinatorSystemStreamPartition); + return; + } log.debug("Attempting to register: {}", coordinatorSystemStreamPartition); Set<String> streamNames = new HashSet<String>(); String streamName = coordinatorSystemStreamPartition.getStream(); http://git-wip-us.apache.org/repos/asf/samza/blob/9e1906af/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java index 370cfb7..0e73e18 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java @@ -52,9 +52,9 @@ public class TestCoordinatorStreamSystemConsumer { SystemStream systemStream = new SystemStream("system", "stream"); MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0))); CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin()); - assertFalse(systemConsumer.isRegistered()); + assertEquals(0, systemConsumer.getRegisterCount()); consumer.register(); - assertTrue(systemConsumer.isRegistered()); + assertEquals(1, systemConsumer.getRegisterCount()); assertFalse(systemConsumer.isStarted()); consumer.start(); assertTrue(systemConsumer.isStarted()); @@ -72,6 +72,23 @@ public class TestCoordinatorStreamSystemConsumer { assertTrue(systemConsumer.isStopped()); } + @Test + public void testCoordinatorStreamSystemConsumerRegisterOnceOnly() throws Exception { + Map<String, String> expectedConfig = new LinkedHashMap<String, String>(); + expectedConfig.put("job.id", "1234"); + SystemStream systemStream = new SystemStream("system", "stream"); + MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0))); + CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin()); + assertEquals(0, systemConsumer.getRegisterCount()); + consumer.register(); + assertEquals(1, systemConsumer.getRegisterCount()); + assertFalse(systemConsumer.isStarted()); + consumer.start(); + assertTrue(systemConsumer.isStarted()); + consumer.register(); + assertEquals(1, systemConsumer.getRegisterCount()); + } + private boolean testOrder(Set<CoordinatorStreamMessage> bootstrappedStreamSet) { int initialSize = bootstrappedStreamSet.size(); List<CoordinatorStreamMessage> listStreamMessages = new ArrayList<CoordinatorStreamMessage>(); @@ -96,7 +113,7 @@ public class TestCoordinatorStreamSystemConsumer { private static class MockSystemConsumer implements SystemConsumer { private boolean started = false; private boolean stopped = false; - private boolean registered = false; + private int registerCount = 0; private final SystemStreamPartition expectedSystemStreamPartition; private int pollCount = 0; @@ -113,13 +130,11 @@ public class TestCoordinatorStreamSystemConsumer { } public void register(SystemStreamPartition systemStreamPartition, String offset) { - registered = true; + registerCount++; assertEquals(expectedSystemStreamPartition, systemStreamPartition); } - public boolean isRegistered() { - return registered; - } + public int getRegisterCount() { return registerCount; } public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException { Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
