Repository: beam Updated Branches: refs/heads/master 6e220bb37 -> e5afbb27f
[BEAM-1392] DoFn teardown not called on empty partitions Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25f91353 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25f91353 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25f91353 Branch: refs/heads/master Commit: 25f913536019aff80d189cf7136f2acc3b6db7c2 Parents: 6e220bb Author: Aviem Zur <[email protected]> Authored: Sun Feb 5 11:22:00 2017 +0200 Committer: Sela <[email protected]> Committed: Sun Feb 5 11:35:31 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/runners/spark/translation/SparkProcessContext.java | 2 ++ 1 file changed, 2 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/25f91353/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 9957bf3..60c9d4d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -62,8 +62,10 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> { // skip if partition is empty. if (!partition.hasNext()) { + DoFnInvokers.invokerFor(doFn).invokeTeardown(); return Lists.newArrayList(); } + // call startBundle() before beginning to process the partition. doFnRunner.startBundle(); // process the partition; finishBundle() is called from within the output iterator.
