Copilot commented on code in PR #8980: URL: https://github.com/apache/gravitino/pull/8980#discussion_r2568374086
########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Scan plan cache. 
*/ +public class ScanPlanCache implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; + private final ScheduledExecutorService cleanupExecutor; + + public ScanPlanCache(int capacity, int expireMinutes) { + LOG.info( + "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes", + capacity, + expireMinutes); + + // Create a scheduled executor for periodic cleanup + this.cleanupExecutor = + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("scan-plan-cache-cleanup-%d") + .build()); + + this.scanPlanCache = + Caffeine.newBuilder() + .maximumSize(capacity) + .expireAfterAccess(expireMinutes, TimeUnit.MINUTES) + .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor)) + .removalListener( + (key, value, cause) -> { + LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause); + }) + .build(); + + LOG.info("ScanPlanCache initialized with automatic cleanup"); + } + + public PlanTableScanResponse get(Table table, PlanTableScanRequest scanRequest) { + ScanPlanCacheKey key = ScanPlanCacheKey.create(table, scanRequest); + PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key); + + if (cachedResponse != null) { + LOG.info("Cache HIT for table: {}, snapshot: {}", table.name(), key.snapshotId); + } else { + LOG.debug("Cache MISS for table: {}, snapshot: {}", table.name(), key.snapshotId); + } + + return cachedResponse; + } + + public void put( + Table table, PlanTableScanRequest scanRequest, PlanTableScanResponse scanResponse) { + ScanPlanCacheKey key = ScanPlanCacheKey.create(table, scanRequest); + scanPlanCache.put(key, scanResponse); + LOG.debug("Cached scan plan for table: {}, snapshot: {}", table.name(), key.snapshotId); + } + + @Override + public void close() throws IOException { + LOG.info("Closing ScanPlanCache"); + + if (scanPlanCache 
!= null) { +      scanPlanCache.invalidateAll(); +      scanPlanCache.cleanUp(); +    } + +    if (cleanupExecutor != null && !cleanupExecutor.isShutdown()) { +      cleanupExecutor.shutdownNow(); +    } +    LOG.info("ScanPlanCache closed successfully"); +  } + +  private static class ScanPlanCacheKey { +    private final TableIdentifier tableIdentifier; +    private final Long snapshotId; +    private final Long startSnapshotId; +    private final Long endSnapshotId; +    private final String filterStr; +    private final String selectStr; +    private final String statsFieldsStr; +    private final boolean caseSensitive; + +    private ScanPlanCacheKey( +        TableIdentifier tableIdentifier, +        Long snapshotId, +        Long startSnapshotId, +        Long endSnapshotId, +        String filter, +        String select, +        String statsFields, +        boolean caseSensitive) { +      this.tableIdentifier = tableIdentifier; +      this.snapshotId = snapshotId; +      this.startSnapshotId = startSnapshotId; +      this.endSnapshotId = endSnapshotId; +      this.filterStr = filter; +      this.selectStr = select; +      this.statsFieldsStr = statsFields; +      this.caseSensitive = caseSensitive; +    } + +    static ScanPlanCacheKey create(Table table, PlanTableScanRequest scanRequest) { +      TableIdentifier identifier = TableIdentifier.of(table.name()); Review Comment: **Potential Bug**: Using `TableIdentifier.of(table.name())` loses the namespace information. `TableIdentifier.of(String)` treats the entire string as a single-level name, so the namespace portion of `table.name()` (typically a dotted full name such as `catalog.db.table`) is never parsed into the identifier's namespace; and if a `Table` implementation's `name()` omits the namespace, tables with the same name in different namespaces will collide in the cache. **Fix**: Use the full table identifier that includes namespace: ```java // Get the full identifier from the table's location or use the provided identifier TableIdentifier identifier = TableIdentifier.parse(table.name()); // OR if available from the calling context, pass it as a parameter ``` Alternatively, if the Table object doesn't provide the full path, consider passing the `TableIdentifier` as a parameter to the `get()` and `put()` methods instead of deriving it from the Table. 
########## iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java: ########## @@ -283,6 +283,20 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { .intConf() .createWithDefault(60); + public static final ConfigEntry<Integer> SCAN_PLAN_CACHE_CAPACITY = + new ConfigBuilder(IcebergConstants.SCAN_PLAN_CACHE_CAPACITY) + .doc("Scan plan cache capacity") + .version(ConfigConstants.VERSION_1_1_0) + .intConf() + .createWithDefault(200); + + public static final ConfigEntry<Integer> SCAN_PLAN_CACHE_EXPIRE_MINUTES = + new ConfigBuilder(IcebergConstants.SCAN_PLAN_CACHE_EXPIRE_MINUTES) + .doc("Scan plan cache expire minutes") Review Comment: Documentation is too brief. The doc string should clarify this is time-based expiration after last access. Suggested improvement: ```java .doc("Time in minutes after which cached scan plans expire if not accessed. Cached entries are automatically removed after this period of inactivity.") ``` ```suggestion .doc("Time in minutes after which cached scan plans expire if not accessed. Cached entries are automatically removed after this period of inactivity.") ``` ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Scan plan cache. 
*/ +public class ScanPlanCache implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; + private final ScheduledExecutorService cleanupExecutor; + + public ScanPlanCache(int capacity, int expireMinutes) { + LOG.info( + "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes", + capacity, + expireMinutes); + + // Create a scheduled executor for periodic cleanup + this.cleanupExecutor = + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("scan-plan-cache-cleanup-%d") + .build()); + + this.scanPlanCache = + Caffeine.newBuilder() + .maximumSize(capacity) + .expireAfterAccess(expireMinutes, TimeUnit.MINUTES) + .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor)) + .removalListener( + (key, value, cause) -> { + LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause); + }) + .build(); + + LOG.info("ScanPlanCache initialized with automatic cleanup"); + } + + public PlanTableScanResponse get(Table table, PlanTableScanRequest scanRequest) { + ScanPlanCacheKey key = ScanPlanCacheKey.create(table, scanRequest); + PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key); + + if (cachedResponse != null) { + LOG.info("Cache HIT for table: {}, snapshot: {}", table.name(), key.snapshotId); + } else { + LOG.debug("Cache MISS for table: {}, snapshot: {}", table.name(), key.snapshotId); + } + + return cachedResponse; + } + + public void put( + Table table, PlanTableScanRequest scanRequest, PlanTableScanResponse scanResponse) { + ScanPlanCacheKey key = ScanPlanCacheKey.create(table, scanRequest); + scanPlanCache.put(key, scanResponse); + LOG.debug("Cached scan plan for table: {}, snapshot: {}", table.name(), key.snapshotId); + } + + @Override + public void close() throws IOException { + LOG.info("Closing ScanPlanCache"); + + if (scanPlanCache 
!= null) { + scanPlanCache.invalidateAll(); + scanPlanCache.cleanUp(); + } + + if (cleanupExecutor != null && !cleanupExecutor.isShutdown()) { + cleanupExecutor.shutdownNow(); Review Comment: **Improper Shutdown**: Using `shutdownNow()` is too aggressive and doesn't give tasks a chance to complete gracefully. Consider calling `shutdown()` first with a timeout, then `shutdownNow()` only if needed. Recommended approach: ```java if (cleanupExecutor != null && !cleanupExecutor.isShutdown()) { cleanupExecutor.shutdown(); try { if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) { cleanupExecutor.shutdownNow(); } } catch (InterruptedException e) { cleanupExecutor.shutdownNow(); Thread.currentThread().interrupt(); } } ``` ```suggestion cleanupExecutor.shutdown(); try { if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) { cleanupExecutor.shutdownNow(); } } catch (InterruptedException e) { cleanupExecutor.shutdownNow(); Thread.currentThread().interrupt(); } ``` ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestPlanTableScan.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.iceberg.service.IcebergObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") Review Comment: The `@SuppressWarnings("deprecation")` annotation is applied to the entire class but there are no deprecated API usages visible in this file. This annotation should either be removed or moved to specific methods/lines where deprecated APIs are actually used. Class-level suppression of warnings is generally discouraged unless necessary. 
```suggestion ``` ########## iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java: ########## @@ -283,6 +283,20 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig { .intConf() .createWithDefault(60); + public static final ConfigEntry<Integer> SCAN_PLAN_CACHE_CAPACITY = + new ConfigBuilder(IcebergConstants.SCAN_PLAN_CACHE_CAPACITY) + .doc("Scan plan cache capacity") Review Comment: Documentation is too brief. The doc strings should be more descriptive to help users understand what these configurations do. Suggested improvement: ```java .doc("Maximum number of scan plan results to cache. Larger values allow more queries to benefit from caching but use more memory.") ``` ```suggestion .doc("Maximum number of scan plan results to cache. Larger values allow more queries to benefit from caching but use more memory.") ``` ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Scan plan cache. */ +public class ScanPlanCache implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; + private final ScheduledExecutorService cleanupExecutor; + + public ScanPlanCache(int capacity, int expireMinutes) { + LOG.info( + "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes", + capacity, + expireMinutes); + + // Create a scheduled executor for periodic cleanup + this.cleanupExecutor = + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("scan-plan-cache-cleanup-%d") + .build()); + + this.scanPlanCache = + Caffeine.newBuilder() + .maximumSize(capacity) + .expireAfterAccess(expireMinutes, TimeUnit.MINUTES) + .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor)) + .removalListener( + (key, value, cause) -> { + LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause); + }) + .build(); + + LOG.info("ScanPlanCache initialized with automatic cleanup"); + } + 
+ public PlanTableScanResponse get(Table table, PlanTableScanRequest scanRequest) { + ScanPlanCacheKey key = ScanPlanCacheKey.create(table, scanRequest); + PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key); + + if (cachedResponse != null) { + LOG.info("Cache HIT for table: {}, snapshot: {}", table.name(), key.snapshotId); + } else { + LOG.debug("Cache MISS for table: {}, snapshot: {}", table.name(), key.snapshotId); + } + + return cachedResponse; + } + + public void put( + Table table, PlanTableScanRequest scanRequest, PlanTableScanResponse scanResponse) { + ScanPlanCacheKey key = ScanPlanCacheKey.create(table, scanRequest); + scanPlanCache.put(key, scanResponse); + LOG.debug("Cached scan plan for table: {}, snapshot: {}", table.name(), key.snapshotId); + } + + @Override + public void close() throws IOException { + LOG.info("Closing ScanPlanCache"); + + if (scanPlanCache != null) { + scanPlanCache.invalidateAll(); + scanPlanCache.cleanUp(); + } + + if (cleanupExecutor != null && !cleanupExecutor.isShutdown()) { + cleanupExecutor.shutdownNow(); + } + LOG.info("ScanPlanCache closed successfully"); + } + + private static class ScanPlanCacheKey { + private final TableIdentifier tableIdentifier; + private final Long snapshotId; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final String filterStr; + private final String selectStr; + private final String statsFieldsStr; + private final boolean caseSensitive; + + private ScanPlanCacheKey( + TableIdentifier tableIdentifier, + Long snapshotId, + Long startSnapshotId, + Long endSnapshotId, + String filter, + String select, + String statsFields, + boolean caseSensitive) { + this.tableIdentifier = tableIdentifier; + this.snapshotId = snapshotId; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.filterStr = filter; + this.selectStr = select; + this.statsFieldsStr = statsFields; + this.caseSensitive = caseSensitive; + } + + static 
ScanPlanCacheKey create(Table table, PlanTableScanRequest scanRequest) { + TableIdentifier identifier = TableIdentifier.of(table.name()); + + // Use current snapshot if not specified + Long snapshotId = scanRequest.snapshotId(); + if (snapshotId == null && table.currentSnapshot() != null) { Review Comment: **Cache Key Issue with snapshotId=0**: The test uses `withSnapshotId(0L)` which is treated as a valid snapshot ID. However, when `snapshotId` is `0`, the code should probably treat it as "use current snapshot" (similar to null). This can lead to incorrect cache behavior: - Request with `snapshotId=0L` and current snapshot is 123 → cache key uses 0 - Table updates to snapshot 124, request with `snapshotId=0L` → cache key still uses 0 (stale data) Consider: ```java Long snapshotId = scanRequest.snapshotId(); if ((snapshotId == null || snapshotId == 0L) && table.currentSnapshot() != null) { snapshotId = table.currentSnapshot().snapshotId(); } ``` ```suggestion if ((snapshotId == null || snapshotId == 0L) && table.currentSnapshot() != null) { ``` ########## docs/iceberg-rest-service.md: ########## @@ -447,6 +448,17 @@ Gravitino features a pluggable cache system for updating or retrieving table met Gravitino provides the build-in `org.apache.gravitino.iceberg.common.cache.LocalTableMetadataCache` to store the cached data in the memory. You could also implement your custom table metadata cache by implementing the `org.apache.gravitino.iceberg.common.cache.TableMetadataCache` interface. +### Iceberg scan plan cache configuration + +Gravitino caches scan plan results to speed up repeated queries with identical parameters. The cache automatically invalidates when the table's snapshot changes. Review Comment: The documentation claims "The cache automatically invalidates when the table's snapshot changes", but this is misleading. The cache does NOT actively invalidate when snapshots change. 
It uses snapshot ID as part of the cache key, which means different snapshots will have different cache keys (cache miss). This is different from active invalidation. Suggested correction: ```markdown Gravitino caches scan plan results to speed up repeated queries with identical parameters. The cache uses snapshot ID as part of the cache key, so queries against different snapshots will not use stale cached data. ``` ```suggestion Gravitino caches scan plan results to speed up repeated queries with identical parameters. The cache uses snapshot ID as part of the cache key, so queries against different snapshots will not use stale cached data. ``` ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Scan plan cache. */ +public class ScanPlanCache implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; Review Comment: **Potential Memory Issue**: `PlanTableScanResponse` objects can be very large (containing many serialized scan tasks with file metadata). With a default capacity of 200, this could consume significant heap memory. Consider: 1. Adding a weight-based eviction policy instead of just count-based: ```java .weigher((key, value) -> value.planTasks().size()) .maximumWeight(capacityInTasks) ``` 2. Or document the memory implications in the configuration and suggest adjusting capacity based on typical scan result sizes. ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/ScanPlanCache.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Scan plan cache. 
*/ +public class ScanPlanCache implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; + private final ScheduledExecutorService cleanupExecutor; + + public ScanPlanCache(int capacity, int expireMinutes) { + LOG.info( + "Initializing ScanPlanCache with capacity: {}, expireAfterAccess: {} minutes", + capacity, + expireMinutes); + + // Create a scheduled executor for periodic cleanup + this.cleanupExecutor = + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("scan-plan-cache-cleanup-%d") + .build()); + + this.scanPlanCache = + Caffeine.newBuilder() + .maximumSize(capacity) + .expireAfterAccess(expireMinutes, TimeUnit.MINUTES) + .scheduler(Scheduler.forScheduledExecutorService(cleanupExecutor)) + .removalListener( + (key, value, cause) -> { + LOG.debug("Evicted scan plan from cache: {}, cause: {}", key, cause); + }) + .build(); + + LOG.info("ScanPlanCache initialized with automatic cleanup"); + } + + public PlanTableScanResponse get(Table table, PlanTableScanRequest scanRequest) { + ScanPlanCacheKey key = ScanPlanCacheKey.create(table, scanRequest); + PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key); + + if (cachedResponse != null) { + LOG.info("Cache HIT for table: {}, snapshot: {}", table.name(), key.snapshotId); Review Comment: **Logging Level Issue**: Cache HIT is logged at INFO level, which can be too verbose in production. Cache hits are normal operations and should be logged at DEBUG level. Only exceptional cases (cache misses on first access, evictions, etc.) should be INFO. 
Change to: ```java LOG.debug("Cache HIT for table: {}, snapshot: {}", table.name(), key.snapshotId); ``` ```suggestion LOG.debug("Cache HIT for table: {}, snapshot: {}", table.name(), key.snapshotId); ``` ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestPlanTableScan.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.iceberg.service.IcebergObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestPlanTableScan extends IcebergNamespaceTestBase { + + private static final Schema tableSchema = + new Schema( + NestedField.of(1, false, "id", org.apache.iceberg.types.Types.IntegerType.get()), + NestedField.of(2, false, "name", StringType.get()), + NestedField.of(3, true, "age", org.apache.iceberg.types.Types.IntegerType.get())); + + private DummyEventListener dummyEventListener; + private ObjectMapper objectMapper; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + this.objectMapper = IcebergObjectMapper.getInstance(); + + ResourceConfig resourceConfig = 
+ IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTableOperations.class, true, Arrays.asList(dummyEventListener)); + // Register namespace operations for setup + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + // Register a mock HttpServletRequest with user info + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + } + }); + + return resourceConfig; + } + + /** Test basic scan planning without any filters or projections. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testBasicScanPlanning(Namespace namespace) throws Exception { + // Setup: Create namespace and table + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_test_table"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan table scan with empty request + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Check response status and structure + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + // Parse response as JSON Map instead of deserializing to PlanTableScanResponse + // to avoid Jackson injectable values issue + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + + // Verify response contains required fields + Assertions.assertNotNull(scanResponse); + Assertions.assertTrue(scanResponse.containsKey("plan-status")); + Assertions.assertEquals("completed", 
scanResponse.get("plan-status")); + + System.out.println("Scan planning completed successfully"); + } + + /** Test scan planning with filter expression. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithFilter(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without filter for now) + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } Review Comment: **Test Does Not Match Name**: Test is named `testScanPlanningWithFilter` but the comment explicitly states "Plan scan (without filter for now)" and doesn't actually test any filter functionality. This test is identical to `testBasicScanPlanning`. Either implement actual filter testing or rename/remove this test. ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestPlanTableScan.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.iceberg.service.IcebergObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestPlanTableScan extends IcebergNamespaceTestBase { + + private static final Schema tableSchema = + new Schema( + NestedField.of(1, false, "id", org.apache.iceberg.types.Types.IntegerType.get()), + 
NestedField.of(2, false, "name", StringType.get()), + NestedField.of(3, true, "age", org.apache.iceberg.types.Types.IntegerType.get())); + + private DummyEventListener dummyEventListener; + private ObjectMapper objectMapper; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + this.objectMapper = IcebergObjectMapper.getInstance(); + + ResourceConfig resourceConfig = + IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTableOperations.class, true, Arrays.asList(dummyEventListener)); + // Register namespace operations for setup + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + // Register a mock HttpServletRequest with user info + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + } + }); + + return resourceConfig; + } + + /** Test basic scan planning without any filters or projections. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testBasicScanPlanning(Namespace namespace) throws Exception { + // Setup: Create namespace and table + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_test_table"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan table scan with empty request + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Check response status and structure + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + // Parse response as JSON Map instead of deserializing to PlanTableScanResponse + // to avoid Jackson injectable values issue + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + + // Verify response contains required fields + Assertions.assertNotNull(scanResponse); + Assertions.assertTrue(scanResponse.containsKey("plan-status")); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + + System.out.println("Scan planning completed successfully"); + } + + /** Test scan planning with filter expression. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithFilter(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without filter for now) + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with column projection. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithProjection(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_projection_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with column projection + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder() + .withSnapshotId(0L) + .withSelect(Arrays.asList("id", "name")) + .build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with case-sensitive option. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithCaseSensitive(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_case_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with case-sensitive option + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).withCaseSensitive(true).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning on non-existent table. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningNonExistentTable(Namespace namespace) throws Exception { + // Setup: Create namespace only (no table) + verifyCreateNamespaceSucc(namespace); + + // Execute: Try to plan scan on non-existent table + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, "non_existent_table", scanRequest); + + // Verify: Should return 404 + Assertions.assertEquals(404, response.getStatus()); + } + + /** Test scan planning with complex filter expressions. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithComplexFilters(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_complex_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without complex filter) + // Note: Iceberg 1.10.0 filter needs Expression object + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with snapshot-id parameter. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithSnapshotId(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_snapshot_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with default snapshot + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Should succeed even without specific snapshot + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithOptions(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_options_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without options) + // Note: Iceberg 1.10.0 PlanTableScanRequest doesn't have options() + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", 
scanResponse.get("plan-status")); + } + Review Comment: **Test Does Not Match Name**: Test is named `testScanPlanningWithOptions` but doesn't actually test any options (comment says "Plan scan (without options)"). This test is functionally identical to the basic test. Consider either implementing actual options testing or removing this redundant test. ```suggestion ``` ########## docs/iceberg-rest-service.md: ########## @@ -447,6 +448,17 @@ Gravitino features a pluggable cache system for updating or retrieving table met Gravitino provides the build-in `org.apache.gravitino.iceberg.common.cache.LocalTableMetadataCache` to store the cached data in the memory. You could also implement your custom table metadata cache by implementing the `org.apache.gravitino.iceberg.common.cache.TableMetadataCache` interface. +### Iceberg scan plan cache configuration + +Gravitino caches scan plan results to speed up repeated queries with identical parameters. The cache automatically invalidates when the table's snapshot changes. + +| Configuration item | Description | Default value | Required | Since Version | +|------------------------------------------------------------|-----------------------------------------|---------------|----------|---------------| +| `gravitino.iceberg-rest.scan-plan-cache-capacity` | The capacity of scan plan cache. | 200 | No | 1.1.0 | +| `gravitino.iceberg-rest.scan-plan-cache-expire-minutes` | The expire minutes of scan plan cache. | 60 | No | 1.1.0 | + +The scan plan cache uses snapshot ID as part of the cache key, ensuring automatic invalidation when table data changes. This provides 100-2000x speedup for repeated queries like dashboard refreshes or BI tool queries. Review Comment: [nitpick] The "100-2000x speedup" claim is unsubstantiated and overly specific. Without benchmarks or references, this appears speculative. Consider either: 1. Removing the specific speedup claim 2. Adding "can provide significant speedup" instead 3. 
Referencing actual benchmark data if available Example: ```markdown The scan plan cache uses snapshot ID as part of the cache key, ensuring queries against different snapshots receive correct results. This can provide significant speedup for repeated queries like dashboard refreshes or BI tool queries. ``` ```suggestion The scan plan cache uses snapshot ID as part of the cache key, ensuring automatic invalidation when table data changes. This can provide significant speedup for repeated queries like dashboard refreshes or BI tool queries. ``` ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestPlanTableScan.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.iceberg.service.IcebergObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestPlanTableScan extends IcebergNamespaceTestBase { + + private static final Schema tableSchema = + new Schema( + NestedField.of(1, false, "id", org.apache.iceberg.types.Types.IntegerType.get()), + NestedField.of(2, false, "name", StringType.get()), + NestedField.of(3, true, "age", org.apache.iceberg.types.Types.IntegerType.get())); + + private DummyEventListener dummyEventListener; + private ObjectMapper objectMapper; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + this.objectMapper = IcebergObjectMapper.getInstance(); + + ResourceConfig resourceConfig = 
+ IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTableOperations.class, true, Arrays.asList(dummyEventListener)); + // Register namespace operations for setup + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + // Register a mock HttpServletRequest with user info + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + } + }); + + return resourceConfig; + } + + /** Test basic scan planning without any filters or projections. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testBasicScanPlanning(Namespace namespace) throws Exception { + // Setup: Create namespace and table + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_test_table"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan table scan with empty request + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Check response status and structure + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + // Parse response as JSON Map instead of deserializing to PlanTableScanResponse + // to avoid Jackson injectable values issue + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + + // Verify response contains required fields + Assertions.assertNotNull(scanResponse); + Assertions.assertTrue(scanResponse.containsKey("plan-status")); + Assertions.assertEquals("completed", 
scanResponse.get("plan-status")); + + System.out.println("Scan planning completed successfully"); + } + + /** Test scan planning with filter expression. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithFilter(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without filter for now) + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with column projection. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithProjection(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_projection_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with column projection + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder() + .withSnapshotId(0L) + .withSelect(Arrays.asList("id", "name")) + .build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with case-sensitive option. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithCaseSensitive(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_case_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with case-sensitive option + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).withCaseSensitive(true).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning on non-existent table. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningNonExistentTable(Namespace namespace) throws Exception { + // Setup: Create namespace only (no table) + verifyCreateNamespaceSucc(namespace); + + // Execute: Try to plan scan on non-existent table + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, "non_existent_table", scanRequest); + + // Verify: Should return 404 + Assertions.assertEquals(404, response.getStatus()); + } + + /** Test scan planning with complex filter expressions. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithComplexFilters(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_complex_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without complex filter) + // Note: Iceberg 1.10.0 filter needs Expression object + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with snapshot-id parameter. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithSnapshotId(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_snapshot_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with default snapshot + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Should succeed even without specific snapshot + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } Review Comment: **Test Does Not Match Name**: Test is named `testScanPlanningWithSnapshotId` but uses `withSnapshotId(0L)` which is essentially the default/current snapshot (not testing a specific snapshot ID). This test is functionally identical to the basic test. To properly test snapshot ID functionality, you should test with an actual specific snapshot ID or test behavior differences between snapshot-specific vs current snapshot queries. ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestPlanTableScan.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.iceberg.service.IcebergObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestPlanTableScan extends IcebergNamespaceTestBase { + + private static final Schema tableSchema = + new Schema( + NestedField.of(1, false, "id", org.apache.iceberg.types.Types.IntegerType.get()), + 
NestedField.of(2, false, "name", StringType.get()), + NestedField.of(3, true, "age", org.apache.iceberg.types.Types.IntegerType.get())); + + private DummyEventListener dummyEventListener; + private ObjectMapper objectMapper; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + this.objectMapper = IcebergObjectMapper.getInstance(); + + ResourceConfig resourceConfig = + IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTableOperations.class, true, Arrays.asList(dummyEventListener)); + // Register namespace operations for setup + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + // Register a mock HttpServletRequest with user info + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + } + }); + + return resourceConfig; + } + + /** Test basic scan planning without any filters or projections. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testBasicScanPlanning(Namespace namespace) throws Exception { + // Setup: Create namespace and table + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_test_table"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan table scan with empty request + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Check response status and structure + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + // Parse response as JSON Map instead of deserializing to PlanTableScanResponse + // to avoid Jackson injectable values issue + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + + // Verify response contains required fields + Assertions.assertNotNull(scanResponse); + Assertions.assertTrue(scanResponse.containsKey("plan-status")); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + + System.out.println("Scan planning completed successfully"); Review Comment: Remove `System.out.println()` statement. Use proper logging (SLF4J Logger) instead of printing to stdout in tests. 
########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java: ########## @@ -259,6 +269,14 @@ public PlanTableScanResponse planTableScan( CloseableIterable<FileScanTask> fileScanTasks = createFilePlanScanTasks(table, tableIdentifier, scanRequest); + if (scanPlanCache != null) { + PlanTableScanResponse cachedResponse = scanPlanCache.get(table, scanRequest); + if (cachedResponse != null) { + LOG.info("Using cached scan plan for table: {}", tableIdentifier); + return cachedResponse; Review Comment: **Misleading Log Message**: The log message "Using cached scan plan" will never be executed as written because the cache is checked AFTER the expensive scan operation. Even after fixing the cache order issue (see previous comment), this log message placement is after the check, so it won't be reached in the current code flow (return statement at line 276). This log statement appears to be orphaned/unreachable. ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestPlanTableScan.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.iceberg.service.IcebergObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestPlanTableScan extends IcebergNamespaceTestBase { + + private static final Schema tableSchema = + new Schema( + NestedField.of(1, false, "id", org.apache.iceberg.types.Types.IntegerType.get()), + NestedField.of(2, false, "name", StringType.get()), + NestedField.of(3, true, "age", org.apache.iceberg.types.Types.IntegerType.get())); + + private DummyEventListener dummyEventListener; + private ObjectMapper objectMapper; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + this.objectMapper = IcebergObjectMapper.getInstance(); + + ResourceConfig resourceConfig = 
+ IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTableOperations.class, true, Arrays.asList(dummyEventListener)); + // Register namespace operations for setup + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + // Register a mock HttpServletRequest with user info + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + } + }); + + return resourceConfig; + } + + /** Test basic scan planning without any filters or projections. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testBasicScanPlanning(Namespace namespace) throws Exception { + // Setup: Create namespace and table + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_test_table"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan table scan with empty request + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Check response status and structure + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + // Parse response as JSON Map instead of deserializing to PlanTableScanResponse + // to avoid Jackson injectable values issue + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + + // Verify response contains required fields + Assertions.assertNotNull(scanResponse); + Assertions.assertTrue(scanResponse.containsKey("plan-status")); + Assertions.assertEquals("completed", 
scanResponse.get("plan-status")); + + System.out.println("Scan planning completed successfully"); + } + + /** Test scan planning with filter expression. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithFilter(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without filter for now) + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with column projection. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithProjection(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_projection_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with column projection + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder() + .withSnapshotId(0L) + .withSelect(Arrays.asList("id", "name")) + .build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with case-sensitive option. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithCaseSensitive(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_case_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with case-sensitive option + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).withCaseSensitive(true).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning on non-existent table. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningNonExistentTable(Namespace namespace) throws Exception { + // Setup: Create namespace only (no table) + verifyCreateNamespaceSucc(namespace); + + // Execute: Try to plan scan on non-existent table + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, "non_existent_table", scanRequest); + + // Verify: Should return 404 + Assertions.assertEquals(404, response.getStatus()); + } + + /** Test scan planning with complex filter expressions. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithComplexFilters(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_complex_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without complex filter) + // Note: Iceberg 1.10.0 filter needs Expression object + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } Review Comment: **Test Does Not Match Name**: Test is named `testScanPlanningWithComplexFilters` but the comment explicitly states "Plan scan (without complex filter)" and doesn't actually test any filter functionality. This test is identical to `testBasicScanPlanning`. Either implement actual complex filter testing or rename/remove this test to avoid confusion. ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestPlanTableScan.java: ########## @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.rest; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.Invocation; +import javax.ws.rs.core.Application; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; +import org.apache.gravitino.iceberg.service.IcebergObjectMapper; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.rest.RESTUtil; +import org.apache.iceberg.rest.requests.CreateTableRequest; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.LoadTableResponse; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.glassfish.jersey.internal.inject.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; + +@SuppressWarnings("deprecation") +public class TestPlanTableScan extends IcebergNamespaceTestBase { + + private static final Schema tableSchema = + new Schema( + NestedField.of(1, false, "id", org.apache.iceberg.types.Types.IntegerType.get()), + 
NestedField.of(2, false, "name", StringType.get()), + NestedField.of(3, true, "age", org.apache.iceberg.types.Types.IntegerType.get())); + + private DummyEventListener dummyEventListener; + private ObjectMapper objectMapper; + + @Override + protected Application configure() { + this.dummyEventListener = new DummyEventListener(); + this.objectMapper = IcebergObjectMapper.getInstance(); + + ResourceConfig resourceConfig = + IcebergRestTestUtil.getIcebergResourceConfig( + MockIcebergTableOperations.class, true, Arrays.asList(dummyEventListener)); + // Register namespace operations for setup + resourceConfig.register(MockIcebergNamespaceOperations.class); + resourceConfig.register(MockIcebergTableRenameOperations.class); + + // Register a mock HttpServletRequest with user info + resourceConfig.register( + new AbstractBinder() { + @Override + protected void configure() { + HttpServletRequest mockRequest = Mockito.mock(HttpServletRequest.class); + Mockito.when(mockRequest.getUserPrincipal()).thenReturn(() -> "test-user"); + bind(mockRequest).to(HttpServletRequest.class); + } + }); + + return resourceConfig; + } + + /** Test basic scan planning without any filters or projections. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testBasicScanPlanning(Namespace namespace) throws Exception { + // Setup: Create namespace and table + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_test_table"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan table scan with empty request + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Check response status and structure + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + // Parse response as JSON Map instead of deserializing to PlanTableScanResponse + // to avoid Jackson injectable values issue + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + + // Verify response contains required fields + Assertions.assertNotNull(scanResponse); + Assertions.assertTrue(scanResponse.containsKey("plan-status")); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + + System.out.println("Scan planning completed successfully"); + } + + /** Test scan planning with filter expression. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithFilter(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without filter for now) + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with column projection. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithProjection(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_projection_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with column projection + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder() + .withSnapshotId(0L) + .withSelect(Arrays.asList("id", "name")) + .build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with case-sensitive option. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithCaseSensitive(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_case_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with case-sensitive option + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).withCaseSensitive(true).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning on non-existent table. */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningNonExistentTable(Namespace namespace) throws Exception { + // Setup: Create namespace only (no table) + verifyCreateNamespaceSucc(namespace); + + // Execute: Try to plan scan on non-existent table + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + Response response = doPlanTableScan(namespace, "non_existent_table", scanRequest); + + // Verify: Should return 404 + Assertions.assertEquals(404, response.getStatus()); + } + + /** Test scan planning with complex filter expressions. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithComplexFilters(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_complex_filter_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without complex filter) + // Note: Iceberg 1.10.0 filter needs Expression object + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + /** Test scan planning with snapshot-id parameter. 
*/ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithSnapshotId(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_snapshot_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan with default snapshot + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify: Should succeed even without specific snapshot + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", scanResponse.get("plan-status")); + } + + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningWithOptions(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_options_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Plan scan (without options) + // Note: Iceberg 1.10.0 PlanTableScanRequest doesn't have options() + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(0L).build(); + + Response response = doPlanTableScan(namespace, tableName, scanRequest); + + // Verify + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + String responseBody = response.readEntity(String.class); + Map<String, Object> scanResponse = + objectMapper.readValue(responseBody, new TypeReference<Map<String, Object>>() {}); + Assertions.assertNotNull(scanResponse); + Assertions.assertEquals("completed", 
scanResponse.get("plan-status")); + } + + /** + * Test error handling with invalid request body. + * + * <p>Verifies that: - Invalid JSON requests are handled gracefully - Appropriate error codes are + * returned + */ + @ParameterizedTest + @MethodSource("org.apache.gravitino.iceberg.service.rest.IcebergRestTestUtil#testNamespaces") + void testScanPlanningInvalidRequestBody(Namespace namespace) throws Exception { + // Setup + verifyCreateNamespaceSucc(namespace); + String tableName = "scan_error_test"; + verifyCreateTableSucc(namespace, tableName); + + // Execute: Try with invalid JSON (sending raw string instead of proper request) + String invalidJson = "{\"invalid\": json}"; + Response response = + getScanClientBuilder(namespace, tableName) + .post(Entity.entity(invalidJson, MediaType.APPLICATION_JSON_TYPE)); + + // Verify: Should return 400 for invalid JSON + Assertions.assertEquals(400, response.getStatus()); + } + + /** Get scan client builder for table scan endpoint. */ + private Invocation.Builder getScanClientBuilder(Namespace namespace, String tableName) { + String path = + Joiner.on("/") + .join( + IcebergRestTestUtil.NAMESPACE_PATH + + "/" + + RESTUtil.encodeNamespace(namespace) + + "/tables", + tableName, + "scan"); + return getIcebergClientBuilder(path, Optional.empty()); + } + + /** Execute plan table scan request. */ + private Response doPlanTableScan( + Namespace namespace, String tableName, PlanTableScanRequest scanRequest) throws Exception { + return getScanClientBuilder(namespace, tableName) + .post(Entity.entity(scanRequest, MediaType.APPLICATION_JSON_TYPE)); + } + + /** Create a table with the test schema. 
*/ + private void verifyCreateTableSucc(Namespace namespace, String tableName) { + CreateTableRequest createTableRequest = + CreateTableRequest.builder().withName(tableName).withSchema(tableSchema).build(); + + Response response = + getTableClientBuilder(namespace, Optional.empty()) + .post(Entity.entity(createTableRequest, MediaType.APPLICATION_JSON_TYPE)); + + Assertions.assertEquals(Status.OK.getStatusCode(), response.getStatus()); + LoadTableResponse loadTableResponse = response.readEntity(LoadTableResponse.class); + Assertions.assertEquals( + tableSchema.columns(), loadTableResponse.tableMetadata().schema().columns()); + } +} Review Comment: **Missing Test Coverage**: No tests verify the scan plan cache functionality, which is the main feature of this PR. The tests only verify that the scan planning endpoint works, but don't test: 1. Cache hits - repeated identical requests should return cached results 2. Cache misses - different requests should not use cache 3. Cache expiration behavior 4. Cache key generation with different scan parameters (filters, projections, snapshots) 5. Cache invalidation when table changes Add tests like: ```java @Test void testScanPlanCaching() { // First call - cache miss Response response1 = doPlanTableScan(namespace, tableName, scanRequest); // Second call - should be cache hit Response response2 = doPlanTableScan(namespace, tableName, scanRequest); // Verify both return same result (could check logs or add metrics) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
