Repository: samza Updated Branches: refs/heads/master 5fab34e52 -> 920f803a2
SAMZA-971 : JobCoordinator/JobModelManager does not need to fetch offset for all stream partitions Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/920f803a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/920f803a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/920f803a Branch: refs/heads/master Commit: 920f803a2e3dab809f4d7bb518259b0f4164407f Parents: 5fab34e Author: Navina Ramesh <[email protected]> Authored: Fri Jun 24 17:50:33 2016 -0700 Committer: Navina Ramesh <[email protected]> Committed: Fri Jun 24 17:50:33 2016 -0700 ---------------------------------------------------------------------- .../org/apache/samza/coordinator/JobCoordinator.scala | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/920f803a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index 0aee4ce..d3bd9b7 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -87,7 +87,7 @@ object JobModelManager extends Logging { systemName -> systemFactory.getAdmin(systemName, config) }).toMap - val streamMetadataCache = new StreamMetadataCache(systemAdmins) + val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0) var streamPartitionCountMonitor: StreamPartitionCountMonitor = null if (config.getMonitorPartitionChange) { val extendedSystemAdmins = systemAdmins.filter{ @@ -104,7 +104,7 @@ object JobModelManager extends Logging { } val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache, streamPartitionCountMonitor) - createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache) + createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions) jobCoordinator } @@ -137,7 +137,7 @@ object JobModelManager extends Logging { // Get the set of partitions for each SystemStream from the stream metadata streamMetadataCache - .getStreamMetadata(inputSystemStreams) + .getStreamMetadata(inputSystemStreams, true) .flatMap { case (systemStream, metadata) => metadata @@ -286,7 +286,7 @@ object JobModelManager extends Logging { new JobModel(config, containerMap, localityManager) } - private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, streamMetadataCache: StreamMetadataCache) { + private def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) { val changeLogSystemStreams = config .getStoreNames .filter(config.getChangelogStream(_).isDefined) @@ -301,10 +301,6 @@ object JobModelManager extends Logging { systemAdmin.createChangelogStream(systemStream.getStream, changeLogPartitions) } - - val changeLogMetadata = streamMetadataCache.getStreamMetadata(changeLogSystemStreams.values.toSet) - - info("Got change log stream metadata: %s" format changeLogMetadata) } private def getSystemNames(config: Config) = config.getSystemNames.toSet
