This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bcff63f Adding context switch to operations missing it.
bcff63f is described below
commit bcff63fb378250d15ab96c3498f7351196e056a9
Author: Pablo <[email protected]>
AuthorDate: Thu May 10 16:42:50 2018 -0700
Adding context switch to operations missing it.
---
.../apache_beam/runners/worker/operations.py | 84 +++++++++++-----------
1 file changed, 44 insertions(+), 40 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1c425ae..f382837 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -439,11 +439,12 @@ class CombineOperation(Operation):
logging.debug('Finishing %s', self)
def process(self, o):
- if self.debug_logging_enabled:
- logging.debug('Processing [%s] in %s', o, self)
- key, values = o.value
- self.output(
- o.with_value((key, self.phased_combine_fn.apply(values))))
+ with self.scoped_process_state:
+ if self.debug_logging_enabled:
+ logging.debug('Processing [%s] in %s', o, self)
+ key, values = o.value
+ self.output(
+ o.with_value((key, self.phased_combine_fn.apply(values))))
def create_pgbk_op(step_name, spec, counter_factory, state_sampler):
@@ -471,12 +472,13 @@ class PGBKOperation(Operation):
self.max_size = 10 * 1000
def process(self, o):
- # TODO(robertwb): Structural (hashable) values.
- key = o.value[0], tuple(o.windows)
- self.table[key].append(o)
- self.size += 1
- if self.size > self.max_size:
- self.flush(9 * self.max_size // 10)
+ with self.scoped_process_state:
+ # TODO(robertwb): Structural (hashable) values.
+ key = o.value[0], tuple(o.windows)
+ self.table[key].append(o)
+ self.size += 1
+ if self.size > self.max_size:
+ self.flush(9 * self.max_size // 10)
def finish(self):
self.flush(0)
@@ -526,32 +528,33 @@ class PGBKCVOperation(Operation):
self.table = {}
def process(self, wkv):
- key, value = wkv.value
- # pylint: disable=unidiomatic-typecheck
- # Optimization for the global window case.
- if len(wkv.windows) == 1 and type(wkv.windows[0]) is _global_window_type:
- wkey = 0, key
- else:
- wkey = tuple(wkv.windows), key
- entry = self.table.get(wkey, None)
- if entry is None:
- if self.key_count >= self.max_keys:
- target = self.key_count * 9 // 10
- old_wkeys = []
- # TODO(robertwb): Use an LRU cache?
- for old_wkey, old_wvalue in self.table.iteritems():
- old_wkeys.append(old_wkey) # Can't mutate while iterating.
- self.output_key(old_wkey, old_wvalue[0])
- self.key_count -= 1
- if self.key_count <= target:
- break
- for old_wkey in reversed(old_wkeys):
- del self.table[old_wkey]
- self.key_count += 1
- # We save the accumulator as a one element list so we can efficiently
- # mutate when new values are added without searching the cache again.
- entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
- entry[0] = self.combine_fn_add_input(entry[0], value)
+ with self.scoped_process_state:
+ key, value = wkv.value
+ # pylint: disable=unidiomatic-typecheck
+ # Optimization for the global window case.
+ if len(wkv.windows) == 1 and type(wkv.windows[0]) is _global_window_type:
+ wkey = 0, key
+ else:
+ wkey = tuple(wkv.windows), key
+ entry = self.table.get(wkey, None)
+ if entry is None:
+ if self.key_count >= self.max_keys:
+ target = self.key_count * 9 // 10
+ old_wkeys = []
+ # TODO(robertwb): Use an LRU cache?
+ for old_wkey, old_wvalue in self.table.iteritems():
+ old_wkeys.append(old_wkey) # Can't mutate while iterating.
+ self.output_key(old_wkey, old_wvalue[0])
+ self.key_count -= 1
+ if self.key_count <= target:
+ break
+ for old_wkey in reversed(old_wkeys):
+ del self.table[old_wkey]
+ self.key_count += 1
+ # We save the accumulator as a one element list so we can efficiently
+ # mutate when new values are added without searching the cache again.
+ entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
+ entry[0] = self.combine_fn_add_input(entry[0], value)
def finish(self):
for wkey, value in self.table.iteritems():
@@ -575,9 +578,10 @@ class FlattenOperation(Operation):
"""
def process(self, o):
- if self.debug_logging_enabled:
- logging.debug('Processing [%s] in %s', o, self)
- self.output(o)
+ with self.scoped_process_state:
+ if self.debug_logging_enabled:
+ logging.debug('Processing [%s] in %s', o, self)
+ self.output(o)
def create_operation(name_context, spec, counter_factory, step_name,
--
To stop receiving notification emails like this one, please contact
[email protected].