Repository: beam
Updated Branches:
  refs/heads/master dd9abc397 -> c96208347
Choose GroupAlsoByWindows implementation based on streaming flag

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b600d20
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b600d20
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b600d20

Branch: refs/heads/master
Commit: 0b600d20de2cf2e6071d1d288d4b6a4795df710a
Parents: dd9abc3
Author: Charles Chen <[email protected]>
Authored: Wed Jun 7 16:09:10 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Wed Jun 14 16:47:20 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/options/pipeline_options.py     |  9 --
 .../apache_beam/runners/direct/direct_runner.py | 28 ++++++
 sdks/python/apache_beam/transforms/core.py      | 89 +++++++++++---------
 3 files changed, 79 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index daef3a7..8598e05 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -18,7 +18,6 @@
 """Pipeline options obtained from command line parsing."""
 
 import argparse
-import warnings
 
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.options.value_provider import StaticValueProvider
@@ -279,14 +278,6 @@ class StandardOptions(PipelineOptions):
         action='store_true',
         help='Whether to enable streaming mode.')
 
-  # TODO(BEAM-1265): Remove this warning, once at least one runner supports
-  # streaming pipelines.
-  def validate(self, validator):
-    errors = []
-    if self.view_as(StandardOptions).streaming:
-      warnings.warn('Streaming pipelines are not supported.')
-    return errors
-
 
 class TypeOptions(PipelineOptions):


http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/runners/direct/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 323f44b..d80ef10 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -26,19 +26,34 @@ from __future__ import absolute_import
 import collections
 import logging
 
+from apache_beam import typehints
 from apache_beam.metrics.execution import MetricsEnvironment
 from apache_beam.runners.direct.bundle_factory import BundleFactory
 from apache_beam.runners.runner import PipelineResult
 from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.runners.runner import PValueCache
+from apache_beam.transforms.core import _GroupAlsoByWindow
 from apache_beam.options.pipeline_options import DirectOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 
 __all__ = ['DirectRunner']
 
 
+# Type variables.
+K = typehints.TypeVariable('K')
+V = typehints.TypeVariable('V')
+
+
[email protected]_input_types(typehints.KV[K, typehints.Iterable[V]])
[email protected]_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _StreamingGroupAlsoByWindow(_GroupAlsoByWindow):
+  """Streaming GroupAlsoByWindow placeholder for overriding in DirectRunner."""
+  pass
+
+
 class DirectRunner(PipelineRunner):
   """Executes a single pipeline on the local machine."""
 
@@ -64,6 +79,19 @@ class DirectRunner(PipelineRunner):
     except NotImplementedError:
       return transform.expand(pcoll)
 
+  def apply__GroupAlsoByWindow(self, transform, pcoll):
+    if (transform.__class__ == _GroupAlsoByWindow and
+        pcoll.pipeline._options.view_as(StandardOptions).streaming):
+      # Use specialized streaming implementation, if requested.
+      raise NotImplementedError(
+          'Streaming support is not yet available on the DirectRunner.')
+      # TODO(ccy): enable when streaming implementation is plumbed through.
+      # type_hints = transform.get_type_hints()
+      # return pcoll | (_StreamingGroupAlsoByWindow(transform.windowing)
+      #                 .with_input_types(*type_hints.input_types[0])
+      #                 .with_output_types(*type_hints.output_types[0]))
+    return transform.expand(pcoll)
+
   def run(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""


http://git-wip-us.apache.org/repos/asf/beam/blob/0b600d20/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a137a13..c30136d 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1078,40 +1078,6 @@ class GroupByKey(PTransform):
       key_type, value_type = trivial_inference.key_value_types(input_type)
       return Iterable[KV[key_type, typehints.WindowedValue[value_type]]]
 
-  class GroupAlsoByWindow(DoFn):
-    # TODO(robertwb): Support combiner lifting.
-
-    def __init__(self, windowing):
-      super(GroupByKey.GroupAlsoByWindow, self).__init__()
-      self.windowing = windowing
-
-    def infer_output_type(self, input_type):
-      key_type, windowed_value_iter_type = trivial_inference.key_value_types(
-          input_type)
-      value_type = windowed_value_iter_type.inner_type.inner_type
-      return Iterable[KV[key_type, Iterable[value_type]]]
-
-    def start_bundle(self):
-      # pylint: disable=wrong-import-order, wrong-import-position
-      from apache_beam.transforms.trigger import InMemoryUnmergedState
-      from apache_beam.transforms.trigger import create_trigger_driver
-      # pylint: enable=wrong-import-order, wrong-import-position
-      self.driver = create_trigger_driver(self.windowing, True)
-      self.state_type = InMemoryUnmergedState
-
-    def process(self, element):
-      k, vs = element
-      state = self.state_type()
-      # TODO(robertwb): Conditionally process in smaller chunks.
-      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
-        yield wvalue.with_value((k, wvalue.value))
-      while state.timers:
-        fired = state.get_and_clear_timers()
-        for timer_window, (name, time_domain, fire_time) in fired:
-          for wvalue in self.driver.process_timer(
-              timer_window, name, time_domain, fire_time, state):
-            yield wvalue.with_value((k, wvalue.value))
-
   def expand(self, pcoll):
     # This code path is only used in the local direct runner. For Dataflow
     # runner execution, the GroupByKey transform is expanded on the service.
@@ -1136,8 +1102,7 @@ class GroupByKey(PTransform):
               | 'GroupByKey' >> (_GroupByKeyOnly()
                                  .with_input_types(reify_output_type)
                                  .with_output_types(gbk_input_type))
-              | ('GroupByWindow' >> ParDo(
-                  self.GroupAlsoByWindow(pcoll.windowing))
+              | ('GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing)
                  .with_input_types(gbk_input_type)
                  .with_output_types(gbk_output_type)))
     else:
@@ -1145,8 +1110,7 @@
       return (pcoll
               | 'ReifyWindows' >> ParDo(self.ReifyWindows())
               | 'GroupByKey' >> _GroupByKeyOnly()
-              | 'GroupByWindow' >> ParDo(
-                  self.GroupAlsoByWindow(pcoll.windowing)))
+              | 'GroupByWindow' >> _GroupAlsoByWindow(pcoll.windowing))
 
 
 @typehints.with_input_types(typehints.KV[K, V])
@@ -1162,6 +1126,55 @@ class _GroupByKeyOnly(PTransform):
     return pvalue.PCollection(pcoll.pipeline)
 
 
[email protected]_input_types(typehints.KV[K, typehints.Iterable[V]])
[email protected]_output_types(typehints.KV[K, typehints.Iterable[V]])
+class _GroupAlsoByWindow(ParDo):
+  """The GroupAlsoByWindow transform."""
+  def __init__(self, windowing):
+    super(_GroupAlsoByWindow, self).__init__(
+        _GroupAlsoByWindowDoFn(windowing))
+    self.windowing = windowing
+
+  def expand(self, pcoll):
+    self._check_pcollection(pcoll)
+    return pvalue.PCollection(pcoll.pipeline)
+
+
+class _GroupAlsoByWindowDoFn(DoFn):
+  # TODO(robertwb): Support combiner lifting.
+
+  def __init__(self, windowing):
+    super(_GroupAlsoByWindowDoFn, self).__init__()
+    self.windowing = windowing
+
+  def infer_output_type(self, input_type):
+    key_type, windowed_value_iter_type = trivial_inference.key_value_types(
+        input_type)
+    value_type = windowed_value_iter_type.inner_type.inner_type
+    return Iterable[KV[key_type, Iterable[value_type]]]
+
+  def start_bundle(self):
+    # pylint: disable=wrong-import-order, wrong-import-position
+    from apache_beam.transforms.trigger import InMemoryUnmergedState
+    from apache_beam.transforms.trigger import create_trigger_driver
+    # pylint: enable=wrong-import-order, wrong-import-position
+    self.driver = create_trigger_driver(self.windowing, True)
+    self.state_type = InMemoryUnmergedState
+
+  def process(self, element):
+    k, vs = element
+    state = self.state_type()
+    # TODO(robertwb): Conditionally process in smaller chunks.
+    for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
+      yield wvalue.with_value((k, wvalue.value))
+    while state.timers:
+      fired = state.get_and_clear_timers()
+      for timer_window, (name, time_domain, fire_time) in fired:
+        for wvalue in self.driver.process_timer(
+            timer_window, name, time_domain, fire_time, state):
+          yield wvalue.with_value((k, wvalue.value))
+
+
 class Partition(PTransformWithSideInputs):
   """Split a PCollection into several partitions.
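
----------------------------------------------------------------------
The new apply__GroupAlsoByWindow hook relies on the runner's apply
dispatch: applying a transform looks for a runner method named
'apply_<TransformClassName>' (hence the double underscore for the
underscore-prefixed class) and falls back to the transform's own
expand(). Below is a minimal standalone sketch of that dispatch
pattern; ToyRunner and FakeOptions are hypothetical stand-ins, not the
real Beam classes.

class FakeOptions(object):
    """Hypothetical stand-in for StandardOptions; only carries the flag."""
    def __init__(self, streaming=False):
        self.streaming = streaming


class _GroupAlsoByWindow(object):
    """Toy transform; expand() is the default (batch) implementation."""
    def expand(self, pcoll):
        return 'batch GroupAlsoByWindow over %r' % (pcoll,)


class ToyRunner(object):
    """Mimics the apply_<ClassName> lookup pattern shown in the diff."""

    def apply(self, transform, pcoll, options):
        # Walk the MRO so an override also matches subclasses of a transform.
        for cls in type(transform).__mro__:
            override = getattr(self, 'apply_%s' % cls.__name__, None)
            if override is not None:
                return override(transform, pcoll, options)
        return transform.expand(pcoll)

    # 'apply_' + '_GroupAlsoByWindow' produces the double underscore.
    def apply__GroupAlsoByWindow(self, transform, pcoll, options):
        if options.streaming:
            # Mirrors the commit: the streaming path is declared but not yet
            # plumbed through, so it raises for now.
            raise NotImplementedError(
                'Streaming support is not yet available on the DirectRunner.')
        return transform.expand(pcoll)


runner = ToyRunner()
print(runner.apply(_GroupAlsoByWindow(), 'pcoll', FakeOptions()))
# -> batch GroupAlsoByWindow over 'pcoll'
# FakeOptions(streaming=True) would raise NotImplementedError instead.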

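For context on what _GroupAlsoByWindowDoFn computes: _GroupByKeyOnly
groups values by key across all windows, and this DoFn then re-groups
each key's windowed values by window, emitting one (key, values) pair
per key per window. The real implementation delegates to a trigger
driver, which additionally handles merging windows, triggers, and
timers; the illustrative non-Beam sketch below covers only the
per-window regrouping, with windows modeled as plain labels.

import collections

def group_also_by_window(element):
    """element: (key, iterable of (value, window)) -> per-window groups."""
    key, windowed_values = element
    by_window = collections.defaultdict(list)
    for value, window in windowed_values:
        by_window[window].append(value)
    for window, values in sorted(by_window.items()):
        yield window, (key, values)

for out in group_also_by_window(('user1', [(3, 'w0'), (5, 'w0'), (2, 'w1')])):
    print(out)
# ('w0', ('user1', [3, 5]))
# ('w1', ('user1', [2]))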