Repository: incubator-beam Updated Branches: refs/heads/python-sdk b4187bd91 -> 3b5cd0efc
Fix the flaky test_model_multiple_pcollections_partition test. _NativeWriteEvaluator should ignore empty bundles that arrive after a write. The write happens once the last bundle containing data is processed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/102e6773 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/102e6773 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/102e6773 Branch: refs/heads/python-sdk Commit: 102e677375869386cc927ba649fddf6736455307 Parents: b83f12b Author: Ahmet Altay <al...@google.com> Authored: Thu Nov 17 17:48:13 2016 -0800 Committer: Ahmet Altay <al...@google.com> Committed: Thu Nov 17 17:48:13 2016 -0800 ---------------------------------------------------------------------- .../runners/direct/transform_evaluator.py | 27 +++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/102e6773/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 093f183..5a79ab2 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -513,6 +513,11 @@ class _NativeWriteEvaluator(_TransformEvaluator): return (self._execution_context.watermarks.input_watermark == WatermarkManager.WATERMARK_POS_INF) + @property + def _has_already_produced_output(self): + return (self._execution_context.watermarks.output_watermark + == WatermarkManager.WATERMARK_POS_INF) + def start_bundle(self): # state: [values] self.state = (self._execution_context.existing_state @@ -524,15 +529,19 @@ class 
_NativeWriteEvaluator(_TransformEvaluator): def finish_bundle(self): # TODO(altay): Do not wait until the last bundle to write in a single shard. if self._is_final_bundle: - if isinstance(self._sink, io.fileio.NativeTextFileSink): - assert self._sink.num_shards in (0, 1) - if self._sink.shard_name_template: - self._sink.file_path += '-00000-of-00001' - self._sink.file_path += self._sink.file_name_suffix - self._sink.pipeline_options = self._evaluation_context.pipeline_options - with self._sink.writer() as writer: - for v in self.state: - writer.Write(v.value) + if not self._has_already_produced_output: + if isinstance(self._sink, io.fileio.NativeTextFileSink): + assert self._sink.num_shards in (0, 1) + if self._sink.shard_name_template: + self._sink.file_path += '-00000-of-00001' + self._sink.file_path += self._sink.file_name_suffix + self._sink.pipeline_options = self._evaluation_context.pipeline_options + with self._sink.writer() as writer: + for v in self.state: + writer.Write(v.value) + else: + # Ignore empty bundles that arrive after the output is produced. + assert self.state == [] state = None hold = WatermarkManager.WATERMARK_POS_INF