robertwb commented on code in PR #34193:
URL: https://github.com/apache/beam/pull/34193#discussion_r1985528265


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -1280,33 +1280,37 @@ def process(self, wkv):
       # pylint: disable=unidiomatic-typecheck
       # Optimization for the global window case.
       if self.is_default_windowing:
-        wkey = key  # type: Hashable
+        self.add_key_value(key, value, None)
       else:
-        wkey = tuple(wkv.windows), key
-      entry = self.table.get(wkey, None)
-      if entry is None:
-        if self.key_count >= self.max_keys:
-          target = self.key_count * 9 // 10
-          old_wkeys = []
-          # TODO(robertwb): Use an LRU cache?
-          for old_wkey, old_wvalue in self.table.items():
-            old_wkeys.append(old_wkey)  # Can't mutate while iterating.
-            self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
-            self.key_count -= 1
-            if self.key_count <= target:
-              break
-          for old_wkey in reversed(old_wkeys):
-            del self.table[old_wkey]
-        self.key_count += 1
-        # We save the accumulator as a one element list so we can efficiently
-        # mutate when new values are added without searching the cache again.
-        entry = self.table[wkey] = [self.combine_fn.create_accumulator(), None]
-        if not self.is_default_windowing:
-          # Conditional as the timestamp attribute is lazily initialized.
-          entry[1] = wkv.timestamp
-      entry[0] = self.combine_fn_add_input(entry[0], value)
-      if not self.is_default_windowing and self.timestamp_combiner:
-        entry[1] = self.timestamp_combiner.combine(entry[1], wkv.timestamp)
+        for window in wkv.windows:
+          self.add_key_value((window, key),
+                             value,
+                             wkv.timestamp if self.timestamp_combiner else None)
+
+  def add_key_value(self, wkey, value, timestamp):
+    entry = self.table.get(wkey, None)
+    if entry is None:
+      if self.key_count >= self.max_keys:
+        target = self.key_count * 9 // 10
+        old_wkeys = []
+        # TODO(robertwb): Use an LRU cache?
+        for old_wkey, old_wvalue in self.table.items():
+          old_wkeys.append(old_wkey)  # Can't mutate while iterating.
+          self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
+          self.key_count -= 1
+          if self.key_count <= target:
+            break
+        for old_wkey in reversed(old_wkeys):
+          del self.table[old_wkey]
+      self.key_count += 1
+      # We save the accumulator as a one element list so we can efficiently
+      # mutate when new values are added without searching the cache again.
+      entry = self.table[wkey] = [
+          self.combine_fn.create_accumulator(), timestamp
+      ]
+    entry[0] = self.combine_fn_add_input(entry[0], value)
+    if not self.is_default_windowing and self.timestamp_combiner:
+      entry[1] = self.timestamp_combiner.combine(entry[1], timestamp)

Review Comment:
   (I went ahead and added a test, but the direct runner doesn't expose this 
bug despite exercising this code.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to