Repository: incubator-samza
Updated Branches:
  refs/heads/0.8.0 f484a7076 -> 5862f1391
SAMZA-445; close checkpoint manager in util after changelog partitions are written

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/5862f139
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/5862f139
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/5862f139

Branch: refs/heads/0.8.0
Commit: 5862f13911891f5566417622713b5e87057261dd
Parents: f484a70
Author: Chris Riccomini <[email protected]>
Authored: Fri Oct 24 14:38:29 2014 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Fri Oct 24 14:38:29 2014 -0700

----------------------------------------------------------------------
 samza-core/src/main/scala/org/apache/samza/util/Util.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/5862f139/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 7d50352..1c7680f 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -281,8 +281,6 @@ object Util extends Logging {
       fromCM.map(kv => kv._1 -> kv._2.intValue()).toMap // Java to Scala interop!!!
     }
 
-    checkpointManager.stop
-
     val newMapping = Util.resolveTaskNameToChangelogPartitionMapping(currentTaskNames, previousMapping)
 
     if (newMapping != null) {
@@ -290,6 +288,8 @@ object Util extends Logging {
       checkpointManager.writeChangeLogPartitionMapping(newMapping.map(kv => kv._1 -> new java.lang.Integer(kv._2))) //Java to Scala interop!!!
     }
 
+    checkpointManager.stop
+
     newMapping
   }
 }
