[
https://issues.apache.org/jira/browse/BEAM-4273?focusedWorklogId=101237&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101237
]
ASF GitHub Bot logged work on BEAM-4273:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/May/18 17:57
Start Date: 11/May/18 17:57
Worklog Time Spent: 10m
Work Description: pabloem closed pull request #5333: [BEAM-4273] Adding
context switch to operations missing it.
URL: https://github.com/apache/beam/pull/5333
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 1c425aec2fd..f38283741b5 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -439,11 +439,12 @@ def finish(self):
logging.debug('Finishing %s', self)
def process(self, o):
- if self.debug_logging_enabled:
- logging.debug('Processing [%s] in %s', o, self)
- key, values = o.value
- self.output(
- o.with_value((key, self.phased_combine_fn.apply(values))))
+ with self.scoped_process_state:
+ if self.debug_logging_enabled:
+ logging.debug('Processing [%s] in %s', o, self)
+ key, values = o.value
+ self.output(
+ o.with_value((key, self.phased_combine_fn.apply(values))))
def create_pgbk_op(step_name, spec, counter_factory, state_sampler):
@@ -471,12 +472,13 @@ def __init__(self, name_context, spec, counter_factory, state_sampler):
self.max_size = 10 * 1000
def process(self, o):
- # TODO(robertwb): Structural (hashable) values.
- key = o.value[0], tuple(o.windows)
- self.table[key].append(o)
- self.size += 1
- if self.size > self.max_size:
- self.flush(9 * self.max_size // 10)
+ with self.scoped_process_state:
+ # TODO(robertwb): Structural (hashable) values.
+ key = o.value[0], tuple(o.windows)
+ self.table[key].append(o)
+ self.size += 1
+ if self.size > self.max_size:
+ self.flush(9 * self.max_size // 10)
def finish(self):
self.flush(0)
@@ -526,32 +528,33 @@ def __init__(self, name_context, spec, counter_factory, state_sampler):
self.table = {}
def process(self, wkv):
- key, value = wkv.value
- # pylint: disable=unidiomatic-typecheck
- # Optimization for the global window case.
- if len(wkv.windows) == 1 and type(wkv.windows[0]) is _global_window_type:
- wkey = 0, key
- else:
- wkey = tuple(wkv.windows), key
- entry = self.table.get(wkey, None)
- if entry is None:
- if self.key_count >= self.max_keys:
- target = self.key_count * 9 // 10
- old_wkeys = []
- # TODO(robertwb): Use an LRU cache?
- for old_wkey, old_wvalue in self.table.iteritems():
- old_wkeys.append(old_wkey) # Can't mutate while iterating.
- self.output_key(old_wkey, old_wvalue[0])
- self.key_count -= 1
- if self.key_count <= target:
- break
- for old_wkey in reversed(old_wkeys):
- del self.table[old_wkey]
- self.key_count += 1
- # We save the accumulator as a one element list so we can efficiently
- # mutate when new values are added without searching the cache again.
- entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
- entry[0] = self.combine_fn_add_input(entry[0], value)
+ with self.scoped_process_state:
+ key, value = wkv.value
+ # pylint: disable=unidiomatic-typecheck
+ # Optimization for the global window case.
+ if len(wkv.windows) == 1 and type(wkv.windows[0]) is _global_window_type:
+ wkey = 0, key
+ else:
+ wkey = tuple(wkv.windows), key
+ entry = self.table.get(wkey, None)
+ if entry is None:
+ if self.key_count >= self.max_keys:
+ target = self.key_count * 9 // 10
+ old_wkeys = []
+ # TODO(robertwb): Use an LRU cache?
+ for old_wkey, old_wvalue in self.table.iteritems():
+ old_wkeys.append(old_wkey) # Can't mutate while iterating.
+ self.output_key(old_wkey, old_wvalue[0])
+ self.key_count -= 1
+ if self.key_count <= target:
+ break
+ for old_wkey in reversed(old_wkeys):
+ del self.table[old_wkey]
+ self.key_count += 1
+ # We save the accumulator as a one element list so we can efficiently
+ # mutate when new values are added without searching the cache again.
+ entry = self.table[wkey] = [self.combine_fn.create_accumulator()]
+ entry[0] = self.combine_fn_add_input(entry[0], value)
def finish(self):
for wkey, value in self.table.iteritems():
@@ -575,9 +578,10 @@ class FlattenOperation(Operation):
"""
def process(self, o):
- if self.debug_logging_enabled:
- logging.debug('Processing [%s] in %s', o, self)
- self.output(o)
+ with self.scoped_process_state:
+ if self.debug_logging_enabled:
+ logging.debug('Processing [%s] in %s', o, self)
+ self.output(o)
def create_operation(name_context, spec, counter_factory, step_name,
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 101237)
Time Spent: 50m (was: 40m)
> Combine operations and some others missing context switch
> ---------------------------------------------------------
>
> Key: BEAM-4273
> URL: https://issues.apache.org/jira/browse/BEAM-4273
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.4.0
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)