Repository: samza Updated Branches: refs/heads/master 6cd76a5d5 -> 3d1fd2171
SAMZA-728 : Samza job fails due to null pointer in JobCoordinator refreshJobModel Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3d1fd217 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3d1fd217 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3d1fd217 Branch: refs/heads/master Commit: 3d1fd2171e5f426c15fff457d61d952860b3298e Parents: 6cd76a5 Author: Navina <[email protected]> Authored: Wed Jul 15 10:38:29 2015 -0700 Committer: Navina <[email protected]> Committed: Wed Jul 15 10:38:29 2015 -0700 ---------------------------------------------------------------------- .../org/apache/samza/coordinator/JobCoordinator.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3d1fd217/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala index 8ee034a..73c58a7 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala @@ -247,7 +247,15 @@ object JobCoordinator extends Logging { { case (taskName, systemStreamPartitions) => val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]())) // Find the system partitions which don't have a checkpoint and set null for the values for offsets - val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets + val taskOffsets = checkpoint.getOffsets + val offsetMap = new util.HashMap[SystemStreamPartition, String]() + systemStreamPartitions.foreach { + ssp => + if(taskOffsets.containsKey(ssp)) + offsetMap.put(ssp, taskOffsets.get(ssp)) + else + offsetMap.put(ssp, null) + } val changelogPartition = Option(previousChangelogMapping.get(taskName)) match { case Some(changelogPartitionId) => new Partition(changelogPartitionId)
