damccorm commented on code in PR #37214:
URL: https://github.com/apache/beam/pull/37214#discussion_r2698850574


##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -46,12 +46,110 @@ export interface StateProvider {
 }
 
 // TODO: (Advanced) Cross-bundle caching.
+/**
+ * Wrapper for cached values that tracks their weight (memory size).
+ */
+interface WeightedCacheEntry<T> {
+  entry: MaybePromise<T>;
+  weight: number;
+}
+
+// Default weight for values that cannot be sized (e.g., promises)
+const DEFAULT_WEIGHT = 64;
+
+/**
+ * Estimates the memory size of a value in bytes.
+ * Handles circular references by tracking visited objects.
+ */
+function sizeof(value: any, visited: Set<any> = new Set()): number {
+  if (value === null || value === undefined) {
+    return 8;
+  }
+
+  // Handle circular references for objects
+  if (typeof value === "object") {
+    if (visited.has(value)) {
+      return 8; // Account for reference size, not the full object again
+    }
+    visited.add(value);
+  }
+
+  const type = typeof value;
+
+  if (type === "boolean") {
+    return 4;
+  }
+  if (type === "number") {
+    return 8;
+  }
+  if (type === "string") {
+    // Each character is 2 bytes in JavaScript (UTF-16) + overhead
+    return 40 + value.length * 2;
+  }
+  if (value instanceof Uint8Array || value instanceof Buffer) {
+    return 40 + value.length;
+  }
+  if (Array.isArray(value)) {
+    let size = 40; // Array overhead
+    for (const item of value) {
+      size += sizeof(item, visited);
+    }
+    return size;
+  }
+  if (type === "object") {
+    let size = 40; // Object overhead
+    for (const key of Object.keys(value)) {
+      size += sizeof(key, visited) + sizeof(value[key], visited);
+    }
+    return size;
+  }
+
+  // Default for unknown types
+  return DEFAULT_WEIGHT;
+}
+
+// Default cache size: 100MB
+const DEFAULT_MAX_CACHE_WEIGHT = 100 * 1024 * 1024;
+
 export class CachingStateProvider implements StateProvider {
   underlying: StateProvider;
-  cache: Map<string, MaybePromise<any>> = new Map();
+  cache: Map<string, WeightedCacheEntry<any>> = new Map();
+  maxCacheWeight: number;
+  currentWeight: number = 0;
 
-  constructor(underlying: StateProvider) {
+  constructor(
+    underlying: StateProvider,
+    maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT,
+  ) {
     this.underlying = underlying;
+    this.maxCacheWeight = maxCacheWeight;
+  }
+
+  /**
+   * Evicts least recently used entries until the cache is under the weight limit.
+   * JavaScript Maps preserve insertion order, so the first entry is the oldest.
+   */
+  private evictIfNeeded() {
+    while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) {
+      // Get the first (oldest) entry from the map iterator
+      const firstEntry = this.cache.entries().next().value;
+      const firstKey = firstEntry[0];
+      const evictedEntry = firstEntry[1];

Review Comment:
   This is fine as is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to