Repository: samza Updated Branches: refs/heads/master 1d2f054ab -> 5587ebfec
SAMZA-1796: PassthroughJobCoordinator doesn't create changelog streams Currently only the ClusterBasedJobCoordinator and ZkJobCoordinator are creating changelog streams. The Passthrough one should also do it. Author: xinyuiscool <[email protected]> Reviewers: Bharath K <[email protected]> Closes #595 from xinyuiscool/SAMZA-1796 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5587ebfe Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5587ebfe Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5587ebfe Branch: refs/heads/master Commit: 5587ebfececf27c3a1d38f3b63c8a1072e10816d Parents: 1d2f054 Author: xinyuiscool <[email protected]> Authored: Wed Aug 1 16:12:06 2018 -0700 Committer: xiliu <[email protected]> Committed: Wed Aug 1 16:12:06 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/standalone/PassthroughJobCoordinator.java | 3 +++ 1 file changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5587ebfe/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java index 228617a..737ac3e 100644 --- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java @@ -29,6 +29,7 @@ import org.apache.samza.coordinator.JobModelManager; import org.apache.samza.job.model.JobModel; import org.apache.samza.coordinator.JobCoordinatorListener; import org.apache.samza.runtime.ProcessorIdGenerator; +import org.apache.samza.storage.ChangelogStreamManager; import org.apache.samza.system.StreamMetadataCache; import org.apache.samza.system.SystemAdmins; import org.apache.samza.util.*; @@ -81,6 +82,8 @@ public class PassthroughJobCoordinator implements JobCoordinator { if (checkpointManager != null) { checkpointManager.createResources(); } + + ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions); } catch (Exception e) { LOGGER.error("Exception while trying to getJobModel.", e); if (coordinatorListener != null) {
