Repository: samza
Updated Branches:
  refs/heads/master 5fab34e52 -> 920f803a2


SAMZA-971: JobCoordinator/JobModelManager does not need to fetch offsets for all stream partitions


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/920f803a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/920f803a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/920f803a

Branch: refs/heads/master
Commit: 920f803a2e3dab809f4d7bb518259b0f4164407f
Parents: 5fab34e
Author: Navina Ramesh <[email protected]>
Authored: Fri Jun 24 17:50:33 2016 -0700
Committer: Navina Ramesh <[email protected]>
Committed: Fri Jun 24 17:50:33 2016 -0700

----------------------------------------------------------------------
 .../org/apache/samza/coordinator/JobCoordinator.scala   | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/920f803a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 0aee4ce..d3bd9b7 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -87,7 +87,7 @@ object JobModelManager extends Logging {
       systemName -> systemFactory.getAdmin(systemName, config)
     }).toMap
 
-    val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    val streamMetadataCache = new StreamMetadataCache(systemAdmins = 
systemAdmins, cacheTTLms = 0)
     var streamPartitionCountMonitor: StreamPartitionCountMonitor = null
     if (config.getMonitorPartitionChange) {
       val extendedSystemAdmins = systemAdmins.filter{
@@ -104,7 +104,7 @@ object JobModelManager extends Logging {
     }
 
     val jobCoordinator = getJobCoordinator(config, changelogManager, 
localityManager, streamMetadataCache, streamPartitionCountMonitor)
-    createChangeLogStreams(config, 
jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
+    createChangeLogStreams(config, 
jobCoordinator.jobModel.maxChangeLogStreamPartitions)
 
     jobCoordinator
   }
@@ -137,7 +137,7 @@ object JobModelManager extends Logging {
 
     // Get the set of partitions for each SystemStream from the stream metadata
     streamMetadataCache
-      .getStreamMetadata(inputSystemStreams)
+      .getStreamMetadata(inputSystemStreams, true)
       .flatMap {
         case (systemStream, metadata) =>
           metadata
@@ -286,7 +286,7 @@ object JobModelManager extends Logging {
     new JobModel(config, containerMap, localityManager)
   }
 
-  private def createChangeLogStreams(config: StorageConfig, 
changeLogPartitions: Int, streamMetadataCache: StreamMetadataCache) {
+  private def createChangeLogStreams(config: StorageConfig, 
changeLogPartitions: Int) {
     val changeLogSystemStreams = config
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
@@ -301,10 +301,6 @@ object JobModelManager extends Logging {
 
       systemAdmin.createChangelogStream(systemStream.getStream, 
changeLogPartitions)
     }
-
-    val changeLogMetadata = 
streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet)
-
-    info("Got change log stream metadata: %s" format changeLogMetadata)
   }
 
   private def getSystemNames(config: Config) = config.getSystemNames.toSet

Reply via email to