Optimize globally windowed side input case

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/82f553e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/82f553e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/82f553e1

Branch: refs/heads/python-sdk
Commit: 82f553e1a777a8f88ca163e07d631d3df3e1f321
Parents: 989a189
Author: Robert Bradshaw <rober...@google.com>
Authored: Thu Oct 13 18:17:30 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Tue Oct 18 12:17:16 2016 -0700

----------------------------------------------------------------------
 .../examples/cookbook/bigquery_side_input.py        |  1 -
 sdks/python/apache_beam/pvalue.py                   |  4 ++--
 sdks/python/apache_beam/runners/common.py           | 16 ++++++++++++----
 sdks/python/apache_beam/runners/direct_runner.py    |  5 -----
 .../inprocess/inprocess_evaluation_context.py       |  5 -----
 sdks/python/apache_beam/transforms/sideinputs.py    |  9 ++++++---
 .../apache_beam/transforms/sideinputs_test.py       |  2 +-
 7 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index ffba786..8a53637 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -33,7 +33,6 @@ from random import randrange
 
 import apache_beam as beam
 
-from apache_beam.pvalue import AsIter
 from apache_beam.pvalue import AsList
 from apache_beam.pvalue import AsSingleton
 from apache_beam.utils.options import PipelineOptions

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/pvalue.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index f8b7feb..e40d746 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -276,8 +276,8 @@ class SingletonPCollectionView(PCollectionView):
       return head[0]
     else:
       raise ValueError(
-        'PCollection with more than one element accessed as '
-        'a singleton view.')
+          'PCollection with more than one element accessed as '
+          'a singleton view.')
 
 
 class IterablePCollectionView(PCollectionView):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index cd06879..e500fd8 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -76,17 +76,25 @@ class DoFnRunner(Receiver):
       self.dofn = fn
       self.dofn_process = fn.process
     else:
+      global_window = window.GlobalWindow()
       # TODO(robertwb): Remove when all runners pass side input maps.
-      # TODO(robertwb): Optimize for global windows case.
-      side_inputs = [side_input if isinstance(side_input, sideinputs.SideInputMap)
-                     else {window.GlobalWindow(): side_input}
+      side_inputs = [side_input
+                     if isinstance(side_input, sideinputs.SideInputMap)
+                     else {global_window: side_input}
                      for side_input in side_inputs]
+      if side_inputs and all(side_input.is_globally_windowed()
+                             for side_input in side_inputs):
+        args, kwargs = util.insert_values_in_args(
+            args, kwargs, [side_input[global_window]
+                           for side_input in side_inputs])
+        side_inputs = []
       if side_inputs:
         self.has_windowed_side_inputs = True
+
         def process(context):
           w = context.windows[0]
           cur_args, cur_kwargs = util.insert_values_in_args(
-            args, kwargs, [side_input[w] for side_input in side_inputs])
+              args, kwargs, [side_input[w] for side_input in side_inputs])
           return fn.process(context, *cur_args, **cur_kwargs)
         self.dofn_process = process
       elif kwargs:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/runners/direct_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct_runner.py b/sdks/python/apache_beam/runners/direct_runner.py
index 936043c..c4c52b3 100644
--- a/sdks/python/apache_beam/runners/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct_runner.py
@@ -30,11 +30,6 @@ import logging
 
 from apache_beam import coders
 from apache_beam import error
-from apache_beam.pvalue import DictPCollectionView
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import IterablePCollectionView
-from apache_beam.pvalue import ListPCollectionView
-from apache_beam.pvalue import SingletonPCollectionView
 from apache_beam.runners.common import DoFnRunner
 from apache_beam.runners.common import DoFnState
 from apache_beam.runners.runner import PipelineResult

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
index c6bd41f..7af1608 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_evaluation_context.py
@@ -22,11 +22,6 @@ from __future__ import absolute_import
 import collections
 import threading
 
-from apache_beam.pvalue import DictPCollectionView
-from apache_beam.pvalue import EmptySideInput
-from apache_beam.pvalue import IterablePCollectionView
-from apache_beam.pvalue import ListPCollectionView
-from apache_beam.pvalue import SingletonPCollectionView
 from apache_beam.transforms import sideinputs
 from apache_beam.runners.inprocess.clock import Clock
 from apache_beam.runners.inprocess.inprocess_watermark_manager import InProcessWatermarkManager

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/transforms/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs.py b/sdks/python/apache_beam/transforms/sideinputs.py
index 2079c93..182179a 100644
--- a/sdks/python/apache_beam/transforms/sideinputs.py
+++ b/sdks/python/apache_beam/transforms/sideinputs.py
@@ -170,7 +170,7 @@ def default_window_mapping_fn(target_window_fn):
   else:
     def map_via_end(source_window):
       return list(target_window_fn.assign(
-        window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
+          window.WindowFn.AssignContext(source_window.max_timestamp())))[-1]
     return map_via_end
 
 
@@ -188,14 +188,17 @@ class SideInputMap(object):
     if window not in self._cache:
       target_window = self._window_mapping_fn(window)
       self._cache[window] = self._view_class.from_iterable(
-        _FilteringIterable(self._iterable, target_window), self._view_options)
+          _FilteringIterable(self._iterable, target_window), self._view_options)
     return self._cache[window]
 
+  def is_globally_windowed(self):
+    return self._window_mapping_fn == _global_window_mapping_fn
+
 
 class _FilteringIterable(object):
   """An iterable containing only those values in the given window.
   """
-  
+
   def __init__(self, iterable, target_window):
     self._iterable = iterable
     self._target_window = target_window

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/82f553e1/sdks/python/apache_beam/transforms/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py
index f84ff57..28e324a 100644
--- a/sdks/python/apache_beam/transforms/sideinputs_test.py
+++ b/sdks/python/apache_beam/transforms/sideinputs_test.py
@@ -101,7 +101,7 @@ class SideInputsTest(unittest.TestCase):
         [1, 2, 11],
         window.FixedWindows(10),
         side_input_type=beam.pvalue.AsIter,
-        expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])    
+        expected=[(1, [1, 2]), (2, [1, 2]), (11, [11])])
 
   def test_windowed_singleton(self):
     self.run_windowed_side_inputs(

Reply via email to