Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b4187bd91 -> 3b5cd0efc


Fix the flaky test_model_multiple_pcollections_partition test

_NativeWriteEvaluator should ignore empty bundles that arrive after a
write. Write happens once the last bundle containing data is processed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/102e6773
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/102e6773
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/102e6773

Branch: refs/heads/python-sdk
Commit: 102e677375869386cc927ba649fddf6736455307
Parents: b83f12b
Author: Ahmet Altay <al...@google.com>
Authored: Thu Nov 17 17:48:13 2016 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Nov 17 17:48:13 2016 -0800

----------------------------------------------------------------------
 .../runners/direct/transform_evaluator.py       | 27 +++++++++++++-------
 1 file changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/102e6773/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 093f183..5a79ab2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -513,6 +513,11 @@ class _NativeWriteEvaluator(_TransformEvaluator):
     return (self._execution_context.watermarks.input_watermark
             == WatermarkManager.WATERMARK_POS_INF)
 
+  @property
+  def _has_already_produced_output(self):
+    return (self._execution_context.watermarks.output_watermark
+            == WatermarkManager.WATERMARK_POS_INF)
+
   def start_bundle(self):
     # state: [values]
     self.state = (self._execution_context.existing_state
@@ -524,15 +529,19 @@ class _NativeWriteEvaluator(_TransformEvaluator):
   def finish_bundle(self):
     # TODO(altay): Do not wait until the last bundle to write in a single shard.
     if self._is_final_bundle:
-      if isinstance(self._sink, io.fileio.NativeTextFileSink):
-        assert self._sink.num_shards in (0, 1)
-        if self._sink.shard_name_template:
-          self._sink.file_path += '-00000-of-00001'
-          self._sink.file_path += self._sink.file_name_suffix
-      self._sink.pipeline_options = self._evaluation_context.pipeline_options
-      with self._sink.writer() as writer:
-        for v in self.state:
-          writer.Write(v.value)
+      if not self._has_already_produced_output:
+        if isinstance(self._sink, io.fileio.NativeTextFileSink):
+          assert self._sink.num_shards in (0, 1)
+          if self._sink.shard_name_template:
+            self._sink.file_path += '-00000-of-00001'
+            self._sink.file_path += self._sink.file_name_suffix
+        self._sink.pipeline_options = self._evaluation_context.pipeline_options
+        with self._sink.writer() as writer:
+          for v in self.state:
+            writer.Write(v.value)
+      else:
+        # Ignore empty bundles that arrive after the output is produced.
+        assert self.state == []
 
       state = None
       hold = WatermarkManager.WATERMARK_POS_INF

Reply via email to