Repository: samza
Updated Branches:
  refs/heads/master 6cd76a5d5 -> 3d1fd2171


SAMZA-728: Samza job fails due to null pointer in JobCoordinator refreshJobModel


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3d1fd217
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3d1fd217
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3d1fd217

Branch: refs/heads/master
Commit: 3d1fd2171e5f426c15fff457d61d952860b3298e
Parents: 6cd76a5
Author: Navina <[email protected]>
Authored: Wed Jul 15 10:38:29 2015 -0700
Committer: Navina <[email protected]>
Committed: Wed Jul 15 10:38:29 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/coordinator/JobCoordinator.scala     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3d1fd217/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 8ee034a..73c58a7 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -247,7 +247,15 @@ object JobCoordinator extends Logging {
                 { case (taskName, systemStreamPartitions) =>
                   val checkpoint = 
Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new 
Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
                   // Find the system partitions which don't have a checkpoint 
and set null for the values for offsets
-                  val offsetMap = systemStreamPartitions.map(ssp => (ssp -> 
null)).toMap ++ checkpoint.getOffsets
+                  val taskOffsets = checkpoint.getOffsets
+                  val offsetMap = new util.HashMap[SystemStreamPartition, 
String]()
+                  systemStreamPartitions.foreach {
+                    ssp =>
+                      if(taskOffsets.containsKey(ssp))
+                        offsetMap.put(ssp, taskOffsets.get(ssp))
+                      else
+                        offsetMap.put(ssp, null)
+                  }
                   val changelogPartition = 
Option(previousChangelogMapping.get(taskName)) match
                   {
                     case Some(changelogPartitionId) => new 
Partition(changelogPartitionId)

Reply via email to