Repository: beam
Updated Branches:
  refs/heads/master f9d51aa5c -> 86e04893a
Reverse removal of NativeWrite evaluator in Python DirectRunner

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/809f1787
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/809f1787
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/809f1787

Branch: refs/heads/master
Commit: 809f17876d847002ba76979cb3362451fa01c110
Parents: f9d51aa
Author: Charles Chen <[email protected]>
Authored: Mon Jun 12 14:17:50 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Mon Jun 12 15:43:31 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/transform_evaluator.py | 62 +++++++++++++++++++-
 1 file changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/809f1787/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 0fec8b8..b1cb626 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -29,6 +29,7 @@ from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
 from apache_beam.runners.direct.watermark_manager import WatermarkManager
 from apache_beam.runners.direct.transform_result import TransformResult
+from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite  # pylint: disable=protected-access
 from apache_beam.transforms import core
 from apache_beam.transforms.window import GlobalWindows
 from apache_beam.transforms.window import WindowedValue
@@ -53,6 +54,7 @@ class TransformEvaluatorRegistry(object):
         core.Flatten: _FlattenEvaluator,
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
+        _NativeWrite: _NativeWriteEvaluator,
     }
 
   def for_application(
@@ -96,7 +98,8 @@ class TransformEvaluatorRegistry(object):
     Returns:
       True if executor should execute applied_ptransform serially.
     """
-    return isinstance(applied_ptransform.transform, core._GroupByKeyOnly)
+    return isinstance(applied_ptransform.transform,
+                      (core._GroupByKeyOnly, _NativeWrite))
 
 
 class _TransformEvaluator(object):
@@ -400,3 +403,60 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator):
 
     return TransformResult(
         self._applied_ptransform, bundles, state, None, None, hold)
+
+
+class _NativeWriteEvaluator(_TransformEvaluator):
+  """TransformEvaluator for _NativeWrite transform."""
+
+  def __init__(self, evaluation_context, applied_ptransform,
+               input_committed_bundle, side_inputs, scoped_metrics_container):
+    assert not side_inputs
+    super(_NativeWriteEvaluator, self).__init__(
+        evaluation_context, applied_ptransform, input_committed_bundle,
+        side_inputs, scoped_metrics_container)
+
+    assert applied_ptransform.transform.sink
+    self._sink = applied_ptransform.transform.sink
+
+  @property
+  def _is_final_bundle(self):
+    return (self._execution_context.watermarks.input_watermark
+            == WatermarkManager.WATERMARK_POS_INF)
+
+  @property
+  def _has_already_produced_output(self):
+    return (self._execution_context.watermarks.output_watermark
+            == WatermarkManager.WATERMARK_POS_INF)
+
+  def start_bundle(self):
+    # state: [values]
+    self.state = (self._execution_context.existing_state
+                  if self._execution_context.existing_state else [])
+
+  def process_element(self, element):
+    self.state.append(element)
+
+  def finish_bundle(self):
+    # finish_bundle will append incoming bundles in memory until all the bundles
+    # carrying data is processed. This is done to produce only a single output
+    # shard (some tests depends on this behavior). It is possible to have
+    # incoming empty bundles after the output is produced, these bundles will be
+    # ignored and would not generate additional output files.
+    # TODO(altay): Do not wait until the last bundle to write in a single shard.
+    if self._is_final_bundle:
+      if self._has_already_produced_output:
+        # Ignore empty bundles that arrive after the output is produced.
+        assert self.state == []
+      else:
+        self._sink.pipeline_options = self._evaluation_context.pipeline_options
+        with self._sink.writer() as writer:
+          for v in self.state:
+            writer.Write(v.value)
+      state = None
+      hold = WatermarkManager.WATERMARK_POS_INF
+    else:
+      state = self.state
+      hold = WatermarkManager.WATERMARK_NEG_INF
+
+    return TransformResult(
+        self._applied_ptransform, [], state, None, None, hold)
