Repository: samza Updated Branches: refs/heads/master 7a2e19250 -> a322972d0
SAMZA-1507; Create changelog streams in Leader(ZkJobCoordinator) for stateful operators. Verified with a test standalone job. Will add integration test for this as a part of fixing and reenabling standalone integration tests. Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Boris Shkolnik <[email protected]> Closes #362 from shanthoosh/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a322972d Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a322972d Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a322972d Branch: refs/heads/master Commit: a322972d00474082917e6700c450dc1e5e98d7dc Parents: 7a2e192 Author: Shanthoosh Venkataraman <[email protected]> Authored: Wed Nov 22 11:54:10 2017 -0800 Committer: Boris S <[email protected]> Committed: Wed Nov 22 11:54:10 2017 -0800 ---------------------------------------------------------------------- .../org/apache/samza/zk/ZkJobCoordinator.java | 30 ++++++++++++++++++-- .../main/java/org/apache/samza/zk/ZkUtils.java | 2 +- .../samza/coordinator/JobModelManager.scala | 2 +- 3 files changed, 29 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 9d44ec1..0509474 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -20,12 +20,13 @@ package org.apache.samza.zk; import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; +import java.util.HashMap; import java.util.List; import java.util.Map; -import org.I0Itec.zkclient.IZkStateListener; +import java.util.Objects; import java.util.Set; +import org.I0Itec.zkclient.IZkStateListener; import org.apache.commons.lang3.StringUtils; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; @@ -33,11 +34,14 @@ import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MetricsConfig; import org.apache.samza.config.ZkConfig; +import org.apache.samza.config.StorageConfig; +import org.apache.samza.container.TaskName; import org.apache.samza.coordinator.JobCoordinator; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.coordinator.LeaderElector; import org.apache.samza.coordinator.LeaderElectorListener; +import org.apache.samza.job.model.ContainerModel; import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.MetricsReporter; @@ -88,6 +92,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private JobCoordinatorListener coordinatorListener = null; private JobModel newJobModel; private int debounceTimeMs; + private boolean hasCreatedChangeLogStreams = false; + private String cachedJobModelVersion = null; + private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>(); ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) { this.config = config; @@ -188,6 +195,10 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { // Generate the JobModel JobModel jobModel = generateNewJobModel(currentProcessorIds); + if (!hasCreatedChangeLogStreams) { + JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions); + hasCreatedChangeLogStreams = true; + } // Assign the next version of JobModel String currentJMVersion = zkUtils.getJobModelVersion(); String nextJMVersion; @@ -279,7 +290,20 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { * Generate new JobModel when becoming a leader or the list of processor changed. */ private JobModel generateNewJobModel(List<String> processors) { - return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache, processors); + String zkJobModelVersion = zkUtils.getJobModelVersion(); + // If JobModel exists in zookeeper && cached JobModel version is unequal to JobModel version stored in zookeeper. + if (zkJobModelVersion != null && !Objects.equals(cachedJobModelVersion, zkJobModelVersion)) { + JobModel jobModel = zkUtils.getJobModel(zkJobModelVersion); + for (ContainerModel containerModel : jobModel.getContainers().values()) { + containerModel.getTasks().forEach((taskName, taskModel) -> changeLogPartitionMap.put(taskName, taskModel.getChangelogPartition().getPartitionId())); + } + cachedJobModelVersion = zkJobModelVersion; + } + /** + * Host affinity is not supported in standalone. Hence, LocalityManager(which is responsible for container + * to host mapping) is passed in as null when building the jobModel. + */ + return JobModelManager.readJobModel(this.config, changeLogPartitionMap, null, streamMetadataCache, processors); } class LeaderElectorListenerImpl implements LeaderElectorListener { http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 6ca9052..2f60d52 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -407,7 +407,7 @@ public class ZkUtils { * @return jobmodel version as a string */ public String getJobModelVersion() { - String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath()); + String jobModelVersion = zkClient.readData(keyBuilder.getJobModelVersionPath(), true); metrics.reads.inc(); return jobModelVersion; } http://git-wip-us.apache.org/repos/asf/samza/blob/a322972d/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index e915a8a..c2e0665 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -276,7 +276,7 @@ object JobModelManager extends Logging { systemAdmins } - private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) { + def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) { val changeLogSystemStreams = config .getStoreNames .filter(config.getChangelogStream(_).isDefined)
