Implement windowed side inputs for InProcess runner.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7386bcca Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7386bcca Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7386bcca Branch: refs/heads/python-sdk Commit: 7386bcca10d1e7905ea65ba68a53e78b6185d876 Parents: 66b4c2f Author: Robert Bradshaw <rober...@google.com> Authored: Wed Oct 12 17:37:52 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Tue Oct 18 12:17:16 2016 -0700 ---------------------------------------------------------------------- .../inprocess/inprocess_evaluation_context.py | 20 +------------------- 1 file changed, 1 insertion(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7386bcca/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py index 883be99..c6bd41f 100644 --- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py +++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py @@ -106,25 +106,7 @@ class _InProcessSideInputsContainer(object): Raises: ValueError: If values cannot be converted into the requested form. """ - if isinstance(view, SingletonPCollectionView): - if len(values) == 0: - # pylint: disable=protected-access - result = view._view_options().get('default', EmptySideInput()) - elif len(values) == 1: - result = values[0].value - else: - raise ValueError( - ('PCollection with more than one element accessed as ' - 'a singleton view: %s.') % view) - elif isinstance(view, IterablePCollectionView): - result = [v.value for v in values] - elif isinstance(view, ListPCollectionView): - result = [v.value for v in values] - elif isinstance(view, DictPCollectionView): - result = dict(v.value for v in values) - else: - raise NotImplementedError - return result + return sideinputs.SideInputMap(type(view), view._view_options(), values) class InProcessEvaluationContext(object):