This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 9dd4aa062c [#9048] feat(iceberg-rest-catalog): Add cache for scan
planning. (#8980)
9dd4aa062c is described below
commit 9dd4aa062c13bd3f3bc03b914dc1415799839b56
Author: Xiaojian Sun <[email protected]>
AuthorDate: Tue Jan 13 14:45:57 2026 +0800
[#9048] feat(iceberg-rest-catalog): Add cache for scan planning. (#8980)
### What changes were proposed in this pull request?
Support scan planning endpoint for Iceberg REST server
### Why are the changes needed?
Fix: #([9048](https://github.com/apache/gravitino/issues/9048))
### Does this PR introduce _any_ user-facing change?
N/A
### How was this patch tested?
org.apache.gravitino.iceberg.service.rest.TestScanPlanCache
---
.../lakehouse/iceberg/IcebergConstants.java | 4 +
docs/iceberg-rest-service.md | 15 +
.../gravitino/iceberg/common/IcebergConfig.java | 24 ++
.../iceberg/service/CatalogWrapperForREST.java | 52 +++-
.../iceberg/service/cache/LocalScanPlanCache.java | 137 +++++++++
.../iceberg/service/cache/ScanPlanCache.java | 75 +++++
.../iceberg/service/cache/ScanPlanCacheKey.java | 168 ++++++++++++
.../iceberg/service/rest/TestScanPlanCache.java | 305 +++++++++++++++++++++
8 files changed, 774 insertions(+), 6 deletions(-)
diff --git
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index 183853d005..dbc3d858c8 100644
---
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -99,4 +99,8 @@ public class IcebergConstants {
public static final String TABLE_METADATA_CACHE_CAPACITY =
"table-metadata-cache-capacity";
public static final String TABLE_METADATA_CACHE_EXPIRE_MINUTES =
"table-metadata-cache-expire-minutes";
+
+ public static final String SCAN_PLAN_CACHE_IMPL = "scan-plan-cache-impl";
+ public static final String SCAN_PLAN_CACHE_CAPACITY =
"scan-plan-cache-capacity";
+ public static final String SCAN_PLAN_CACHE_EXPIRE_MINUTES =
"scan-plan-cache-expire-minutes";
}
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index 0fbf4677c6..816c48cd5c 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -31,6 +31,7 @@ There are some key difference between Gravitino Iceberg REST
server and Gravitin
- Supports access control (when running as an auxiliary service).
- Provides a pluggable metrics store interface to store and delete Iceberg
metrics.
- Supports table metadata cache.
+- Supports scan plan cache.
## Server management
@@ -470,6 +471,20 @@ Gravitino features a pluggable cache system for updating
or retrieving table met
Gravitino provides the build-in
`org.apache.gravitino.iceberg.common.cache.LocalTableMetadataCache` to store
the cached data in the memory. You could also implement your custom table
metadata cache by implementing the
`org.apache.gravitino.iceberg.common.cache.TableMetadataCache` interface.
+### Iceberg scan plan cache configuration
+
+Gravitino caches scan plan results to speed up repeated queries with identical
parameters. The cache uses snapshot ID as part of the cache key, so queries
against different snapshots will not use stale cached data.
+
+| Configuration item | Description
| Default value | Required | Since
Version |
+|------------------------------------------------------------|----------------------------------------------------------|---------------|----------|---------------|
+| `gravitino.iceberg-rest.scan-plan-cache-impl` | The
implementation of the scan plan cache. | (none) | No
| 1.2.0 |
+| `gravitino.iceberg-rest.scan-plan-cache-capacity` | The capacity of
the scan plan cache. | 200 | No | 1.2.0
|
+| `gravitino.iceberg-rest.scan-plan-cache-expire-minutes` | The expiration
time (in minutes) of the scan plan cache. | 60 | No | 1.2.0
|
+
+The scan plan cache uses snapshot ID as part of the cache key, ensuring
automatic invalidation when table data changes. This can provide significant
speedup for repeated queries like dashboard refreshes or BI tool queries.
+
+Gravitino provides the built-in
`org.apache.gravitino.iceberg.service.cache.LocalScanPlanCache` to store the
cached data in memory. You can also implement your custom scan plan cache by
implementing the `org.apache.gravitino.iceberg.service.cache.ScanPlanCache`
interface.
+
### Misc configurations
| Configuration item | Description
| Default value | Required | Since Version |
diff --git
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
index eb6e6b2a07..f4b83b68e1 100644
---
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
+++
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergConfig.java
@@ -283,6 +283,30 @@ public class IcebergConfig extends Config implements
OverwriteDefaultConfig {
.intConf()
.createWithDefault(60);
+ public static final ConfigEntry<String> SCAN_PLAN_CACHE_IMPL =
+ new ConfigBuilder(IcebergConstants.SCAN_PLAN_CACHE_IMPL)
+ .doc("The implementation of the scan plan cache")
+ .version(ConfigConstants.VERSION_1_2_0)
+ .stringConf()
+ .create();
+
+ public static final ConfigEntry<Integer> SCAN_PLAN_CACHE_CAPACITY =
+ new ConfigBuilder(IcebergConstants.SCAN_PLAN_CACHE_CAPACITY)
+ .doc("Maximum number of scan plan results to cache.")
+ .version(ConfigConstants.VERSION_1_2_0)
+ .intConf()
+ .checkValue(value -> value > 0,
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+ .createWithDefault(200);
+
+ public static final ConfigEntry<Integer> SCAN_PLAN_CACHE_EXPIRE_MINUTES =
+ new ConfigBuilder(IcebergConstants.SCAN_PLAN_CACHE_EXPIRE_MINUTES)
+ .doc(
+ "Time in minutes after which cached scan plans expire if not
accessed. Cached entries are automatically removed after this period of
inactivity.")
+ .version(ConfigConstants.VERSION_1_2_0)
+ .intConf()
+ .checkValue(value -> value > 0,
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
+ .createWithDefault(60);
+
public String getJdbcDriver() {
return get(JDBC_DRIVER);
}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index 1d93f1a424..b77b16e855 100644
---
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -28,7 +28,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
@@ -40,7 +42,10 @@ import
org.apache.gravitino.credential.CredentialPropertyUtils;
import org.apache.gravitino.credential.PathBasedCredentialContext;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
+import org.apache.gravitino.iceberg.service.cache.ScanPlanCache;
+import org.apache.gravitino.iceberg.service.cache.ScanPlanCacheKey;
import org.apache.gravitino.storage.GCSProperties;
+import org.apache.gravitino.utils.ClassUtils;
import org.apache.gravitino.utils.MapUtils;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.iceberg.DeleteFile;
@@ -72,6 +77,8 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
private final Map<String, String> catalogConfigToClients;
+ private final ScanPlanCache scanPlanCache;
+
private static final Set<String> catalogPropertiesToClientKeys =
ImmutableSet.of(
IcebergConstants.IO_IMPL,
@@ -98,6 +105,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
Map<String, String> catalogProperties =
checkForCompatibility(config.getAllConfig(), deprecatedProperties);
this.catalogCredentialManager = new CatalogCredentialManager(catalogName,
catalogProperties);
+ this.scanPlanCache = loadScanPlanCache(config);
}
public LoadTableResponse createTable(
@@ -161,6 +169,9 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
if (catalogCredentialManager != null) {
catalogCredentialManager.close();
}
+ if (scanPlanCache != null) {
+ scanPlanCache.close();
+ }
} finally {
// Call super.close() to release parent class resources including:
// 1. Close underlying catalog (JdbcCatalog, WrappedHiveCatalog, etc.)
@@ -261,14 +272,19 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
try {
Table table = catalog.loadTable(tableIdentifier);
- CloseableIterable<FileScanTask> fileScanTasks =
- createFilePlanScanTasks(table, tableIdentifier, scanRequest);
+ Optional<PlanTableScanResponse> cachedResponse =
+ scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, table,
scanRequest));
+ if (cachedResponse.isPresent()) {
+ LOG.info("Using cached scan plan for table: {}", tableIdentifier);
+ return cachedResponse.get();
+ }
List<String> planTasks = new ArrayList<>();
Map<Integer, PartitionSpec> specsById = new HashMap<>();
- List<org.apache.iceberg.DeleteFile> deleteFiles = new ArrayList<>();
+ List<DeleteFile> deleteFiles = new ArrayList<>();
- try (fileScanTasks) {
+ try (CloseableIterable<FileScanTask> fileScanTasks =
+ createFilePlanScanTasks(table, tableIdentifier, scanRequest)) {
for (FileScanTask fileScanTask : fileScanTasks) {
try {
String taskString = ScanTaskParser.toJson(fileScanTask);
@@ -296,7 +312,7 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
}
List<DeleteFile> uniqueDeleteFiles =
-
deleteFiles.stream().distinct().collect(java.util.stream.Collectors.toList());
+ deleteFiles.stream().distinct().collect(Collectors.toList());
if (planTasks.isEmpty()) {
LOG.info(
@@ -318,7 +334,11 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
tableIdentifier);
}
- return responseBuilder.build();
+ PlanTableScanResponse response = responseBuilder.build();
+
+ // Cache the scan plan response
+ scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, table,
scanRequest), response);
+ return response;
} catch (IllegalArgumentException e) {
LOG.error("Invalid scan request for table {}: {}", tableIdentifier,
e.getMessage());
@@ -427,6 +447,26 @@ public class CatalogWrapperForREST extends
IcebergCatalogWrapper {
return scan;
}
+ private ScanPlanCache loadScanPlanCache(IcebergConfig config) {
+ String impl = config.get(IcebergConfig.SCAN_PLAN_CACHE_IMPL);
+ if (StringUtils.isBlank(impl)) {
+ return ScanPlanCache.DUMMY;
+ }
+
+ ScanPlanCache cache =
+ ClassUtils.loadAndGetInstance(impl,
Thread.currentThread().getContextClassLoader());
+ int capacity = config.get(IcebergConfig.SCAN_PLAN_CACHE_CAPACITY);
+ int expireMinutes =
config.get(IcebergConfig.SCAN_PLAN_CACHE_EXPIRE_MINUTES);
+ cache.initialize(capacity, expireMinutes);
+ LOG.info(
+ "Load scan plan cache for catalog: {}, impl: {}, capacity: {}, expire
minutes: {}",
+ catalog.name(),
+ impl,
+ capacity,
+ expireMinutes);
+ return cache;
+ }
+
@VisibleForTesting
static Map<String, String> checkForCompatibility(
Map<String, String> properties, Map<String, String>
deprecatedProperties) {
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/LocalScanPlanCache.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/LocalScanPlanCache.java
new file mode 100644
index 0000000000..19ec663f45
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/LocalScanPlanCache.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Local in-memory implementation of {@link ScanPlanCache} using Caffeine
cache.
+ *
+ * <p>This cache is thread-safe and uses an LRU eviction policy.
+ *
+ * <p>Usage example:
+ *
+ * <pre>{@code
+ * ScanPlanCache cache = new LocalScanPlanCache();
+ * cache.initialize(100, 60);
+ * Optional<PlanTableScanResponse> response = cache.get(key);
+ * response.ifPresent(r -> processResponse(r));
+ * }</pre>
+ */
+public class LocalScanPlanCache implements ScanPlanCache {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalScanPlanCache.class);
+
+ private Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache;
+ /**
+ * Initializes the scan plan cache with specified configuration.
+ *
+ * @param capacity the maximum number of scan plans to cache
+ * @param expireMinutes the number of minutes after which cached entries
expire
+ */
+ @Override
+ public void initialize(int capacity, int expireMinutes) {
+ if (capacity <= 0) {
+ throw new IllegalArgumentException("Cache capacity must be positive,
got: " + capacity);
+ }
+ if (expireMinutes <= 0) {
+ throw new IllegalArgumentException(
+ "Cache expiration time must be positive, got: " + expireMinutes);
+ }
+ LOG.info(
+ "Initializing LocalScanPlanCache with capacity: {}, expireAfterAccess:
{} minutes",
+ capacity,
+ expireMinutes);
+
+ this.scanPlanCache =
+ Caffeine.newBuilder()
+ .maximumSize(capacity)
+ .expireAfterAccess(expireMinutes, TimeUnit.MINUTES)
+ .executor(Runnable::run)
+ .build();
+
+ LOG.info("LocalScanPlanCache initialized successfully");
+ }
+
+ /**
+ * Retrieves a cached scan plan for the given key.
+ *
+ * @param key the cache key containing table identifier and snapshot
information
+ * @return an Optional containing the cached PlanTableScanResponse if
present, or empty otherwise
+ */
+ @Override
+ public Optional<PlanTableScanResponse> get(ScanPlanCacheKey key) {
+ if (key == null) {
+ throw new IllegalArgumentException("Cache key cannot be null");
+ }
+ PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key);
+ if (cachedResponse != null) {
+ LOG.debug(
+ "Cache HIT for table: {}, snapshot: {}", key.getTableIdentifier(),
key.getSnapshotId());
+ } else {
+ LOG.debug(
+ "Cache MISS for table: {}, snapshot: {}", key.getTableIdentifier(),
key.getSnapshotId());
+ }
+
+ return Optional.ofNullable(cachedResponse);
+ }
+
+ /**
+ * Stores a scan plan in the cache with the given key.
+ *
+ * @param key the cache key containing table identifier and snapshot
information
+ * @param scanResponse the scan plan response to cache
+ */
+ @Override
+ public void put(ScanPlanCacheKey key, PlanTableScanResponse scanResponse) {
+ if (key == null) {
+ throw new IllegalArgumentException("Cache key cannot be null");
+ }
+ if (scanResponse == null) {
+ throw new IllegalArgumentException("Scan response cannot be null");
+ }
+ scanPlanCache.put(key, scanResponse);
+ LOG.debug(
+ "Update scan plan for table: {}, snapshot: {}",
+ key.getTableIdentifier(),
+ key.getSnapshotId());
+ }
+
+ /**
+ * Closes the cache and releases all cached resources. This method
invalidates all cached entries
+ * and performs cleanup operations.
+ */
+ @Override
+ public void close() {
+ LOG.info("Closing LocalScanPlanCache");
+
+ if (scanPlanCache != null) {
+ scanPlanCache.invalidateAll();
+ scanPlanCache.cleanUp();
+ }
+ LOG.info("LocalScanPlanCache closed successfully");
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCache.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCache.java
new file mode 100644
index 0000000000..bc31865ba8
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCache.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.cache;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+
+/** Interface for caching scan plans. */
+public interface ScanPlanCache extends Closeable {
+
+ /**
+ * A dummy implementation of {@link ScanPlanCache} that performs no caching.
All operations are
+ * no-ops, and {@link #get(ScanPlanCacheKey)} always returns an empty
Optional. This
+ * implementation can be used when scan plan caching is disabled or not
needed.
+ */
+ ScanPlanCache DUMMY =
+ new ScanPlanCache() {
+ @Override
+ public void initialize(int capacity, int expireMinutes) {}
+
+ @Override
+ public Optional<PlanTableScanResponse> get(ScanPlanCacheKey key) {
+ return Optional.empty();
+ }
+
+ @Override
+ public void put(ScanPlanCacheKey key, PlanTableScanResponse
scanResponse) {}
+
+ @Override
+ public void close() throws IOException {}
+ };
+
+ /**
+ * Initializes the scan plan cache with specified configuration.
+ *
+ * @param capacity the maximum number of scan plans to cache
+ * @param expireMinutes the number of minutes after which cached entries
expire
+ */
+ void initialize(int capacity, int expireMinutes);
+
+ /**
+ * Retrieves a cached scan plan response for the given key.
+ *
+ * @param key the cache key containing table identifier and scan parameters
+ * @return an Optional containing the cached scan plan response, or empty if
not found
+ */
+ Optional<PlanTableScanResponse> get(ScanPlanCacheKey key);
+
+ /**
+ * Stores a scan plan response in the cache.
+ *
+ * @param key the cache key containing table identifier and scan parameters
+ * @param scanResponse the scan plan response to cache
+ */
+ void put(ScanPlanCacheKey key, PlanTableScanResponse scanResponse);
+}
diff --git
a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCacheKey.java
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCacheKey.java
new file mode 100644
index 0000000000..8823d1aaad
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCacheKey.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.cache;
+
+import com.google.common.base.Objects;
+import java.util.stream.Collectors;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+
+/** Cache key for Iceberg table scan plans. */
+public class ScanPlanCacheKey {
+ private final TableIdentifier tableIdentifier;
+ private final Long snapshotId;
+ private final Long startSnapshotId;
+ private final Long endSnapshotId;
+ private final Expression filter; // Store the filter object instead of string
+ private final String selectStr;
+ private final String statsFieldsStr;
+ private final boolean caseSensitive;
+ private final boolean useSnapshotSchema;
+
+ private ScanPlanCacheKey(
+ TableIdentifier tableIdentifier,
+ Long snapshotId,
+ Long startSnapshotId,
+ Long endSnapshotId,
+ Expression filter,
+ String select,
+ String statsFields,
+ boolean caseSensitive,
+ boolean useSnapshotSchema) {
+ this.tableIdentifier = tableIdentifier;
+ this.snapshotId = snapshotId;
+ this.startSnapshotId = startSnapshotId;
+ this.endSnapshotId = endSnapshotId;
+ this.filter = filter;
+ this.selectStr = select;
+ this.statsFieldsStr = statsFields;
+ this.caseSensitive = caseSensitive;
+ this.useSnapshotSchema = useSnapshotSchema;
+ }
+
+ /**
+ * Creates a cache key from table identifier, table, and scan request.
+ *
+ * @param tableIdentifier the table identifier
+ * @param table the Iceberg table
+ * @param scanRequest the scan request containing filters and projections
+ * @return a new cache key
+ */
+ public static ScanPlanCacheKey create(
+ TableIdentifier tableIdentifier, Table table, PlanTableScanRequest
scanRequest) {
+
+ // Use current snapshot if not specified
+ Long snapshotId = scanRequest.snapshotId();
+ if (snapshotId == null && table.currentSnapshot() != null) {
+ snapshotId = table.currentSnapshot().snapshotId();
+ }
+
+ // Include startSnapshotId and endSnapshotId in the key
+ Long startSnapshotId = scanRequest.startSnapshotId();
+ Long endSnapshotId = scanRequest.endSnapshotId();
+
+ // Store the filter expression object directly.
+ Expression filter = scanRequest.filter();
+
+ // Sort select and statsFields to make key order-independent
+ String selectStr = "";
+ if (scanRequest.select() != null && !scanRequest.select().isEmpty()) {
+ selectStr =
scanRequest.select().stream().sorted().collect(Collectors.joining(","));
+ }
+
+ String statsFieldsStr = "";
+ if (scanRequest.statsFields() != null &&
!scanRequest.statsFields().isEmpty()) {
+ statsFieldsStr =
scanRequest.statsFields().stream().sorted().collect(Collectors.joining(","));
+ }
+
+ return new ScanPlanCacheKey(
+ tableIdentifier,
+ snapshotId,
+ startSnapshotId,
+ endSnapshotId,
+ filter,
+ selectStr,
+ statsFieldsStr,
+ scanRequest.caseSensitive(),
+ scanRequest.useSnapshotSchema());
+ }
+
+ public TableIdentifier getTableIdentifier() {
+ return tableIdentifier;
+ }
+
+ public Long getSnapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof ScanPlanCacheKey)) {
+ return false;
+ }
+ ScanPlanCacheKey that = (ScanPlanCacheKey) o;
+ return caseSensitive == that.caseSensitive
+ && useSnapshotSchema == that.useSnapshotSchema
+ && Objects.equal(tableIdentifier, that.tableIdentifier)
+ && Objects.equal(snapshotId, that.snapshotId)
+ && Objects.equal(startSnapshotId, that.startSnapshotId)
+ && Objects.equal(endSnapshotId, that.endSnapshotId)
+ && Objects.equal(
+ filter != null ? filter.toString() : null,
+ that.filter != null ? that.filter.toString() : null)
+ && Objects.equal(selectStr, that.selectStr)
+ && Objects.equal(statsFieldsStr, that.statsFieldsStr);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(
+ tableIdentifier,
+ snapshotId,
+ startSnapshotId,
+ endSnapshotId,
+ filter != null ? filter.toString() : null,
+ selectStr,
+ statsFieldsStr,
+ caseSensitive,
+ useSnapshotSchema);
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "ScanPlanCacheKey{table=%s, snapshotId=%s, startSnapshotId=%s,
endSnapshotId=%s, "
+ + "filter=%s, select=%s, statsFields=%s, caseSensitive=%s,
useSnapshotSchema=%s}",
+ tableIdentifier,
+ snapshotId,
+ startSnapshotId,
+ endSnapshotId,
+ filter,
+ selectStr.isEmpty() ? "null" : selectStr,
+ statsFieldsStr.isEmpty() ? "null" : statsFieldsStr,
+ caseSensitive,
+ useSnapshotSchema);
+ }
+}
diff --git
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java
new file mode 100644
index 0000000000..cd4912c6cb
--- /dev/null
+++
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.iceberg.service.rest;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+import org.apache.gravitino.iceberg.service.cache.LocalScanPlanCache;
+import org.apache.gravitino.iceberg.service.cache.ScanPlanCache;
+import org.apache.gravitino.iceberg.service.cache.ScanPlanCacheKey;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.responses.PlanTableScanResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
/**
 * Unit tests for {@link LocalScanPlanCache} and {@link ScanPlanCacheKey}.
 *
 * <p>Verifies cache hit/miss behavior and that key equality is driven by the scan-defining
 * request attributes (table identifier, snapshot id, filter expression, selected columns)
 * rather than by object identity of the request or key instances.
 */
public class TestScanPlanCache {

  // Cache under test; recreated before each test and closed after it.
  private ScanPlanCache scanPlanCache;
  // Mocked Iceberg table whose current snapshot is stubbed per test.
  private Table mockTable;
  private Snapshot mockSnapshot;
  private TableIdentifier tableIdentifier;

  @BeforeEach
  public void setUp() {
    // Small cache (10 entries, 60s expiry) is plenty for these single-threaded tests.
    scanPlanCache = new LocalScanPlanCache();
    scanPlanCache.initialize(10, 60);
    tableIdentifier = TableIdentifier.of(Namespace.of("test_db"), "test_table");

    mockSnapshot = mock(Snapshot.class);
    when(mockSnapshot.snapshotId()).thenReturn(1L);

    mockTable = mock(Table.class);
    when(mockTable.name()).thenReturn("test_table");
    when(mockTable.currentSnapshot()).thenReturn(mockSnapshot);
  }

  @AfterEach
  public void tearDown() throws Exception {
    // Release the cache's underlying resources between tests.
    if (scanPlanCache != null) {
      scanPlanCache.close();
    }
  }

  /** A get after a put with an equivalent, freshly built key must be a cache hit. */
  @Test
  public void testCacheHit() {
    PlanTableScanRequest scanRequest =
        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
    PlanTableScanResponse response =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task1"))
            .build();

    // Keys are rebuilt on every call: hits depend on value equality, not identity.
    Optional<PlanTableScanResponse> cached1 =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, scanRequest));
    Assertions.assertFalse(cached1.isPresent(), "First call should be cache miss");

    scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, scanRequest), response);

    Optional<PlanTableScanResponse> cached2 =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, scanRequest));
    Assertions.assertTrue(cached2.isPresent(), "Second call should be cache hit");
    Assertions.assertEquals(response, cached2.get());
    Assertions.assertEquals(PlanStatus.COMPLETED, cached2.get().planStatus());
  }

  /** Requests that differ in selected columns must map to distinct cache entries. */
  @Test
  public void testCacheMiss() {
    PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
    PlanTableScanRequest request2 =
        new PlanTableScanRequest.Builder()
            .withSelect(Arrays.asList("id", "name"))
            .withSnapshotId(1L)
            .build();

    PlanTableScanResponse response1 =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task1"))
            .build();

    PlanTableScanResponse response2 =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task2"))
            .build();

    scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1), response1);
    scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2), response2);

    // Each request retrieves its own response, proving the keys did not collide.
    Optional<PlanTableScanResponse> cached1 =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1));
    Assertions.assertTrue(cached1.isPresent());
    Assertions.assertEquals(response1, cached1.get());

    Optional<PlanTableScanResponse> cached2 =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2));
    Assertions.assertTrue(cached2.isPresent());
    Assertions.assertEquals(response2, cached2.get());
    Assertions.assertNotEquals(
        cached1.get(), cached2.get(), "Different requests should have different cache entries");
  }

  /**
   * The same set of selected columns in a different order must normalize to the same key, so a
   * plan cached under one ordering is served for the other.
   */
  @Test
  public void testCacheKeyWithDifferentSelectOrder() {
    PlanTableScanRequest request1 =
        new PlanTableScanRequest.Builder()
            .withSelect(Arrays.asList("id", "name"))
            .withSnapshotId(1L)
            .build();
    PlanTableScanRequest request2 =
        new PlanTableScanRequest.Builder()
            .withSelect(Arrays.asList("name", "id"))
            .withSnapshotId(1L)
            .build();

    PlanTableScanResponse response =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task1"))
            .build();

    scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1), response);

    Optional<PlanTableScanResponse> cached2 =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2));
    Assertions.assertTrue(
        cached2.isPresent(), "Select fields with different order should use same cache key");
    Assertions.assertEquals(response, cached2.get());
  }

  /** Identical requests against different tables must not share cache entries. */
  @Test
  public void testCacheKeyWithDifferentTableIdentifier() {
    TableIdentifier table1 = TableIdentifier.of(Namespace.of("db1"), "table1");
    TableIdentifier table2 = TableIdentifier.of(Namespace.of("db2"), "table2");

    PlanTableScanRequest scanRequest =
        new PlanTableScanRequest.Builder().withSnapshotId(1L).build();

    PlanTableScanResponse response1 =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task1"))
            .build();

    PlanTableScanResponse response2 =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task2"))
            .build();

    scanPlanCache.put(ScanPlanCacheKey.create(table1, mockTable, scanRequest), response1);
    scanPlanCache.put(ScanPlanCacheKey.create(table2, mockTable, scanRequest), response2);

    Optional<PlanTableScanResponse> cached1 =
        scanPlanCache.get(ScanPlanCacheKey.create(table1, mockTable, scanRequest));
    Assertions.assertTrue(cached1.isPresent());
    Assertions.assertEquals(response1, cached1.get());

    Optional<PlanTableScanResponse> cached2 =
        scanPlanCache.get(ScanPlanCacheKey.create(table2, mockTable, scanRequest));
    Assertions.assertTrue(cached2.isPresent());
    Assertions.assertEquals(response2, cached2.get());
    Assertions.assertNotEquals(cached1.get(), cached2.get());
  }

  /** A plan cached for one snapshot must never be returned for a different snapshot. */
  @Test
  public void testCacheKeyWithDifferentSnapshotId() {
    PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
    PlanTableScanRequest request2 = new PlanTableScanRequest.Builder().withSnapshotId(2L).build();

    PlanTableScanResponse response1 =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task1"))
            .build();

    // Cache with snapshot ID 1
    scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1), response1);

    // Query with snapshot ID 2 should result in cache miss
    Optional<PlanTableScanResponse> cached =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2));
    Assertions.assertFalse(cached.isPresent(), "Different snapshot should result in cache miss");

    // Query with snapshot ID 1 should result in cache hit
    Optional<PlanTableScanResponse> cachedHit =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1));
    Assertions.assertTrue(cachedHit.isPresent(), "Same snapshot should result in cache hit");
    Assertions.assertEquals(response1, cachedHit.get());
  }

  @Test
  public void testCacheKeyWithNullSnapshot() {
    // Test edge case: table.currentSnapshot() returns null (empty table with no snapshots)
    // This test verifies that:
    // 1. Cache works correctly with null snapshots
    // 2. Different requests produce different cache keys even with null snapshot
    // Test that different requests with null snapshot produce different cache keys
    // if they differ in other parameters
    when(mockTable.currentSnapshot()).thenReturn(null);

    PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build();
    PlanTableScanRequest request2 =
        new PlanTableScanRequest.Builder()
            .withSnapshotId(1L)
            .withSelect(Arrays.asList("id", "name"))
            .build();

    PlanTableScanResponse response1 =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task1"))
            .build();

    PlanTableScanResponse response2 =
        PlanTableScanResponse.builder()
            .withPlanStatus(PlanStatus.COMPLETED)
            .withPlanTasks(Collections.singletonList("task2"))
            .build();

    scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1), response1);
    scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2), response2);

    Optional<PlanTableScanResponse> cached1 =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1));
    Assertions.assertTrue(cached1.isPresent());
    Assertions.assertEquals(response1, cached1.get());

    Optional<PlanTableScanResponse> cached2 =
        scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2));
    Assertions.assertTrue(cached2.isPresent());
    Assertions.assertEquals(response2, cached2.get());

    Assertions.assertNotEquals(
        cached1.get(), cached2.get(), "Different requests should have different cache entries");
  }

  /**
   * Documents why the cache key compares filter expressions by string rather than with
   * Iceberg's {@code Expression.isEquivalentTo}: two structurally identical predicates built
   * separately are distinct instances and isEquivalentTo() reports them as not equivalent,
   * while their string forms match and yield equal cache keys.
   */
  @Test
  public void testExpressionIsEquivalentTo() {

    // Test case 1: Identical simple expressions
    Expression expr1 = Expressions.equal("status", "active");
    Expression expr2 = Expressions.equal("status", "active");

    // Reference equality - should be false
    Assertions.assertNotSame(expr1, expr2, "Expression objects should be different instances");

    // Standard isEquivalentTo()
    Assertions.assertFalse(
        expr1.isEquivalentTo(expr2),
        "Standard isEquivalentTo() should return false for simple expression.");

    PlanTableScanRequest request1 =
        new PlanTableScanRequest.Builder().withFilter(expr1).withSnapshotId(1L).build();
    PlanTableScanRequest request2 =
        new PlanTableScanRequest.Builder().withFilter(expr2).withSnapshotId(1L).build();

    ScanPlanCacheKey key1 = ScanPlanCacheKey.create(tableIdentifier, mockTable, request1);
    ScanPlanCacheKey key2 = ScanPlanCacheKey.create(tableIdentifier, mockTable, request2);

    // With string-based comparison, identical expressions should produce equal cache keys
    // This works because expr1.toString().equals(expr2.toString()) should return true
    Assertions.assertEquals(
        key1, key2, "Identical expressions should produce equal cache keys with string comparison");

    Assertions.assertEquals(
        key1.hashCode(), key2.hashCode(), "Identical expressions should have the same hash code");

    // Verify that the string representations are indeed equal
    String filterStr1 = expr1.toString();
    String filterStr2 = expr2.toString();
    Assertions.assertEquals(
        filterStr1, filterStr2, "Expression string representations should be equal");
  }
}