Copilot commented on code in PR #8980:
URL: https://github.com/apache/gravitino/pull/8980#discussion_r2579309324


##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Scan plan cache. */
+public class ScanPlanCache implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class);
+
+  private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache;
+  private final ScheduledExecutorService cleanupExecutor;
+

Review Comment:
   The constructor lacks proper Javadoc documentation. According to Apache 
Gravitino coding guidelines, public APIs must have complete documentation with 
`@param` annotations.
   
   Add documentation:
   ```java
   /**
    * Creates a new ScanPlanCache with the specified capacity and expiration time.
    *
    * @param capacity Maximum number of scan plan results to cache. When the cache reaches
    *                 this size, least-recently-used entries will be evicted.
    * @param expireMinutes Time in minutes after which cached scan plans expire if not accessed.
    *                      Cached entries are automatically removed after this period of inactivity.
    */
   ```
   ```suggestion
   
     /**
      * Creates a new ScanPlanCache with the specified capacity and expiration time.
      *
      * @param capacity Maximum number of scan plan results to cache. When the cache reaches
      *                 this size, least-recently-used entries will be evicted.
      * @param expireMinutes Time in minutes after which cached scan plans expire if not accessed.
      *                      Cached entries are automatically removed after this period of inactivity.
      */
   ```



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.rest;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.gravitino.iceberg.service.ScanPlanCache;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestScanPlanCache {
+
+  private ScanPlanCache scanPlanCache;
+  private Table mockTable;
+  private Snapshot mockSnapshot;
+  private TableIdentifier tableIdentifier;
+
+  @BeforeEach
+  public void setUp() {
+    scanPlanCache = new ScanPlanCache(10, 60);
+    tableIdentifier = TableIdentifier.of(Namespace.of("test_db"), "test_table");
+
+    mockSnapshot = mock(Snapshot.class);
+    when(mockSnapshot.snapshotId()).thenReturn(1L);
+
+    mockTable = mock(Table.class);
+    when(mockTable.name()).thenReturn("test_table");
+    when(mockTable.currentSnapshot()).thenReturn(mockSnapshot);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (scanPlanCache != null) {
+      scanPlanCache.close();
+    }
+  }
+
+  @Test
+  public void testCacheHit() {
+    PlanTableScanRequest scanRequest =
+        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+    PlanTableScanResponse response =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(tableIdentifier, mockTable, scanRequest);
+    Assertions.assertNull(cached1, "First call should be cache miss");
+
+    scanPlanCache.put(tableIdentifier, mockTable, scanRequest, response);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, scanRequest);
+    Assertions.assertNotNull(cached2, "Second call should be cache hit");
+    Assertions.assertEquals(response, cached2);
+    Assertions.assertEquals(PlanStatus.COMPLETED, cached2.planStatus());
+  }
+
+  @Test
+  public void testCacheMiss() {
+    PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+    PlanTableScanRequest request2 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("id", "name"))
+            .withSnapshotId(1L)
+            .build();
+
+    PlanTableScanResponse response1 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse response2 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task2"))
+            .build();
+
+    scanPlanCache.put(tableIdentifier, mockTable, request1, response1);
+    scanPlanCache.put(tableIdentifier, mockTable, request2, response2);
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(tableIdentifier, mockTable, request1);
+    Assertions.assertNotNull(cached1);
+    Assertions.assertEquals(response1, cached1);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, request2);
+    Assertions.assertNotNull(cached2);
+    Assertions.assertEquals(response2, cached2);
+    Assertions.assertNotEquals(
+        cached1, cached2, "Different requests should have different cache entries");
+  }
+
+  @Test
+  public void testCacheKeyWithDifferentSelectOrder() {
+    PlanTableScanRequest request1 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("id", "name"))
+            .withSnapshotId(1L)
+            .build();
+    PlanTableScanRequest request2 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("name", "id"))
+            .withSnapshotId(1L)
+            .build();
+
+    PlanTableScanResponse response =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    scanPlanCache.put(tableIdentifier, mockTable, request1, response);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, request2);
+    Assertions.assertNotNull(
+        cached2, "Select fields with different order should use same cache key");
+    Assertions.assertEquals(response, cached2);
+  }
+
+  @Test
+  public void testCacheKeyWithDifferentTableIdentifier() {
+    TableIdentifier table1 = TableIdentifier.of(Namespace.of("db1"), "table1");
+    TableIdentifier table2 = TableIdentifier.of(Namespace.of("db2"), "table2");
+
+    PlanTableScanRequest scanRequest =
+        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+
+    PlanTableScanResponse response1 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse response2 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task2"))
+            .build();
+
+    scanPlanCache.put(table1, mockTable, scanRequest, response1);
+    scanPlanCache.put(table2, mockTable, scanRequest, response2);
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(table1, mockTable, scanRequest);
+    Assertions.assertNotNull(cached1);
+    Assertions.assertEquals(response1, cached1);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(table2, mockTable, scanRequest);
+    Assertions.assertNotNull(cached2);
+    Assertions.assertEquals(response2, cached2);
+    Assertions.assertNotEquals(cached1, cached2);
+  }

Review Comment:
   The test coverage is missing a critical test case: verifying that different 
snapshot IDs result in cache misses. This is essential to ensure that the cache 
correctly invalidates when table data changes.
   
   Consider adding a test like:
   ```java
   @Test
   public void testCacheKeyWithDifferentSnapshotId() {
     PlanTableScanRequest request =
         new PlanTableScanRequest.Builder().withSnapshotId(1L).build();

     PlanTableScanResponse response1 =
         PlanTableScanResponse.builder()
             .withPlanStatus(PlanStatus.COMPLETED)
             .withPlanTasks(Collections.singletonList("task1"))
             .build();

     scanPlanCache.put(tableIdentifier, mockTable, request, response1);

     // Change the snapshot ID
     Snapshot mockSnapshot2 = mock(Snapshot.class);
     when(mockSnapshot2.snapshotId()).thenReturn(2L);
     when(mockTable.currentSnapshot()).thenReturn(mockSnapshot2);

     PlanTableScanResponse cached = scanPlanCache.get(tableIdentifier, mockTable, request);
     Assertions.assertNull(cached, "Different snapshot should result in cache miss");
   }
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Scan plan cache. */
+public class ScanPlanCache implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class);
+
+  private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache;
+  private final ScheduledExecutorService cleanupExecutor;
+
+  public ScanPlanCache(int capacity, int expireMinutes) {
+    LOG.info(
+        "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes",
+        capacity,
+        expireMinutes);
+
+    // Create a scheduled executor for periodic cleanup
+    this.cleanupExecutor =
+        new ScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("scan-plan-cache-cleanup-%d")
+                .build());
+
+    this.scanPlanCache =
+        Caffeine.newBuilder()
+            .maximumSize(capacity)
+            .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor))
+            .removalListener(
+                (key, value, cause) -> {
+                  LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause);
+                })
+            .build();
+
+    LOG.info("ScanPlanCache initialized with automatic cleanup");
+  }
+

Review Comment:
   The `get` method lacks proper Javadoc documentation. According to Apache 
Gravitino coding guidelines, public APIs must have complete documentation with 
`@param` and `@return` annotations.
   
   Add documentation:
   ```java
   /**
    * Retrieves a cached scan plan response for the given table and scan request.
    *
    * @param tableIdentifier The identifier of the table being scanned
    * @param table The table instance used to get current snapshot information
    * @param scanRequest The scan request parameters
    * @return The cached scan plan response if available, or null on a cache miss
    */
   ```
   ```suggestion
   
     /**
      * Retrieves a cached scan plan response for the given table and scan request.
      *
      * @param tableIdentifier The identifier of the table being scanned
      * @param table The table instance used to get current snapshot information
      * @param scanRequest The scan request parameters
      * @return The cached scan plan response if available, or null on a cache miss
      */
   ```



##########
docs/iceberg-rest-service.md:
##########
@@ -447,6 +448,16 @@ Gravitino features a pluggable cache system for updating 
or retrieving table met
 
 Gravitino provides the build-in `org.apache.gravitino.iceberg.common.cache.LocalTableMetadataCache` to store the cached data in the memory. You could also implement your custom table metadata cache by implementing the `org.apache.gravitino.iceberg.common.cache.TableMetadataCache` interface.
 
+### Iceberg scan plan cache configuration
+
+Gravitino caches scan plan results to speed up repeated queries with identical parameters. The cache uses snapshot ID as part of the cache key, so queries against different snapshots will not use stale cached data.
+| Configuration item                                         | Description                              | Default value | Required | Since Version |
+|------------------------------------------------------------|-----------------------------------------|---------------|----------|---------------|
+| `gravitino.iceberg-rest.scan-plan-cache-capacity`          | The capacity of scan plan cache.        | 200           | No       | 1.1.0         |
+| `gravitino.iceberg-rest.scan-plan-cache-expire-minutes`    | The expire minutes of scan plan cache.  | 60            | No       | 1.1.0         |

Review Comment:
   The description is grammatically awkward; reword it to be consistent with the capacity description.
   ```suggestion
   | `gravitino.iceberg-rest.scan-plan-cache-expire-minutes`    | The expiration time (in minutes) of the scan plan cache.  | 60            | No       | 1.1.0         |
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Scan plan cache. */
+public class ScanPlanCache implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class);
+
+  private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache;
+  private final ScheduledExecutorService cleanupExecutor;
+
+  public ScanPlanCache(int capacity, int expireMinutes) {
+    LOG.info(
+        "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes",
+        capacity,
+        expireMinutes);
+
+    // Create a scheduled executor for periodic cleanup
+    this.cleanupExecutor =
+        new ScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("scan-plan-cache-cleanup-%d")
+                .build());
+
+    this.scanPlanCache =
+        Caffeine.newBuilder()
+            .maximumSize(capacity)
+            .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor))
+            .removalListener(
+                (key, value, cause) -> {
+                  LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause);
+                })
+            .build();
+
+    LOG.info("ScanPlanCache initialized with automatic cleanup");
+  }
+
+  public PlanTableScanResponse get(
+      TableIdentifier tableIdentifier, Table table, PlanTableScanRequest scanRequest) {
+    ScanPlanCacheKey key = ScanPlanCacheKey.create(tableIdentifier, table, scanRequest);
+    PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key);
+
+    if (cachedResponse != null) {
+      LOG.debug("Cache HIT for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+    } else {
+      LOG.debug("Cache MISS for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+    }
+
+    return cachedResponse;
+  }
+
+  public void put(
+      TableIdentifier tableIdentifier,
+      Table table,
+      PlanTableScanRequest scanRequest,
+      PlanTableScanResponse scanResponse) {
+    ScanPlanCacheKey key = ScanPlanCacheKey.create(tableIdentifier, table, scanRequest);
+    scanPlanCache.put(key, scanResponse);
+    LOG.debug("Cached scan plan for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.info("Closing ScanPlanCache");
+
+    if (scanPlanCache != null) {
+      scanPlanCache.invalidateAll();
+      scanPlanCache.cleanUp();
+    }
+
+    if (cleanupExecutor != null && !cleanupExecutor.isShutdown()) {
+      cleanupExecutor.shutdown();
+      try {
+        if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          cleanupExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        cleanupExecutor.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+    LOG.info("ScanPlanCache closed successfully");
+  }
+
+  private static class ScanPlanCacheKey {
+    private final TableIdentifier tableIdentifier;
+    private final Long snapshotId;
+    private final Long startSnapshotId;
+    private final Long endSnapshotId;
+    private final String filterStr;
+    private final String selectStr;
+    private final String statsFieldsStr;
+    private final boolean caseSensitive;
+
+    private ScanPlanCacheKey(
+        TableIdentifier tableIdentifier,
+        Long snapshotId,
+        Long startSnapshotId,
+        Long endSnapshotId,
+        String filter,
+        String select,
+        String statsFields,
+        boolean caseSensitive) {
+      this.tableIdentifier = tableIdentifier;
+      this.snapshotId = snapshotId;
+      this.startSnapshotId = startSnapshotId;
+      this.endSnapshotId = endSnapshotId;
+      this.filterStr = filter;
+      this.selectStr = select;
+      this.statsFieldsStr = statsFields;
+      this.caseSensitive = caseSensitive;
+    }
+
+    static ScanPlanCacheKey create(
+        TableIdentifier tableIdentifier, Table table, PlanTableScanRequest scanRequest) {
+
+      // Use current snapshot if not specified
+      Long snapshotId = scanRequest.snapshotId();
+      if ((snapshotId == null || snapshotId == 0L) && table.currentSnapshot() != null) {
+        snapshotId = table.currentSnapshot().snapshotId();
+      }
+
+      // Include startSnapshotId and endSnapshotId in the key
+      Long startSnapshotId = scanRequest.startSnapshotId();
+      Long endSnapshotId = scanRequest.endSnapshotId();
+

Review Comment:
   Using `toString()` on the filter object for cache key generation may be 
fragile. If the filter's `toString()` implementation changes or doesn't produce 
a consistent representation, it could lead to incorrect cache behavior (false 
misses).
   
   Consider adding a comment explaining this assumption and the risk:
   ```java
   // Note: Using toString() for filter representation. This assumes the filter's
   // toString() method produces a consistent, deterministic string representation.
   // If the filter implementation changes, cache effectiveness may be impacted.
   String filterStr = scanRequest.filter() != null ? scanRequest.filter().toString() : "";
   ```
   ```suggestion
   
         // Note: Using toString() for filter representation. This assumes the filter's
         // toString() method produces a consistent, deterministic string representation.
         // If the filter implementation changes, cache effectiveness may be impacted.
   ```
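   
   An alternative worth considering (a sketch, not a change required by this review; it assumes `ExpressionParser.toJson(Expression)` from `org.apache.iceberg.expressions` is available in the Iceberg version in use) is to derive the key from the filter's JSON form, which is the canonical serialization the REST protocol itself uses and is more stable than `toString()`:
   ```java
   // Sketch: build the cache-key component from the filter's canonical JSON
   // representation instead of Expression#toString().
   String filterStr =
       scanRequest.filter() != null
           ? ExpressionParser.toJson(scanRequest.filter())
           : "";
   ```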



##########
docs/iceberg-rest-service.md:
##########
@@ -447,6 +448,16 @@ Gravitino features a pluggable cache system for updating 
or retrieving table met
 
 Gravitino provides the build-in `org.apache.gravitino.iceberg.common.cache.LocalTableMetadataCache` to store the cached data in the memory. You could also implement your custom table metadata cache by implementing the `org.apache.gravitino.iceberg.common.cache.TableMetadataCache` interface.
 
+### Iceberg scan plan cache configuration
+
+Gravitino caches scan plan results to speed up repeated queries with identical parameters. The cache uses snapshot ID as part of the cache key, so queries against different snapshots will not use stale cached data.
+| Configuration item                                         | Description                              | Default value | Required | Since Version |
+|------------------------------------------------------------|-----------------------------------------|---------------|----------|---------------|
+| `gravitino.iceberg-rest.scan-plan-cache-capacity`          | The capacity of scan plan cache.        | 200           | No       | 1.1.0         |

Review Comment:
   Missing article "the" for better readability.
   ```suggestion
   | `gravitino.iceberg-rest.scan-plan-cache-capacity`          | The capacity of the scan plan cache.    | 200           | No       | 1.1.0         |
   ```



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.rest;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.gravitino.iceberg.service.ScanPlanCache;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestScanPlanCache {
+
+  private ScanPlanCache scanPlanCache;
+  private Table mockTable;
+  private Snapshot mockSnapshot;
+  private TableIdentifier tableIdentifier;
+
+  @BeforeEach
+  public void setUp() {
+    scanPlanCache = new ScanPlanCache(10, 60);
+    tableIdentifier = TableIdentifier.of(Namespace.of("test_db"), "test_table");
+
+    mockSnapshot = mock(Snapshot.class);
+    when(mockSnapshot.snapshotId()).thenReturn(1L);
+
+    mockTable = mock(Table.class);
+    when(mockTable.name()).thenReturn("test_table");
+    when(mockTable.currentSnapshot()).thenReturn(mockSnapshot);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (scanPlanCache != null) {
+      scanPlanCache.close();
+    }
+  }
+
+  @Test
+  public void testCacheHit() {
+    PlanTableScanRequest scanRequest =
+        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+    PlanTableScanResponse response =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(tableIdentifier, mockTable, scanRequest);
+    Assertions.assertNull(cached1, "First call should be cache miss");
+
+    scanPlanCache.put(tableIdentifier, mockTable, scanRequest, response);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, scanRequest);
+    Assertions.assertNotNull(cached2, "Second call should be cache hit");
+    Assertions.assertEquals(response, cached2);
+    Assertions.assertEquals(PlanStatus.COMPLETED, cached2.planStatus());
+  }
+
+  @Test
+  public void testCacheMiss() {
+    PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+    PlanTableScanRequest request2 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("id", "name"))
+            .withSnapshotId(1L)
+            .build();
+
+    PlanTableScanResponse response1 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse response2 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task2"))
+            .build();
+
+    scanPlanCache.put(tableIdentifier, mockTable, request1, response1);
+    scanPlanCache.put(tableIdentifier, mockTable, request2, response2);
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(tableIdentifier, mockTable, request1);
+    Assertions.assertNotNull(cached1);
+    Assertions.assertEquals(response1, cached1);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, request2);
+    Assertions.assertNotNull(cached2);
+    Assertions.assertEquals(response2, cached2);
+    Assertions.assertNotEquals(
+        cached1, cached2, "Different requests should have different cache entries");
+  }
+
+  @Test
+  public void testCacheKeyWithDifferentSelectOrder() {
+    PlanTableScanRequest request1 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("id", "name"))
+            .withSnapshotId(1L)
+            .build();
+    PlanTableScanRequest request2 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("name", "id"))
+            .withSnapshotId(1L)
+            .build();
+
+    PlanTableScanResponse response =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    scanPlanCache.put(tableIdentifier, mockTable, request1, response);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, request2);
+    Assertions.assertNotNull(
+        cached2, "Select fields with different order should use same cache key");
+    Assertions.assertEquals(response, cached2);
+  }
+
+  @Test
+  public void testCacheKeyWithDifferentTableIdentifier() {
+    TableIdentifier table1 = TableIdentifier.of(Namespace.of("db1"), "table1");
+    TableIdentifier table2 = TableIdentifier.of(Namespace.of("db2"), "table2");
+
+    PlanTableScanRequest scanRequest =
+        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+
+    PlanTableScanResponse response1 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse response2 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task2"))
+            .build();
+
+    scanPlanCache.put(table1, mockTable, scanRequest, response1);
+    scanPlanCache.put(table2, mockTable, scanRequest, response2);
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(table1, mockTable, scanRequest);
+    Assertions.assertNotNull(cached1);
+    Assertions.assertEquals(response1, cached1);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(table2, mockTable, scanRequest);
+    Assertions.assertNotNull(cached2);
+    Assertions.assertEquals(response2, cached2);
+    Assertions.assertNotEquals(cached1, cached2);
+  }

Review Comment:
   Missing test coverage for cache expiration behavior. Since the cache is 
configured with an expiration time, there should be a test verifying that 
entries expire after the configured period of inactivity.
   
   Consider adding a test that validates the expiration behavior with a short 
timeout.
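   A fake clock keeps such a test deterministic without real sleeps. The sketch below is a stdlib-only stand-in for Caffeine's expireAfterAccess semantics; `ScanPlanCache` does not currently expose a clock or Ticker, so the `ExpiringCache` and `FakeClock` classes here are hypothetical illustrations of the behavior to test, not the PR's API:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Stdlib-only model of expireAfterAccess, driven by a fake clock so the
   // test needs no real sleeping. Caffeine offers the same effect via an
   // injectable Ticker; ScanPlanCache would need to expose one for this.
   public class ExpireAfterAccessSketch {
     static final class FakeClock {
       long nowMillis = 0;
       void advanceMinutes(long m) { nowMillis += m * 60_000L; }
     }

     static final class ExpiringCache<K, V> {
       private final Map<K, V> values = new HashMap<>();
       private final Map<K, Long> lastAccess = new HashMap<>();
       private final long ttlMillis;
       private final FakeClock clock;

       ExpiringCache(long ttlMinutes, FakeClock clock) {
         this.ttlMillis = ttlMinutes * 60_000L;
         this.clock = clock;
       }

       void put(K key, V value) {
         values.put(key, value);
         lastAccess.put(key, clock.nowMillis);
       }

       V get(K key) {
         Long touched = lastAccess.get(key);
         if (touched == null) {
           return null;
         }
         if (clock.nowMillis - touched >= ttlMillis) {
           values.remove(key); // expired: evict and report a miss
           lastAccess.remove(key);
           return null;
         }
         lastAccess.put(key, clock.nowMillis); // each access renews the entry
         return values.get(key);
       }
     }

     public static void main(String[] args) {
       FakeClock clock = new FakeClock();
       ExpiringCache<String, String> cache = new ExpiringCache<>(60, clock);

       cache.put("plan", "task1");
       clock.advanceMinutes(30);
       System.out.println(cache.get("plan")); // task1: accessed within 60 minutes

       clock.advanceMinutes(59);
       System.out.println(cache.get("plan")); // task1: only 59 minutes since last access

       clock.advanceMinutes(61);
       System.out.println(cache.get("plan")); // null: 61 idle minutes, entry expired
     }
   }
   ```

   A real test against `ScanPlanCache` itself would need either a very short expiry plus `Thread.sleep`, or a constructor overload accepting an injectable Caffeine `Ticker`.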



##########
iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java:
##########
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.rest;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.gravitino.iceberg.service.ScanPlanCache;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestScanPlanCache {
+
+  private ScanPlanCache scanPlanCache;
+  private Table mockTable;
+  private Snapshot mockSnapshot;
+  private TableIdentifier tableIdentifier;
+
+  @BeforeEach
+  public void setUp() {
+    scanPlanCache = new ScanPlanCache(10, 60);
+    tableIdentifier = TableIdentifier.of(Namespace.of("test_db"), "test_table");
+
+    mockSnapshot = mock(Snapshot.class);
+    when(mockSnapshot.snapshotId()).thenReturn(1L);
+
+    mockTable = mock(Table.class);
+    when(mockTable.name()).thenReturn("test_table");
+    when(mockTable.currentSnapshot()).thenReturn(mockSnapshot);
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    if (scanPlanCache != null) {
+      scanPlanCache.close();
+    }
+  }
+
+  @Test
+  public void testCacheHit() {
+    PlanTableScanRequest scanRequest =
+        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+    PlanTableScanResponse response =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(tableIdentifier, mockTable, scanRequest);
+    Assertions.assertNull(cached1, "First call should be cache miss");
+
+    scanPlanCache.put(tableIdentifier, mockTable, scanRequest, response);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, scanRequest);
+    Assertions.assertNotNull(cached2, "Second call should be cache hit");
+    Assertions.assertEquals(response, cached2);
+    Assertions.assertEquals(PlanStatus.COMPLETED, cached2.planStatus());
+  }
+
+  @Test
+  public void testCacheMiss() {
+    PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+    PlanTableScanRequest request2 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("id", "name"))
+            .withSnapshotId(1L)
+            .build();
+
+    PlanTableScanResponse response1 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse response2 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task2"))
+            .build();
+
+    scanPlanCache.put(tableIdentifier, mockTable, request1, response1);
+    scanPlanCache.put(tableIdentifier, mockTable, request2, response2);
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(tableIdentifier, mockTable, request1);
+    Assertions.assertNotNull(cached1);
+    Assertions.assertEquals(response1, cached1);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, request2);
+    Assertions.assertNotNull(cached2);
+    Assertions.assertEquals(response2, cached2);
+    Assertions.assertNotEquals(
+        cached1, cached2, "Different requests should have different cache entries");
+  }
+
+  @Test
+  public void testCacheKeyWithDifferentSelectOrder() {
+    PlanTableScanRequest request1 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("id", "name"))
+            .withSnapshotId(1L)
+            .build();
+    PlanTableScanRequest request2 =
+        new PlanTableScanRequest.Builder()
+            .withSelect(Arrays.asList("name", "id"))
+            .withSnapshotId(1L)
+            .build();
+
+    PlanTableScanResponse response =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    scanPlanCache.put(tableIdentifier, mockTable, request1, response);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(tableIdentifier, mockTable, request2);
+    Assertions.assertNotNull(
+        cached2, "Select fields with different order should use same cache key");
+    Assertions.assertEquals(response, cached2);
+  }
+
+  @Test
+  public void testCacheKeyWithDifferentTableIdentifier() {
+    TableIdentifier table1 = TableIdentifier.of(Namespace.of("db1"), "table1");
+    TableIdentifier table2 = TableIdentifier.of(Namespace.of("db2"), "table2");
+
+    PlanTableScanRequest scanRequest =
+        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
+
+    PlanTableScanResponse response1 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task1"))
+            .build();
+
+    PlanTableScanResponse response2 =
+        PlanTableScanResponse.builder()
+            .withPlanStatus(PlanStatus.COMPLETED)
+            .withPlanTasks(Collections.singletonList("task2"))
+            .build();
+
+    scanPlanCache.put(table1, mockTable, scanRequest, response1);
+    scanPlanCache.put(table2, mockTable, scanRequest, response2);
+
+    PlanTableScanResponse cached1 = scanPlanCache.get(table1, mockTable, scanRequest);
+    Assertions.assertNotNull(cached1);
+    Assertions.assertEquals(response1, cached1);
+
+    PlanTableScanResponse cached2 = scanPlanCache.get(table2, mockTable, scanRequest);
+    Assertions.assertNotNull(cached2);
+    Assertions.assertEquals(response2, cached2);
+    Assertions.assertNotEquals(cached1, cached2);
+  }

Review Comment:
   Missing test coverage for the scenario where `Table.currentSnapshot()` 
returns `null` (empty table). This is an important edge case that should be 
tested to ensure the cache handles empty tables correctly.
   
   Consider adding a test:
   ```java
   @Test
   public void testCacheWithNullSnapshot() {
     when(mockTable.currentSnapshot()).thenReturn(null);
     
     PlanTableScanRequest request = 
         new PlanTableScanRequest.Builder().build();  // No explicit snapshot ID
     
     PlanTableScanResponse response = 
         PlanTableScanResponse.builder()
             .withPlanStatus(PlanStatus.COMPLETED)
             .withPlanTasks(Collections.emptyList())
             .build();
     
     scanPlanCache.put(tableIdentifier, mockTable, request, response);
     PlanTableScanResponse cached = scanPlanCache.get(tableIdentifier, mockTable, request);
     
     Assertions.assertNotNull(cached);
     Assertions.assertEquals(response, cached);
   }
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Scan plan cache. */
+public class ScanPlanCache implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class);
+
+  private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache;
+  private final ScheduledExecutorService cleanupExecutor;
+
+  public ScanPlanCache(int capacity, int expireMinutes) {
+    LOG.info(
+        "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes",
+        capacity,
+        expireMinutes);
+
+    // Create a scheduled executor for periodic cleanup
+    this.cleanupExecutor =
+        new ScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("scan-plan-cache-cleanup-%d")
+                .build());
+
+    this.scanPlanCache =
+        Caffeine.newBuilder()
+            .maximumSize(capacity)
+            .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor))
+            .removalListener(
+                (key, value, cause) -> {
+                  LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause);
+                })
+            .build();
+
+    LOG.info("ScanPlanCache initialized with automatic cleanup");
+  }
+
+  public PlanTableScanResponse get(
+      TableIdentifier tableIdentifier, Table table, PlanTableScanRequest scanRequest) {
+    ScanPlanCacheKey key = ScanPlanCacheKey.create(tableIdentifier, table, scanRequest);
+    PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key);
+
+    if (cachedResponse != null) {
+      LOG.debug("Cache HIT for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+    } else {
+      LOG.debug("Cache MISS for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+    }
+
+    return cachedResponse;
+  }
+
+  public void put(
+      TableIdentifier tableIdentifier,
+      Table table,
+      PlanTableScanRequest scanRequest,
+      PlanTableScanResponse scanResponse) {
+    ScanPlanCacheKey key = ScanPlanCacheKey.create(tableIdentifier, table, scanRequest);
+    scanPlanCache.put(key, scanResponse);
+    LOG.debug("Cached scan plan for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.info("Closing ScanPlanCache");
+
+    if (scanPlanCache != null) {
+      scanPlanCache.invalidateAll();
+      scanPlanCache.cleanUp();
+    }
+
+    if (cleanupExecutor != null && !cleanupExecutor.isShutdown()) {
+      cleanupExecutor.shutdown();
+      try {
+        if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+          cleanupExecutor.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        cleanupExecutor.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+    LOG.info("ScanPlanCache closed successfully");
+  }
+
+  private static class ScanPlanCacheKey {
+    private final TableIdentifier tableIdentifier;
+    private final Long snapshotId;
+    private final Long startSnapshotId;
+    private final Long endSnapshotId;
+    private final String filterStr;
+    private final String selectStr;
+    private final String statsFieldsStr;
+    private final boolean caseSensitive;
+
+    private ScanPlanCacheKey(
+        TableIdentifier tableIdentifier,
+        Long snapshotId,
+        Long startSnapshotId,
+        Long endSnapshotId,
+        String filter,
+        String select,
+        String statsFields,
+        boolean caseSensitive) {
+      this.tableIdentifier = tableIdentifier;
+      this.snapshotId = snapshotId;
+      this.startSnapshotId = startSnapshotId;
+      this.endSnapshotId = endSnapshotId;
+      this.filterStr = filter;
+      this.selectStr = select;
+      this.statsFieldsStr = statsFields;
+      this.caseSensitive = caseSensitive;
+    }
+
+    static ScanPlanCacheKey create(
+        TableIdentifier tableIdentifier, Table table, PlanTableScanRequest scanRequest) {
+
+      // Use current snapshot if not specified
+      Long snapshotId = scanRequest.snapshotId();
+      if ((snapshotId == null || snapshotId == 0L) && table.currentSnapshot() != null) {

Review Comment:
   The condition `snapshotId == 0L` is problematic. Iceberg does not reserve snapshot ID 0 as a sentinel value; snapshot IDs are generated identifiers, not a sequence starting at zero. Checking for `0L` could mask bugs where an invalid snapshot ID is passed.
   
   The correct approach is to only check for `null`:
   ```java
   if (snapshotId == null && table.currentSnapshot() != null) {
     snapshotId = table.currentSnapshot().snapshotId();
   }
   ```
   
   This ensures we only use the current snapshot when no snapshot ID is 
explicitly provided, rather than also treating 0 as "use current".
   ```suggestion
         if (snapshotId == null && table.currentSnapshot() != null) {
   ```
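   The effect of dropping the `0L` check can be pinned down in isolation. The helper below is hypothetical (the PR inlines this logic in `ScanPlanCacheKey.create` rather than exposing such a method); it shows that an explicit but invalid ID is passed through instead of being silently rewritten to the current snapshot:

   ```java
   // Hypothetical helper isolating the fallback rule: use the table's current
   // snapshot only when no snapshot ID was requested at all.
   public class SnapshotIdResolution {
     static Long resolveSnapshotId(Long requested, Long currentSnapshotId) {
       if (requested == null && currentSnapshotId != null) {
         return currentSnapshotId;
       }
       // An explicit ID, even an invalid 0, is passed through so the bad input
       // surfaces downstream instead of silently mapping to the current snapshot.
       return requested;
     }

     public static void main(String[] args) {
       System.out.println(resolveSnapshotId(null, 42L));  // no ID requested: fall back to current
       System.out.println(resolveSnapshotId(7L, 42L));    // explicit ID wins over current
       System.out.println(resolveSnapshotId(0L, 42L));    // bogus 0 is NOT rewritten to 42
       System.out.println(resolveSnapshotId(null, null)); // empty table: nothing to fall back to
     }
   }
   ```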



##########
iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java:
##########
@@ -283,6 +283,22 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
           .intConf()
           .createWithDefault(60);
 
+  public static final ConfigEntry<Integer> SCAN_PLAN_CACHE_CAPACITY =
+      new ConfigBuilder(IcebergConstants.SCAN_PLAN_CACHE_CAPACITY)
+          .doc(
+              "Maximum number of scan plan results to cache. Larger values allow more queries to benefit from caching but use more memory.")

Review Comment:
   The documentation could be more explicit about memory implications. Scan 
plan responses can be large (they contain file-level scan tasks), and 200 
cached entries might consume significant memory.
   
   Consider enhancing the documentation to provide memory guidance:
   ```java
   .doc(
       "Maximum number of scan plan results to cache. Larger values allow more queries to benefit "
       + "from caching but use more memory. Each cached entry stores the complete scan plan response, "
       + "which can be large for tables with many files. A typical scan plan might be several KB to MB "
       + "depending on table size.")
   ```
   ```suggestion
                 "Maximum number of scan plan results to cache. Larger values allow more queries to benefit "
                 + "from caching but use more memory. Each cached entry stores the complete scan plan response, "
                 + "which can be large for tables with many files. A typical scan plan might be several KB to MB "
                 + "depending on table size.")
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Scan plan cache. */
+public class ScanPlanCache implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class);
+
+  private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache;
+  private final ScheduledExecutorService cleanupExecutor;
+
+  public ScanPlanCache(int capacity, int expireMinutes) {
+    LOG.info(
+        "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes",
+        capacity,
+        expireMinutes);
+
+    // Create a scheduled executor for periodic cleanup
+    this.cleanupExecutor =
+        new ScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("scan-plan-cache-cleanup-%d")
+                .build());
+
+    this.scanPlanCache =
+        Caffeine.newBuilder()
+            .maximumSize(capacity)
+            .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor))
+            .removalListener(
+                (key, value, cause) -> {
+                  LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause);
+                })
+            .build();
+
+    LOG.info("ScanPlanCache initialized with automatic cleanup");
+  }
+
+  public PlanTableScanResponse get(
+      TableIdentifier tableIdentifier, Table table, PlanTableScanRequest scanRequest) {
+    ScanPlanCacheKey key = ScanPlanCacheKey.create(tableIdentifier, table, scanRequest);
+    PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key);
+
+    if (cachedResponse != null) {
+      LOG.debug("Cache HIT for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+    } else {
+      LOG.debug("Cache MISS for table: {}, snapshot: {}", tableIdentifier, key.snapshotId);
+    }
+
+    return cachedResponse;
+  }
+
+  public void put(
+      TableIdentifier tableIdentifier,
+      Table table,
+      PlanTableScanRequest scanRequest,
+      PlanTableScanResponse scanResponse) {

Review Comment:
   The `put` method lacks proper Javadoc documentation. According to Apache 
Gravitino coding guidelines, public APIs must have complete documentation with 
`@param` annotations.
   
   Add documentation:
   ```java
   /**
    * Stores a scan plan response in the cache.
    * 
    * @param tableIdentifier The identifier of the table being scanned
    * @param table The table instance used to get current snapshot information
    * @param scanRequest The scan request parameters used to generate the response
    * @param scanResponse The scan plan response to cache
    */
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Scan plan cache. */
+public class ScanPlanCache implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class);
+
+  private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache;
+  private final ScheduledExecutorService cleanupExecutor;
+
+  public ScanPlanCache(int capacity, int expireMinutes) {
+    LOG.info(
+        "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes",
+        capacity,
+        expireMinutes);
+
+    // Create a scheduled executor for periodic cleanup
+    this.cleanupExecutor =
+        new ScheduledThreadPoolExecutor(
+            1,
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setNameFormat("scan-plan-cache-cleanup-%d")
+                .build());
+
+    this.scanPlanCache =
+        Caffeine.newBuilder()
+            .maximumSize(capacity)
+            .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+            .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor))
+            .removalListener(
+                (key, value, cause) -> {
+                  LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause);
+                })
+            .build();
+
+    LOG.info("ScanPlanCache initialized with automatic cleanup");
+  }

Review Comment:
   The constructor lacks input validation for the capacity and expireMinutes 
parameters. According to Apache coding guidelines, proper input validation 
should be performed.
   
   Add validation to ensure parameters are positive:
   ```java
   public ScanPlanCache(int capacity, int expireMinutes) {
     if (capacity <= 0) {
       throw new IllegalArgumentException("Cache capacity must be positive, got: " + capacity);
     }
     if (expireMinutes <= 0) {
       throw new IllegalArgumentException("Cache expiration time must be positive, got: " + expireMinutes);
     }
     
     LOG.info(
         "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes",
         capacity,
         expireMinutes);
     // ... rest of implementation
   }
   ```



##########
iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.base.Objects;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Scan plan cache. */

Review Comment:
   The class-level documentation is too brief for a public API class. According 
to Apache Gravitino coding guidelines, public APIs must be well-documented.
   
   The documentation should include:
   - Purpose and use cases of the cache
   - Thread-safety guarantees (the Caffeine cache is thread-safe)
   - Cache invalidation strategy (snapshot-based)
   - Memory implications and configuration guidance
   - Example usage
   
   Consider expanding it to:
   ```java
   /**
    * Cache for Iceberg scan plan results to improve query performance.
    * 
    * <p>This cache stores {@link PlanTableScanResponse} objects keyed by table identifier,
    * snapshot ID, and scan request parameters. The cache automatically invalidates entries
    * when they expire or when the cache reaches capacity.
    *
    * <p>Thread-safe: This cache uses Caffeine's thread-safe implementation and can be safely
    * accessed from multiple threads concurrently.
    *
    * <p>Cache invalidation: The cache uses snapshot IDs as part of the cache key, ensuring
    * that scan plans are automatically invalidated when table data changes (new snapshots).
    * Additionally, entries expire after a configured period of inactivity.
    *
    * <p>Memory considerations: Each cached entry stores the complete scan plan response.
    * Configure capacity appropriately based on your workload and available memory.
    */
   ```
   ```suggestion
   /**
    * Cache for Iceberg scan plan results to improve query performance.
    *
    * <p>This cache stores {@link PlanTableScanResponse} objects keyed by table identifier,
    * snapshot ID, and scan request parameters. The cache automatically invalidates entries
    * when they expire or when the cache reaches capacity.
    *
    * <p><b>Thread-safety:</b> This cache uses Caffeine's thread-safe implementation and can be safely
    * accessed from multiple threads concurrently.
    *
    * <p><b>Cache invalidation:</b> The cache uses snapshot IDs as part of the cache key, ensuring
    * that scan plans are automatically invalidated when table data changes (new snapshots).
    * Additionally, entries expire after a configured period of inactivity (expireAfterAccess),
    * and when the cache exceeds its maximum size (capacity).
    *
    * <p><b>Memory considerations:</b> Each cached entry stores the complete scan plan response.
    * Configure the cache capacity and expiration appropriately based on your workload and available memory.
    *
    * <p><b>Example usage:</b>
    * <pre>{@code
    *   ScanPlanCache cache = new ScanPlanCache(1000, 10); // capacity: 1000, expire after 10 minutes
    *   PlanTableScanResponse response = cache.get(tableIdentifier, table, scanRequest);
    *   if (response == null) {
    *     // Compute scan plan and cache it
    *     response = ...;
    *     cache.put(tableIdentifier, table, scanRequest, response);
    *   }
    * }</pre>
    */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
