Repository: samza Updated Branches: refs/heads/master 81b173246 -> ad1f16175
SAMZA-1248 - Fix StandAlone barrier start list. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ad1f1617 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ad1f1617 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ad1f1617 Branch: refs/heads/master Commit: ad1f161751bbe69220dd2bb0da64be3e2f64d674 Parents: 81b1732 Author: Boris Shkolnik <[email protected]> Authored: Mon May 1 18:32:48 2017 -0700 Committer: nramesh <[email protected]> Committed: Mon May 1 18:32:48 2017 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java | 5 ++--- samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java | 2 +- .../src/test/java/org/apache/samza/zk/TestZkUtils.java | 8 ++------ 3 files changed, 5 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ad1f1617/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 2535654..1ddedbc 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -63,8 +63,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private final CoordinationUtils coordinationUtils; private JobModel newJobModel; - private JobModel jobModel; - + public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer, SamzaContainerController containerController) { this.debounceTimer = debounceTimer; @@ -215,7 +214,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { } log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray())); - jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, + JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, containerIds); log.info("pid=" + processorId + "Generated jobModel: " + jobModel); http://git-wip-us.apache.org/repos/asf/samza/blob/ad1f1617/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index fee8405..be877a4 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -169,7 +169,7 @@ public class ZkUtils { for (String child : znodeIds) { String fullPath = String.format("%s/%s", processorPath, child); - processorIds.add(readProcessorData(fullPath)); + processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId()); } LOG.info("Found these children - " + znodeIds); http://git-wip-us.apache.org/repos/asf/samza/blob/ad1f1617/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index b8dc295..63e2361 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -114,12 +114,8 @@ public class TestZkUtils { l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(2, l.size()); - ProcessorData pd = new ProcessorData(l.get(0)); - Assert.assertEquals(" ID1 didn't match", "1", pd.getProcessorId()); - Assert.assertEquals(" Host1 didn't match", "host1", pd.getHost()); - pd = new ProcessorData(l.get(1)); - Assert.assertEquals(" ID2 didn't match", "2", pd.getProcessorId()); - Assert.assertEquals(" Host2 didn't match", "host2", pd.getHost()); + Assert.assertEquals(" ID1 didn't match", "1", l.get(0)); + Assert.assertEquals(" ID2 didn't match", "2", l.get(1)); } @Test
