Repository: samza
Updated Branches:
  refs/heads/master 1d2f054ab -> 5587ebfec


SAMZA-1796: PassthroughJobCoordinator doesn't create changelog streams

Currently, only ClusterBasedJobCoordinator and ZkJobCoordinator create
changelog streams. PassthroughJobCoordinator should create them as well.

Author: xinyuiscool <[email protected]>

Reviewers: Bharath K <[email protected]>

Closes #595 from xinyuiscool/SAMZA-1796


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5587ebfe
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5587ebfe
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5587ebfe

Branch: refs/heads/master
Commit: 5587ebfececf27c3a1d38f3b63c8a1072e10816d
Parents: 1d2f054
Author: xinyuiscool <[email protected]>
Authored: Wed Aug 1 16:12:06 2018 -0700
Committer: xiliu <[email protected]>
Committed: Wed Aug 1 16:12:06 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/standalone/PassthroughJobCoordinator.java    | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5587ebfe/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 228617a..737ac3e 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -29,6 +29,7 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.*;
@@ -81,6 +82,8 @@ public class PassthroughJobCoordinator implements JobCoordinator {
       if (checkpointManager != null) {
         checkpointManager.createResources();
       }
+
+      ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
     } catch (Exception e) {
       LOGGER.error("Exception while trying to getJobModel.", e);
       if (coordinatorListener != null) {

Reply via email to