robertwb commented on code in PR #34193:
URL: https://github.com/apache/beam/pull/34193#discussion_r1985471758


##########
sdks/python/apache_beam/runners/worker/operations.py:
##########
@@ -1280,33 +1280,37 @@ def process(self, wkv):
       # pylint: disable=unidiomatic-typecheck
       # Optimization for the global window case.
       if self.is_default_windowing:
-        wkey = key  # type: Hashable
+        self.add_key_value(key, value, None)
       else:
-        wkey = tuple(wkv.windows), key
-      entry = self.table.get(wkey, None)
-      if entry is None:
-        if self.key_count >= self.max_keys:
-          target = self.key_count * 9 // 10
-          old_wkeys = []
-          # TODO(robertwb): Use an LRU cache?
-          for old_wkey, old_wvalue in self.table.items():
-            old_wkeys.append(old_wkey)  # Can't mutate while iterating.
-            self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
-            self.key_count -= 1
-            if self.key_count <= target:
-              break
-          for old_wkey in reversed(old_wkeys):
-            del self.table[old_wkey]
-        self.key_count += 1
-        # We save the accumulator as a one element list so we can efficiently
-        # mutate when new values are added without searching the cache again.
-        entry = self.table[wkey] = [self.combine_fn.create_accumulator(), None]
-        if not self.is_default_windowing:
-          # Conditional as the timestamp attribute is lazily initialized.
-          entry[1] = wkv.timestamp
-      entry[0] = self.combine_fn_add_input(entry[0], value)
-      if not self.is_default_windowing and self.timestamp_combiner:
-        entry[1] = self.timestamp_combiner.combine(entry[1], wkv.timestamp)
+        for window in wkv.windows:
+          self.add_key_value((window, key),
+                             value,
+                             wkv.timestamp if self.timestamp_combiner else None)
+
+  def add_key_value(self, wkey, value, timestamp):
+    entry = self.table.get(wkey, None)
+    if entry is None:
+      if self.key_count >= self.max_keys:
+        target = self.key_count * 9 // 10
+        old_wkeys = []
+        # TODO(robertwb): Use an LRU cache?
+        for old_wkey, old_wvalue in self.table.items():
+          old_wkeys.append(old_wkey)  # Can't mutate while iterating.
+          self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
+          self.key_count -= 1
+          if self.key_count <= target:
+            break
+        for old_wkey in reversed(old_wkeys):
+          del self.table[old_wkey]
+      self.key_count += 1
+      # We save the accumulator as a one element list so we can efficiently
+      # mutate when new values are added without searching the cache again.
+      entry = self.table[wkey] = [
+          self.combine_fn.create_accumulator(), timestamp
+      ]
+    entry[0] = self.combine_fn_add_input(entry[0], value)
+    if not self.is_default_windowing and self.timestamp_combiner:
+      entry[1] = self.timestamp_combiner.combine(entry[1], timestamp)

Review Comment:
   I am also adding a workaround in the dataflow runner, and am adding a (local 
dataflow runner) test there, as this is really about the interaction between the 
dataflow optimization and the SDK (though the SDK was technically 
incorrect here). If we weren't able to fix this on the dataflow side too, I'd 
add a changes notice, but I don't know that that's merited here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to