This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c6cf20b [BEAM-3719] Adds support for reading side-inputs from SDFs
c6cf20b is described below
commit c6cf20b6a1edd8cdd708c8511084b7806dbe80b2
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Thu Mar 1 13:39:20 2018 -0800
[BEAM-3719] Adds support for reading side-inputs from SDFs
---
sdks/python/apache_beam/runners/common.pxd | 3 +-
sdks/python/apache_beam/runners/common.py | 40 ++++++++++++++++------
.../runners/direct/sdf_direct_runner.py | 17 ++++++---
.../runners/direct/sdf_direct_runner_test.py | 29 +++++++++++-----
sdks/python/apache_beam/runners/sdf_common.py | 6 ++--
5 files changed, 70 insertions(+), 25 deletions(-)
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index fed0c2c..9b0871f 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -51,7 +51,8 @@ cdef class DoFnInvoker(object):
cpdef invoke_process(self, WindowedValue windowed_value,
restriction_tracker=*,
- OutputProcessor output_processor=*)
+ OutputProcessor output_processor=*,
+ additional_args=*, additional_kwargs=*)
cpdef invoke_start_bundle(self)
cpdef invoke_finish_bundle(self)
cpdef invoke_split(self, element, restriction)
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 39e5e99..124d7d3 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -180,8 +180,14 @@ class DoFnInvoker(object):
signature: a DoFnSignature for the DoFn being invoked.
context: Context to be used when invoking the DoFn (deprecated).
side_inputs: side inputs to be used when invoking th process method.
- input_args: arguments to be used when invoking the process method
- input_kwargs: kwargs to be used when invoking the process method.
+ input_args: arguments to be used when invoking the process method. Some
+ of the arguments given here might be placeholders (for
+ example for side inputs) that get filled before invoking the
+ process method.
+ input_kwargs: keyword arguments to be used when invoking the process
+ method. Some of the keyword arguments given here might be
+ placeholders (for example for side inputs) that get filled
+ before invoking the process method.
process_invocation: If True, this function may return an invoker that
performs extra optimizations for invoking process()
method efficiently.
@@ -199,7 +205,8 @@ class DoFnInvoker(object):
signature, context, side_inputs, input_args, input_kwargs)
def invoke_process(self, windowed_value, restriction_tracker=None,
- output_processor=None):
+ output_processor=None,
+ additional_args=None, additional_kwargs=None):
"""Invokes the DoFn.process() function.
Args:
@@ -207,6 +214,10 @@ class DoFnInvoker(object):
process() method should be invoked along with the window
the element belongs to.
output_procesor: if provided given OutputProcessor will be used.
+ additional_args: additional arguments to be passed to the current
+ `DoFn.process()` invocation, usually as side inputs.
+ additional_kwargs: additional keyword arguments to be passed to the
+ current `DoFn.process()` invocation.
"""
raise NotImplementedError
@@ -265,7 +276,8 @@ class SimpleInvoker(DoFnInvoker):
self.process_method = signature.process_method.method_value
def invoke_process(self, windowed_value, restriction_tracker=None,
- output_processor=None):
+ output_processor=None,
+ additional_args=None, additional_kwargs=None):
if not output_processor:
output_processor = self.output_processor
output_processor.process_outputs(
@@ -351,7 +363,13 @@ class PerWindowInvoker(DoFnInvoker):
self.kwargs_for_process = input_kwargs
def invoke_process(self, windowed_value, restriction_tracker=None,
- output_processor=None):
+ output_processor=None,
+ additional_args=None, additional_kwargs=None):
+ if not additional_args:
+ additional_args = []
+ if not additional_kwargs:
+ additional_kwargs = {}
+
if not output_processor:
output_processor = self.output_processor
self.context.set_element(windowed_value)
@@ -359,7 +377,6 @@ class PerWindowInvoker(DoFnInvoker):
# or if the process accesses the window parameter. We can just call it once
# otherwise as none of the arguments are changing
- additional_kwargs = {}
if restriction_tracker:
restriction_tracker_param = _find_param_with_default(
self.signature.process_method,
@@ -373,18 +390,21 @@ class PerWindowInvoker(DoFnInvoker):
for w in windowed_value.windows:
self._invoke_per_window(
WindowedValue(windowed_value.value, windowed_value.timestamp,
(w,)),
- additional_kwargs, output_processor)
+ additional_args, additional_kwargs, output_processor)
else:
self._invoke_per_window(
- windowed_value, additional_kwargs, output_processor)
+ windowed_value, additional_args, additional_kwargs, output_processor)
def _invoke_per_window(
- self, windowed_value, additional_kwargs, output_processor):
+ self, windowed_value, additional_args,
+ additional_kwargs, output_processor):
if self.has_windowed_inputs:
window, = windowed_value.windows
+ side_inputs = [si[window] for si in self.side_inputs]
+ side_inputs.extend(additional_args)
args_for_process, kwargs_for_process = util.insert_values_in_args(
self.args_for_process, self.kwargs_for_process,
- [si[window] for si in self.side_inputs])
+ side_inputs)
else:
args_for_process, kwargs_for_process = (
self.args_for_process, self.kwargs_for_process)
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index aa247aa..610664b 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -59,8 +59,15 @@ class ProcessKeyedElementsViaKeyedWorkItems(PTransform):
self._process_keyed_elements_transform = process_keyed_elements_transform
def expand(self, pcoll):
- return pcoll | beam.core.GroupByKey() | ProcessElements(
+ process_elements = ProcessElements(
self._process_keyed_elements_transform)
+ process_elements.args = (
+ self._process_keyed_elements_transform.ptransform_args)
+ process_elements.kwargs = (
+ self._process_keyed_elements_transform.ptransform_kwargs)
+ process_elements.side_inputs = (
+ self._process_keyed_elements_transform.ptransform_side_inputs)
+ return pcoll | beam.core.GroupByKey() | process_elements
class ProcessElements(PTransform):
@@ -176,7 +183,7 @@ class ProcessFn(beam.DoFn):
SDFProcessElementInvoker)
output_values = self._process_element_invoker.invoke_process_element(
- self.sdf_invoker, windowed_element, tracker)
+ self.sdf_invoker, windowed_element, tracker, *args, **kwargs)
sdf_result = None
for output in output_values:
@@ -270,7 +277,8 @@ class SDFProcessElementInvoker(object):
def test_method(self):
raise ValueError
- def invoke_process_element(self, sdf_invoker, element, tracker):
+ def invoke_process_element(
+ self, sdf_invoker, element, tracker, *args, **kwargs):
"""Invokes `process()` method of a Splittable `DoFn` for a given element.
Args:
@@ -302,7 +310,8 @@ class SDFProcessElementInvoker(object):
output_processor = _OutputProcessor()
Timer(self._max_duration, initiate_checkpoint).start()
sdf_invoker.invoke_process(
- element, restriction_tracker=tracker,
output_processor=output_processor)
+ element, restriction_tracker=tracker,
output_processor=output_processor,
+ additional_args=args, additional_kwargs=kwargs)
assert output_processor.output_iter is not None
output_count = 0
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
index 7ab6dde..e8ef9b6 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
@@ -22,9 +22,12 @@ import os
import unittest
import apache_beam as beam
+from apache_beam import Create
from apache_beam import DoFn
from apache_beam.io import filebasedsource_test
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
+from apache_beam.pvalue import AsList
+from apache_beam.pvalue import AsSingleton
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
@@ -100,10 +103,13 @@ class ExpandStrings(DoFn):
self._record_window = record_window
def process(
- self, element, window=beam.DoFn.WindowParam,
- restriction_tracker=ExpandStringsProvider(), side=None,
+ self, element, side1, side2, side3, window=beam.DoFn.WindowParam,
+ restriction_tracker=ExpandStringsProvider(),
*args, **kwargs):
- side = side or []
+ side = []
+ side.extend(side1)
+ side.extend(side2)
+ side.extend(side3)
assert isinstance(restriction_tracker, OffsetRestrictionTracker)
side = list(side)
for i in range(restriction_tracker.start_position(),
@@ -212,7 +218,7 @@ class SDFDirectRunnerTest(unittest.TestCase):
TimestampedValue(('B', t), t)])
| beam.WindowInto(SlidingWindows(10, 5),
accumulation_mode=AccumulationMode.DISCARDING)
- | beam.ParDo(ExpandStrings(record_window=True)))
+ | beam.ParDo(ExpandStrings(record_window=True), [], [], []))
expected_result = [
'A:1:-5', 'A:1:0', 'A:3:-5', 'A:3:0', 'A:5:0', 'A:5:5', 'A:10:5',
@@ -222,11 +228,18 @@ class SDFDirectRunnerTest(unittest.TestCase):
def test_sdf_with_side_inputs(self):
with TestPipeline() as p:
+ side1 = p | 'Create1' >> Create(['1', '2'])
+ side2 = p | 'Create2' >> Create(['3', '4'])
+ side3 = p | 'Create3' >> Create(['5'])
result = (p
- | 'create_main' >> beam.Create(['1', '3', '5'])
- | beam.ParDo(ExpandStrings(), side=['1', '3']))
-
- expected_result = ['1:1', '3:1', '5:1', '1:3', '3:3', '5:3']
+ | 'create_main' >> beam.Create(['a', 'b', 'c'])
+ | beam.ParDo(ExpandStrings(), AsList(side1), AsList(side2),
+ AsSingleton(side3)))
+
+ expected_result = []
+ for c in ['a', 'b', 'c']:
+ for i in range(5):
+ expected_result.append(c + ':' + str(i+1))
assert_that(result, equal_to(expected_result))
diff --git a/sdks/python/apache_beam/runners/sdf_common.py b/sdks/python/apache_beam/runners/sdf_common.py
index a3e1418..5b35544 100644
--- a/sdks/python/apache_beam/runners/sdf_common.py
+++ b/sdks/python/apache_beam/runners/sdf_common.py
@@ -77,7 +77,8 @@ class SplittableParDo(PTransform):
return keyed_elements | ProcessKeyedElements(
sdf, element_coder, restriction_coder,
- pcoll.windowing, self._ptransform.args, self._ptransform.kwargs)
+ pcoll.windowing, self._ptransform.args, self._ptransform.kwargs,
+ self._ptransform.side_inputs)
class ElementAndRestriction(object):
@@ -153,13 +154,14 @@ class ProcessKeyedElements(PTransform):
def __init__(
self, sdf, element_coder, restriction_coder, windowing_strategy,
- ptransform_args, ptransform_kwargs):
+ ptransform_args, ptransform_kwargs, ptransform_side_inputs):
self.sdf = sdf
self.element_coder = element_coder
self.restriction_coder = restriction_coder
self.windowing_strategy = windowing_strategy
self.ptransform_args = ptransform_args
self.ptransform_kwargs = ptransform_kwargs
+ self.ptransform_side_inputs = ptransform_side_inputs
def expand(self, pcoll):
return pvalue.PCollection(pcoll.pipeline)
--
To stop receiving notification emails like this one, please contact
[email protected].