This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bcff63f  Adding context switch to operations missing it.
bcff63f is described below

commit bcff63fb378250d15ab96c3498f7351196e056a9
Author: Pablo <[email protected]>
AuthorDate: Thu May 10 16:42:50 2018 -0700

    Adding context switch to operations missing it.
---
 .../apache_beam/runners/worker/operations.py       | 84 +++++++++++-----------
 1 file changed, 44 insertions(+), 40 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1c425ae..f382837 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -439,11 +439,12 @@ class CombineOperation(Operation):
     logging.debug('Finishing %s', self)
 
   def process(self, o):
-    if self.debug_logging_enabled:
-      logging.debug('Processing [%s] in %s', o, self)
-    key, values = o.value
-    self.output(
-        o.with_value((key, self.phased_combine_fn.apply(values))))
+    with self.scoped_process_state:
+      if self.debug_logging_enabled:
+        logging.debug('Processing [%s] in %s', o, self)
+      key, values = o.value
+      self.output(
+          o.with_value((key, self.phased_combine_fn.apply(values))))
 
 
 def create_pgbk_op(step_name, spec, counter_factory, state_sampler):
@@ -471,12 +472,13 @@ class PGBKOperation(Operation):
     self.max_size = 10 * 1000
 
   def process(self, o):
-    # TODO(robertwb): Structural (hashable) values.
-    key = o.value[0], tuple(o.windows)
-    self.table[key].append(o)
-    self.size += 1
-    if self.size > self.max_size:
-      self.flush(9 * self.max_size // 10)
+    with self.scoped_process_state:
+      # TODO(robertwb): Structural (hashable) values.
+      key = o.value[0], tuple(o.windows)
+      self.table[key].append(o)
+      self.size += 1
+      if self.size > self.max_size:
+        self.flush(9 * self.max_size // 10)
 
   def finish(self):
     self.flush(0)
@@ -526,32 +528,33 @@ class PGBKCVOperation(Operation):
     self.table = {}
 
   def process(self, wkv):
-    key, value = wkv.value
-    # pylint: disable=unidiomatic-typecheck
-    # Optimization for the global window case.
-    if len(wkv.windows) == 1 and type(wkv.windows[0]) is _global_window_type:
-      wkey = 0, key
-    else:
-      wkey = tuple(wkv.windows), key
-    entry = self.table.get(wkey, None)
-    if entry is None:
-      if self.key_count >= self.max_keys:
-        target = self.key_count * 9 // 10
-        old_wkeys = []
-        # TODO(robertwb): Use an LRU cache?
-        for old_wkey, old_wvalue in self.table.iteritems():
-          old_wkeys.append(old_wkey)  # Can't mutate while iterating.
-          self.output_key(old_wkey, old_wvalue[0])
-          self.key_count -= 1
-          if self.key_count <= target:
-            break
-        for old_wkey in reversed(old_wkeys):
-          del self.table[old_wkey]
-      self.key_count += 1
-      # We save the accumulator as a one element list so we can efficiently
-      # mutate when new values are added without searching the cache again.
-      entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
-    entry[0] = self.combine_fn_add_input(entry[0], value)
+    with self.scoped_process_state:
+      key, value = wkv.value
+      # pylint: disable=unidiomatic-typecheck
+      # Optimization for the global window case.
+      if len(wkv.windows) == 1 and type(wkv.windows[0]) is _global_window_type:
+        wkey = 0, key
+      else:
+        wkey = tuple(wkv.windows), key
+      entry = self.table.get(wkey, None)
+      if entry is None:
+        if self.key_count >= self.max_keys:
+          target = self.key_count * 9 // 10
+          old_wkeys = []
+          # TODO(robertwb): Use an LRU cache?
+          for old_wkey, old_wvalue in self.table.iteritems():
+            old_wkeys.append(old_wkey)  # Can't mutate while iterating.
+            self.output_key(old_wkey, old_wvalue[0])
+            self.key_count -= 1
+            if self.key_count <= target:
+              break
+          for old_wkey in reversed(old_wkeys):
+            del self.table[old_wkey]
+        self.key_count += 1
+        # We save the accumulator as a one element list so we can efficiently
+        # mutate when new values are added without searching the cache again.
+        entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
+      entry[0] = self.combine_fn_add_input(entry[0], value)
 
   def finish(self):
     for wkey, value in self.table.iteritems():
@@ -575,9 +578,10 @@ class FlattenOperation(Operation):
   """
 
   def process(self, o):
-    if self.debug_logging_enabled:
-      logging.debug('Processing [%s] in %s', o, self)
-    self.output(o)
+    with self.scoped_process_state:
+      if self.debug_logging_enabled:
+        logging.debug('Processing [%s] in %s', o, self)
+      self.output(o)
 
 
 def create_operation(name_context, spec, counter_factory, step_name,

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to