gemini-code-assist[bot] commented on code in PR #37214:
URL: https://github.com/apache/beam/pull/37214#discussion_r2692078774


##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -62,21 +164,41 @@ export class CachingStateProvider implements StateProvider {
       "base64",
     );
     if (this.cache.has(cacheKey)) {
-      return this.cache.get(cacheKey)!;
+      // Cache hit: move to end (most recently used)
+      this.touchCacheEntry(cacheKey);
+      return this.cache.get(cacheKey)!.entry;
     }
+    // Cache miss: fetch from underlying provider
     let result = this.underlying.getState(stateKey, decode);
-    const this_ = this;
     if (result.type === "promise") {
       result = {
         type: "promise",
         promise: result.promise.then((value) => {
-          this_.cache.set(cacheKey, { type: "value", value });
+          // When promise resolves, update cache with resolved value
+          // Get the current entry to update its weight
+          const currentEntry = this.cache.get(cacheKey);
+          if (currentEntry !== undefined) {
+            // Remove old weight from total
+            this.currentWeight -= currentEntry.weight;
+          }
+          const resolvedWeight = sizeof(value);
+          this.cache.set(cacheKey, {
+            entry: { type: "value", value },
+            weight: resolvedWeight,
+          });
+          this.currentWeight += resolvedWeight;
+          this.evictIfNeeded();

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   There's a potential race condition in how promise-based cache entries are 
updated. If a promise-based entry is evicted and then another request for the 
same key is cached before the original promise resolves, the `then` callback of 
the original promise will incorrectly update the cache's state. It might 
subtract the weight of the *new* entry and then add the weight of its own 
resolved value, leading to a corrupted `currentWeight`.
   
   To fix this, you should ensure that the cache entry being updated is the 
same one that was stored for the promise that is resolving. You can do this by 
comparing it with the one found in the cache before performing the update.
   
   ```typescript
             const currentEntry = this.cache.get(cacheKey);
             // Only update if the entry in the cache is still the promise we are resolving.
             // This prevents a race condition where the entry is evicted and replaced
             // before this promise resolves.
             if (currentEntry?.entry === result) {
               // Remove old weight (of the promise) from total.
               this.currentWeight -= currentEntry.weight;
   
               const resolvedWeight = sizeof(value);
               this.cache.set(cacheKey, {
                 entry: { type: "value", value },
                 weight: resolvedWeight,
               });
               this.currentWeight += resolvedWeight;
               this.evictIfNeeded();
             }
   ```



##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -46,12 +46,114 @@ export interface StateProvider {
 }
 
 // TODO: (Advanced) Cross-bundle caching.
+/**
+ * Wrapper for cached values that tracks their weight (memory size).
+ */
+interface WeightedCacheEntry<T> {
+  entry: MaybePromise<T>;
+  weight: number;
+}
+
+// Default weight for values that cannot be sized (e.g., promises)
+const DEFAULT_WEIGHT = 64;
+
+/**
+ * Estimates the memory size of a value in bytes.
+ * Handles circular references by tracking visited objects.
+ */
+function sizeof(value: any, visited: Set<any> = new Set()): number {
+  if (value === null || value === undefined) {
+    return 8;
+  }
+
+  // Handle circular references for objects
+  if (typeof value === "object") {
+    if (visited.has(value)) {
+      return 8; // Account for reference size, not the full object again
+    }
+    visited.add(value);
+  }
+
+  const type = typeof value;
+
+  if (type === "boolean") {
+    return 4;
+  }
+  if (type === "number") {
+    return 8;
+  }
+  if (type === "string") {
+    // Each character is 2 bytes in JavaScript (UTF-16) + overhead
+    return 40 + value.length * 2;
+  }
+  if (value instanceof Uint8Array || value instanceof Buffer) {
+    return 40 + value.length;
+  }
+  if (Array.isArray(value)) {
+    let size = 40; // Array overhead
+    for (const item of value) {
+      size += sizeof(item, visited);
+    }
+    return size;
+  }
+  if (type === "object") {
+    let size = 40; // Object overhead
+    for (const key of Object.keys(value)) {
+      size += sizeof(key, visited) + sizeof(value[key], visited);
+    }
+    return size;
+  }
+
+  // Default for unknown types
+  return DEFAULT_WEIGHT;
+}
+
+// Default cache size: 100MB
+const DEFAULT_MAX_CACHE_WEIGHT = 100 * 1024 * 1024;
+
 export class CachingStateProvider implements StateProvider {
   underlying: StateProvider;
-  cache: Map<string, MaybePromise<any>> = new Map();
+  cache: Map<string, WeightedCacheEntry<any>> = new Map();
+  maxCacheWeight: number;
+  currentWeight: number = 0;
 
-  constructor(underlying: StateProvider) {
+  constructor(
+    underlying: StateProvider,
+    maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT,
+  ) {
     this.underlying = underlying;
+    this.maxCacheWeight = maxCacheWeight;
+  }
+
+  /**
+   * Evicts least recently used entries until the cache is under the weight limit.
+   * JavaScript Maps preserve insertion order, so the first entry is the oldest.
+   */
+  private evictIfNeeded() {
+    while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) {
+      // Get the first (oldest) entry
+      const firstKey = this.cache.keys().next().value;
+      if (firstKey === undefined) {
+        break;
+      }
+      const evictedEntry = this.cache.get(firstKey);
+      if (evictedEntry !== undefined) {
+        this.currentWeight -= evictedEntry.weight;
+      }
+      this.cache.delete(firstKey);
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `evictIfNeeded` method can be made more efficient and concise. The 
current implementation performs a redundant check for `firstKey === undefined` 
since the `while` loop already ensures `this.cache.size > 0`. Additionally, it 
performs an extra map lookup with `this.cache.get(firstKey)`.
   
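   You can simplify this by using the `Map.prototype.entries()` iterator to get 
both the key and value of the oldest entry in one go. Note that with TypeScript 
5.6+ under `strict` (`strictBuiltinIteratorReturn`), `next().value` is typed as 
possibly `undefined`, so the destructuring below needs a non-null assertion 
(`next().value!`) or an explicit guard to compile; the original 
`firstKey === undefined` check performs that same type narrowing.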
   
   ```typescript
     private evictIfNeeded() {
       while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) {
         // Get the first (oldest) entry from the map iterator.
         const [firstKey, evictedEntry] = this.cache.entries().next().value;
         this.currentWeight -= evictedEntry.weight;
         this.cache.delete(firstKey);
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to