gemini-code-assist[bot] commented on code in PR #37214:
URL: https://github.com/apache/beam/pull/37214#discussion_r2662867230


##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -46,12 +46,102 @@ export interface StateProvider {
 }
 
 // TODO: (Advanced) Cross-bundle caching.
+/**
+ * Wrapper for cached values that tracks their weight (memory size).
+ */
+interface WeightedCacheEntry<T> {
+  entry: MaybePromise<T>;
+  weight: number;
+}
+
+/**
+ * Estimates the memory size of a value in bytes.
+ * This is a simplified estimation - actual memory usage may vary.
+ */
+function estimateSize(value: any): number {
+  if (value === null || value === undefined) {
+    return 8;
+  }
+
+  const type = typeof value;
+
+  if (type === "boolean") {
+    return 4;
+  }
+  if (type === "number") {
+    return 8;
+  }
+  if (type === "string") {
+    // Each character is 2 bytes in JavaScript (UTF-16) + overhead
+    return 40 + value.length * 2;
+  }
+  if (value instanceof Uint8Array || value instanceof Buffer) {
+    return 40 + value.length;
+  }
+  if (Array.isArray(value)) {
+    let size = 40; // Array overhead
+    for (const item of value) {
+      size += estimateSize(item);
+    }
+    return size;
+  }
+  if (type === "object") {
+    let size = 40; // Object overhead
+    for (const key of Object.keys(value)) {
+      size += estimateSize(key) + estimateSize(value[key]);
+    }
+    return size;
+  }
+
+  // Default for unknown types
+  return 64;
+}

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The recursive implementation of `estimateSize` for objects and arrays does 
not handle circular references. If a cached object contains a cycle (e.g., 
`a.b = a`), this function will recurse without bound, leading to a stack 
overflow and crashing the worker. This is a critical stability risk. Tracking 
already-visited objects in a `Set` and counting a repeated reference only once 
avoids this:
   
   ```typescript
   function estimateSize(value: any, visited = new Set<any>()): number {
     if (value === null || value === undefined) {
       return 8;
     }
   
     if (typeof value === "object") {
       if (visited.has(value)) {
         return 8; // Account for reference size, not the full object again.
       }
       visited.add(value);
     }
   
     const type = typeof value;
   
     if (type === "boolean") {
       return 4;
     }
     if (type === "number") {
       return 8;
     }
     if (type === "string") {
       // Each character is 2 bytes in JavaScript (UTF-16) + overhead
       return 40 + value.length * 2;
     }
     if (value instanceof Uint8Array || value instanceof Buffer) {
       return 40 + value.length;
     }
     if (Array.isArray(value)) {
       let size = 40; // Array overhead
       for (const item of value) {
         size += estimateSize(item, visited);
       }
       return size;
     }
     if (type === "object") {
       let size = 40; // Object overhead
       for (const key of Object.keys(value)) {
         size += estimateSize(key, visited) + estimateSize(value[key], visited);
       }
       return size;
     }
   
     // Default for unknown types
     return 64;
   }
   ```



##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -62,21 +152,40 @@ export class CachingStateProvider implements StateProvider 
{
       "base64",
     );
     if (this.cache.has(cacheKey)) {
-      return this.cache.get(cacheKey)!;
+      // Cache hit: move to end (most recently used)
+      this.touchCacheEntry(cacheKey);
+      return this.cache.get(cacheKey)!.entry;
     }
+    // Cache miss: fetch from underlying provider
     let result = this.underlying.getState(stateKey, decode);
-    const this_ = this;
     if (result.type === "promise") {
       result = {
         type: "promise",
         promise: result.promise.then((value) => {
-          this_.cache.set(cacheKey, { type: "value", value });
+          // When promise resolves, update cache with resolved value
+          // Get the current entry to update its weight
+          const currentEntry = this.cache.get(cacheKey);
+          if (currentEntry !== undefined) {
+            // Remove old weight from total
+            this.currentWeight -= currentEntry.weight;
+          }
+          const resolvedWeight = estimateSize(value);
+          this.cache.set(cacheKey, {
+            entry: { type: "value", value },
+            weight: resolvedWeight,
+          });
+          this.currentWeight += resolvedWeight;
+          this.evictIfNeeded();
           return value;
         }),
       };
     }
-    // TODO: (Perf) Cache eviction.
-    this.cache.set(cacheKey, result);
+    // Estimate weight for the new entry
+    const weight = result.type === "value" ? estimateSize(result.value) : 64; 
// Promise placeholder weight
+    // Evict if needed before adding new entry
+    this.currentWeight += weight;
+    this.evictIfNeeded();
+    this.cache.set(cacheKey, { entry: result, weight });

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   There is a bug in the ordering of operations for a synchronous cache miss. 
`evictIfNeeded()` is called before the new item is added to the cache via 
`this.cache.set()`. If the new item's weight alone exceeds `maxCacheWeight`, 
`evictIfNeeded()` can at most remove the older entries (and does nothing at 
all when the cache is already empty, as `this.cache.size` is 0). The oversized 
item is then added, causing the cache to violate its weight limit until the 
next eviction event. The item should be added to the cache *before* eviction 
is checked.
   
   ```typescript
       // Add new entry and then evict if needed
       this.currentWeight += weight;
       this.cache.set(cacheKey, { entry: result, weight });
       this.evictIfNeeded();
   ```



##########
sdks/typescript/src/apache_beam/worker/state.ts:
##########
@@ -46,12 +46,102 @@ export interface StateProvider {
 }
 
 // TODO: (Advanced) Cross-bundle caching.
+/**
+ * Wrapper for cached values that tracks their weight (memory size).
+ */
+interface WeightedCacheEntry<T> {
+  entry: MaybePromise<T>;
+  weight: number;
+}
+
+/**
+ * Estimates the memory size of a value in bytes.
+ * This is a simplified estimation - actual memory usage may vary.
+ */
+function estimateSize(value: any): number {
+  if (value === null || value === undefined) {
+    return 8;
+  }
+
+  const type = typeof value;
+
+  if (type === "boolean") {
+    return 4;
+  }
+  if (type === "number") {
+    return 8;
+  }
+  if (type === "string") {
+    // Each character is 2 bytes in JavaScript (UTF-16) + overhead
+    return 40 + value.length * 2;
+  }
+  if (value instanceof Uint8Array || value instanceof Buffer) {
+    return 40 + value.length;
+  }
+  if (Array.isArray(value)) {
+    let size = 40; // Array overhead
+    for (const item of value) {
+      size += estimateSize(item);
+    }
+    return size;
+  }
+  if (type === "object") {
+    let size = 40; // Object overhead
+    for (const key of Object.keys(value)) {
+      size += estimateSize(key) + estimateSize(value[key]);
+    }
+    return size;
+  }
+
+  // Default for unknown types
+  return 64;
+}
+
+// Default cache size: 100MB
+const DEFAULT_MAX_CACHE_WEIGHT = 100 * 1024 * 1024;
+
 export class CachingStateProvider implements StateProvider {
   underlying: StateProvider;
-  cache: Map<string, MaybePromise<any>> = new Map();
+  cache: Map<string, WeightedCacheEntry<any>> = new Map();
+  maxCacheWeight: number;
+  currentWeight: number = 0;
 
-  constructor(underlying: StateProvider) {
+  constructor(
+    underlying: StateProvider,
+    maxCacheWeight: number = DEFAULT_MAX_CACHE_WEIGHT,
+  ) {
     this.underlying = underlying;
+    this.maxCacheWeight = maxCacheWeight;
+  }
+
+  /**
+   * Evicts least recently used entries until the cache is under the weight 
limit.
+   * JavaScript Maps preserve insertion order, so the first entry is the 
oldest.
+   */
+  private evictIfNeeded() {
+    while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) {
+      // Remove the first (oldest) entry
+      const firstKey = this.cache.keys().next().value;
+      if (firstKey !== undefined) {
+        const evicted = this.cache.get(firstKey);
+        if (evicted !== undefined) {
+          this.currentWeight -= evicted.weight;
+        }
+        this.cache.delete(firstKey);
+      }
+    }
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `evictIfNeeded` method can be made more efficient and readable. 
Currently, it gets the first key using `this.cache.keys().next().value` and 
then performs a separate `this.cache.get(firstKey)` lookup. You can obtain 
both the key and the entry in a single operation by taking the first element 
of `this.cache.entries()`.
   
   ```typescript
     private evictIfNeeded() {
       while (this.currentWeight > this.maxCacheWeight && this.cache.size > 0) {
         // JavaScript Maps preserve insertion order, so the first entry is the 
oldest.
         const [firstKey, evicted] = this.cache.entries().next().value;
         this.currentWeight -= evicted.weight;
         this.cache.delete(firstKey);
       }
     }
   ```



##########
sdks/typescript/test/state_provider_test.ts:
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import * as assert from "assert";
+import {
+  CachingStateProvider,
+  StateProvider,
+  MaybePromise,
+} from "../src/apache_beam/worker/state";
+import * as fnApi from "../src/apache_beam/proto/beam_fn_api";
+
+/**
+ * Mock StateProvider for testing that tracks call counts.
+ */
+class MockStateProvider implements StateProvider {
+  callCount: number = 0;
+  values: Map<string, any> = new Map();
+  delayMs: number = 0;
+
+  constructor(delayMs: number = 0) {
+    this.delayMs = delayMs;
+  }
+
+  setValue(key: string, value: any) {
+    this.values.set(key, value);
+  }
+
+  getState<T>(
+    stateKey: fnApi.StateKey,
+    decode: (data: Uint8Array) => T,
+  ): MaybePromise<T> {
+    this.callCount++;
+    const key = Buffer.from(fnApi.StateKey.toBinary(stateKey)).toString(
+      "base64",
+    );
+    const value = this.values.get(key);
+
+    if (this.delayMs > 0) {
+      return {
+        type: "promise",
+        promise: new Promise<T>((resolve) => {
+          setTimeout(() => resolve(value), this.delayMs);
+        }),
+      };
+    } else {
+      return { type: "value", value };
+    }
+  }
+}
+
+describe("CachingStateProvider", function () {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The test suite is quite thorough. To make it even more robust, consider 
adding a test case for the edge case where a single item, whose weight is 
larger than `maxCacheWeight`, is added to an empty cache. This would verify 
that the cache's weight limit is strictly enforced. With the recommended fix in 
`state.ts`, such an item should be added and then immediately evicted, leaving 
the cache empty and its weight at zero.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to