liferoad commented on code in PR #34193:
URL: https://github.com/apache/beam/pull/34193#discussion_r1985615734
##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -1280,33 +1280,37 @@ def process(self, wkv):
# pylint: disable=unidiomatic-typecheck
# Optimization for the global window case.
if self.is_default_windowing:
- wkey = key # type: Hashable
+ self.add_key_value(key, value, None)
else:
- wkey = tuple(wkv.windows), key
- entry = self.table.get(wkey, None)
- if entry is None:
- if self.key_count >= self.max_keys:
- target = self.key_count * 9 // 10
- old_wkeys = []
- # TODO(robertwb): Use an LRU cache?
- for old_wkey, old_wvalue in self.table.items():
- old_wkeys.append(old_wkey) # Can't mutate while iterating.
- self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
- self.key_count -= 1
- if self.key_count <= target:
- break
- for old_wkey in reversed(old_wkeys):
- del self.table[old_wkey]
- self.key_count += 1
- # We save the accumulator as a one element list so we can efficiently
- # mutate when new values are added without searching the cache again.
- entry = self.table[wkey] = [self.combine_fn.create_accumulator(), None]
- if not self.is_default_windowing:
- # Conditional as the timestamp attribute is lazily initialized.
- entry[1] = wkv.timestamp
- entry[0] = self.combine_fn_add_input(entry[0], value)
- if not self.is_default_windowing and self.timestamp_combiner:
- entry[1] = self.timestamp_combiner.combine(entry[1], wkv.timestamp)
+ for window in wkv.windows:
+ self.add_key_value((window, key),
+ value,
+ wkv.timestamp if self.timestamp_combiner else
None)
+
+ def add_key_value(self, wkey, value, timestamp):
+ entry = self.table.get(wkey, None)
+ if entry is None:
+ if self.key_count >= self.max_keys:
+ target = self.key_count * 9 // 10
+ old_wkeys = []
+ # TODO(robertwb): Use an LRU cache?
+ for old_wkey, old_wvalue in self.table.items():
+ old_wkeys.append(old_wkey) # Can't mutate while iterating.
+ self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
+ self.key_count -= 1
+ if self.key_count <= target:
+ break
+ for old_wkey in reversed(old_wkeys):
+ del self.table[old_wkey]
+ self.key_count += 1
+ # We save the accumulator as a one element list so we can efficiently
+ # mutate when new values are added without searching the cache again.
+ entry = self.table[wkey] = [
+ self.combine_fn.create_accumulator(), timestamp
+ ]
+ entry[0] = self.combine_fn_add_input(entry[0], value)
+ if not self.is_default_windowing and self.timestamp_combiner:
+ entry[1] = self.timestamp_combiner.combine(entry[1], timestamp)
Review Comment:
Thanks. It would still be good to add a test that covers this case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]