damccorm commented on code in PR #37466:
URL: https://github.com/apache/beam/pull/37466#discussion_r2756100027


##########
sdks/typescript/src/apache_beam/worker/operators.ts:
##########
@@ -495,6 +509,45 @@ export class CombinePerKeyPrecombineOperator<I, A, O>
     this.groups = new Map();
   }
 
+  /**
+   * Flushes entries from the cache using LRU eviction.
+   * Evicts the least recently used entries (from the front of the Map)
+   * until the cache size is at or below the target.
+   */
+  flushLRU(target: number): ProcessResult {
+    const result = new ProcessResultBuilder();
+    const toDelete: string[] = [];
+    // Iterate from the front (oldest/least recently used entries)
+    for (const [wkey, values] of this.groups) {
+      if (this.groups.size - toDelete.length <= target) {
+        break;
+      }
+      const parts = wkey.split(" ");
+      const encodedWindow = parts[0];
+      const encodedKey = parts[1];
+      const window = decodeFromBase64(encodedWindow, this.windowCoder);
+      result.add(
+        this.receiver.receive({
+          value: {
+            key: decodeFromBase64(encodedKey, this.keyCoder),
+            value: values,
+          },
+          windows: [window],
+          timestamp: window.maxTimestamp(),
+          pane: PaneInfoCoder.ONE_AND_ONLY_FIRING,
+        }),
+      );
+      toDelete.push(wkey);
+    }
+    for (const wkey of toDelete) {
+      this.groups.delete(wkey);
+    }
+    return result.build();
+  }

Review Comment:
   Do we even need to keep the existing flush implementation? We could probably 
replace it with a `clear` operation that is a simpler equivalent of `flush(0)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to