Copilot commented on code in PR #8980: URL: https://github.com/apache/gravitino/pull/8980#discussion_r2584932589
########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.rest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.apache.gravitino.iceberg.service.cache.LocalScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCacheKey; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestScanPlanCache { Review Comment: The PR description references test file 
`org.apache.gravitino.iceberg.service.rest.TestPlanTableScan`, but this file doesn't exist in the codebase. The actual test file is `TestScanPlanCache.java`. Please update the PR description to reference the correct test file. ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCacheKey.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.cache; + +import com.google.common.base.Objects; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; + +/** Cache key for Iceberg table scan plans. 
*/ +public class ScanPlanCacheKey { + private final TableIdentifier tableIdentifier; + private final Long snapshotId; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final String filterStr; + private final String selectStr; + private final String statsFieldsStr; + private final boolean caseSensitive; + private final boolean useSnapshotSchema; + + private ScanPlanCacheKey( + TableIdentifier tableIdentifier, + Long snapshotId, + Long startSnapshotId, + Long endSnapshotId, + String filter, + String select, + String statsFields, + boolean caseSensitive, + boolean useSnapshotSchema) { + this.tableIdentifier = tableIdentifier; + this.snapshotId = snapshotId; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.filterStr = filter; + this.selectStr = select; + this.statsFieldsStr = statsFields; + this.caseSensitive = caseSensitive; + this.useSnapshotSchema = useSnapshotSchema; + } + + /** + * Creates a cache key from table identifier, table, and scan request. + * + * @param tableIdentifier the table identifier + * @param table the Iceberg table + * @param scanRequest the scan request containing filters and projections + * @return a new cache key + */ + public static ScanPlanCacheKey create( + TableIdentifier tableIdentifier, Table table, PlanTableScanRequest scanRequest) { + + // Use current snapshot if not specified + Long snapshotId = scanRequest.snapshotId(); + if ((snapshotId == null || snapshotId == 0L) && table.currentSnapshot() != null) { + snapshotId = table.currentSnapshot().snapshotId(); + } + + // Include startSnapshotId and endSnapshotId in the key + Long startSnapshotId = scanRequest.startSnapshotId(); + Long endSnapshotId = scanRequest.endSnapshotId(); + + // Note: Using toString() for filter representation. This assumes the filter's + // toString() method produces a consistent, deterministic string representation. 
+ // If the filter implementation changes, cache effectiveness may be impacted. + String filterStr = scanRequest.filter() != null ? scanRequest.filter().toString() : ""; Review Comment: The comment acknowledges a potential issue: the cache effectiveness depends on filter.toString() producing consistent, deterministic output. However, this is not guaranteed by the Iceberg Expression API. Consider using a more robust approach like converting the filter to a canonical form or using Iceberg's expression serialization utilities to ensure consistent cache keys for semantically equivalent filters. As it stands, two equivalent filters might produce different cache keys if their toString() representations differ. ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service.rest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.apache.gravitino.iceberg.service.cache.LocalScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCacheKey; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestScanPlanCache { Review Comment: While unit tests cover the cache implementation itself, there are no integration tests that verify the cache is actually used in `CatalogWrapperForREST.planTableScan()`. Consider adding integration tests that: 1. Verify that repeated identical scan requests result in cache hits (can be confirmed via logs or cache metrics) 2. Test that the cache properly invalidates when table snapshots change 3. Verify cache behavior with different combinations of scan request parameters (filters, projections, etc.) These tests should be added to verify the end-to-end caching behavior, not just the cache component in isolation. ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/LocalScanPlanCache.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Local in-memory implementation of {@link ScanPlanCache} using Caffeine cache. + * + * <p>This cache is thread-safe and uses a LRU eviction policy. + * + * <p>Usage example: + * + * <pre>{@code + * ScanPlanCache cache = new LocalScanPlanCache(100, 60); + * Optional<PlanTableScanResponse> response = cache.get(key); + * response.ifPresent(r -> processResponse(r)); + * }</pre> + */ +public class LocalScanPlanCache implements ScanPlanCache { + + private static final Logger LOG = LoggerFactory.getLogger(LocalScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; + + /** + * Creates a new ScanPlanCache with the specified capacity and expiration time. + * + * @param capacity Maximum number of scan plan results to cache. When the cache reaches this size, + * least-recently-used entries will be evicted. + * @param expireMinutes Time in minutes after which cached scan plans expire if not accessed. 
+ * Cached entries are automatically removed after this period of inactivity. + */ + public LocalScanPlanCache(int capacity, int expireMinutes) { + if (capacity <= 0) { + throw new IllegalArgumentException("Cache capacity must be positive, got: " + capacity); + } + if (expireMinutes <= 0) { + throw new IllegalArgumentException( + "Cache expiration time must be positive, got: " + expireMinutes); + } + LOG.info( + "Initializing LocalScanPlanCache with capacity: {}, expireAfterAccess: {} minutes", + capacity, + expireMinutes); + + this.scanPlanCache = + Caffeine.newBuilder() + .maximumSize(capacity) + .expireAfterAccess(expireMinutes, TimeUnit.MINUTES) + .executor(Runnable::run) Review Comment: [nitpick] Using `.executor(Runnable::run)` configures Caffeine to execute maintenance tasks synchronously, which may impact performance under high load. Consider whether asynchronous execution with a dedicated executor would be more appropriate for production use, or document why synchronous execution is preferred for this cache. ```suggestion .executor(java.util.concurrent.ForkJoinPool.commonPool()) ``` ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java: ########## @@ -98,6 +105,11 @@ public CatalogWrapperForREST(String catalogName, IcebergConfig config) { Map<String, String> catalogProperties = checkForCompatibility(config.getAllConfig(), deprecatedProperties); this.catalogCredentialManager = new CatalogCredentialManager(catalogName, catalogProperties); + // Initialize scan plan cache. + this.scanPlanCache = + new LocalScanPlanCache( + config.get(IcebergConfig.SCAN_PLAN_CACHE_CAPACITY), + config.get(IcebergConfig.SCAN_PLAN_CACHE_EXPIRE_MINUTES)); Review Comment: Consider adding a comment explaining why the scan plan cache is always initialized (non-configurable) whereas other caches like table metadata cache can be disabled. 
If caching should be optional, add a configuration option to enable/disable the scan plan cache. ```suggestion // The scan plan cache is initialized by default to improve performance of repeated scan planning // operations. Unlike some other caches (e.g., table metadata cache), this was previously not // configurable. To provide flexibility, we now allow disabling the scan plan cache via config. // If caching is disabled, scanPlanCache will be set to null and caching will be bypassed. boolean scanPlanCacheEnabled = config.getOrDefault("scan.plan.cache.enabled", true); if (scanPlanCacheEnabled) { this.scanPlanCache = new LocalScanPlanCache( config.get(IcebergConfig.SCAN_PLAN_CACHE_CAPACITY), config.get(IcebergConfig.SCAN_PLAN_CACHE_EXPIRE_MINUTES)); } else { this.scanPlanCache = null; } ``` ########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.iceberg.service.rest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.apache.gravitino.iceberg.service.cache.LocalScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCacheKey; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestScanPlanCache { + + private ScanPlanCache scanPlanCache; + private Table mockTable; + private Snapshot mockSnapshot; + private TableIdentifier tableIdentifier; + + @BeforeEach + public void setUp() { + scanPlanCache = new LocalScanPlanCache(10, 60); + tableIdentifier = TableIdentifier.of(Namespace.of("test_db"), "test_table"); + + mockSnapshot = mock(Snapshot.class); + when(mockSnapshot.snapshotId()).thenReturn(1L); + + mockTable = mock(Table.class); + when(mockTable.name()).thenReturn("test_table"); + when(mockTable.currentSnapshot()).thenReturn(mockSnapshot); + } Review Comment: Missing test coverage for the edge case where `table.currentSnapshot()` returns null (empty table). According to the cache key creation logic in `ScanPlanCacheKey.create()`, when `snapshotId` is null or 0 and `table.currentSnapshot()` is null, the fallback to the current snapshot is skipped, so the resulting cache key keeps the request's unresolved snapshot ID (null or 0). Add a test to verify cache behavior with empty tables that have no snapshots.
########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/ScanPlanCacheKey.java: ########## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.cache; + +import com.google.common.base.Objects; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; + +/** Cache key for Iceberg table scan plans. 
*/ +public class ScanPlanCacheKey { + private final TableIdentifier tableIdentifier; + private final Long snapshotId; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final String filterStr; + private final String selectStr; + private final String statsFieldsStr; + private final boolean caseSensitive; + private final boolean useSnapshotSchema; + + private ScanPlanCacheKey( + TableIdentifier tableIdentifier, + Long snapshotId, + Long startSnapshotId, + Long endSnapshotId, + String filter, + String select, + String statsFields, + boolean caseSensitive, + boolean useSnapshotSchema) { + this.tableIdentifier = tableIdentifier; + this.snapshotId = snapshotId; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.filterStr = filter; + this.selectStr = select; + this.statsFieldsStr = statsFields; + this.caseSensitive = caseSensitive; + this.useSnapshotSchema = useSnapshotSchema; + } + + /** + * Creates a cache key from table identifier, table, and scan request. + * + * @param tableIdentifier the table identifier + * @param table the Iceberg table + * @param scanRequest the scan request containing filters and projections + * @return a new cache key + */ + public static ScanPlanCacheKey create( + TableIdentifier tableIdentifier, Table table, PlanTableScanRequest scanRequest) { + + // Use current snapshot if not specified + Long snapshotId = scanRequest.snapshotId(); + if ((snapshotId == null || snapshotId == 0L) && table.currentSnapshot() != null) { + snapshotId = table.currentSnapshot().snapshotId(); + } Review Comment: The condition `snapshotId == 0L` is used to check if the snapshot ID is unspecified, but this relies on a magic number. Consider adding a constant like `UNSPECIFIED_SNAPSHOT_ID = 0L` or checking Iceberg's conventions for representing unspecified snapshot IDs to make the intent clearer. 
########## iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestScanPlanCache.java: ########## @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.rest; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import org.apache.gravitino.iceberg.service.cache.LocalScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCache; +import org.apache.gravitino.iceberg.service.cache.ScanPlanCacheKey; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.rest.PlanStatus; +import org.apache.iceberg.rest.requests.PlanTableScanRequest; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class TestScanPlanCache { + + private ScanPlanCache scanPlanCache; + private Table 
mockTable; + private Snapshot mockSnapshot; + private TableIdentifier tableIdentifier; + + @BeforeEach + public void setUp() { + scanPlanCache = new LocalScanPlanCache(10, 60); + tableIdentifier = TableIdentifier.of(Namespace.of("test_db"), "test_table"); + + mockSnapshot = mock(Snapshot.class); + when(mockSnapshot.snapshotId()).thenReturn(1L); + + mockTable = mock(Table.class); + when(mockTable.name()).thenReturn("test_table"); + when(mockTable.currentSnapshot()).thenReturn(mockSnapshot); + } + + @AfterEach + public void tearDown() throws Exception { + if (scanPlanCache != null) { + scanPlanCache.close(); + } + } + + @Test + public void testCacheHit() { + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(1L).build(); + PlanTableScanResponse response = + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(Collections.singletonList("task1")) + .build(); + + Optional<PlanTableScanResponse> cached1 = + scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, scanRequest)); + Assertions.assertFalse(cached1.isPresent(), "First call should be cache miss"); + + scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, scanRequest), response); + + Optional<PlanTableScanResponse> cached2 = + scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, scanRequest)); + Assertions.assertTrue(cached2.isPresent(), "Second call should be cache hit"); + Assertions.assertEquals(response, cached2.get()); + Assertions.assertEquals(PlanStatus.COMPLETED, cached2.get().planStatus()); + } + + @Test + public void testCacheMiss() { + PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build(); + PlanTableScanRequest request2 = + new PlanTableScanRequest.Builder() + .withSelect(Arrays.asList("id", "name")) + .withSnapshotId(1L) + .build(); + + PlanTableScanResponse response1 = + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) 
+ .withPlanTasks(Collections.singletonList("task1")) + .build(); + + PlanTableScanResponse response2 = + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(Collections.singletonList("task2")) + .build(); + + scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1), response1); + scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2), response2); + + Optional<PlanTableScanResponse> cached1 = + scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1)); + Assertions.assertTrue(cached1.isPresent()); + Assertions.assertEquals(response1, cached1.get()); + + Optional<PlanTableScanResponse> cached2 = + scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2)); + Assertions.assertTrue(cached2.isPresent()); + Assertions.assertEquals(response2, cached2.get()); + Assertions.assertNotEquals( + cached1.get(), cached2.get(), "Different requests should have different cache entries"); + } + + @Test + public void testCacheKeyWithDifferentSelectOrder() { + PlanTableScanRequest request1 = + new PlanTableScanRequest.Builder() + .withSelect(Arrays.asList("id", "name")) + .withSnapshotId(1L) + .build(); + PlanTableScanRequest request2 = + new PlanTableScanRequest.Builder() + .withSelect(Arrays.asList("name", "id")) + .withSnapshotId(1L) + .build(); + + PlanTableScanResponse response = + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(Collections.singletonList("task1")) + .build(); + + scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1), response); + + Optional<PlanTableScanResponse> cached2 = + scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2)); + Assertions.assertTrue( + cached2.isPresent(), "Select fields with different order should use same cache key"); + Assertions.assertEquals(response, cached2.get()); + } + + @Test + public void 
testCacheKeyWithDifferentTableIdentifier() { + TableIdentifier table1 = TableIdentifier.of(Namespace.of("db1"), "table1"); + TableIdentifier table2 = TableIdentifier.of(Namespace.of("db2"), "table2"); + + PlanTableScanRequest scanRequest = + new PlanTableScanRequest.Builder().withSnapshotId(1L).build(); + + PlanTableScanResponse response1 = + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(Collections.singletonList("task1")) + .build(); + + PlanTableScanResponse response2 = + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(Collections.singletonList("task2")) + .build(); + + scanPlanCache.put(ScanPlanCacheKey.create(table1, mockTable, scanRequest), response1); + scanPlanCache.put(ScanPlanCacheKey.create(table2, mockTable, scanRequest), response2); + + Optional<PlanTableScanResponse> cached1 = + scanPlanCache.get(ScanPlanCacheKey.create(table1, mockTable, scanRequest)); + Assertions.assertTrue(cached1.isPresent()); + Assertions.assertEquals(response1, cached1.get()); + + Optional<PlanTableScanResponse> cached2 = + scanPlanCache.get(ScanPlanCacheKey.create(table2, mockTable, scanRequest)); + Assertions.assertTrue(cached2.isPresent()); + Assertions.assertEquals(response2, cached2.get()); + Assertions.assertNotEquals(cached1.get(), cached2.get()); + } + + @Test + public void testCacheKeyWithDifferentSnapshotId() { + PlanTableScanRequest request1 = new PlanTableScanRequest.Builder().withSnapshotId(1L).build(); + PlanTableScanRequest request2 = new PlanTableScanRequest.Builder().withSnapshotId(2L).build(); + + PlanTableScanResponse response1 = + PlanTableScanResponse.builder() + .withPlanStatus(PlanStatus.COMPLETED) + .withPlanTasks(Collections.singletonList("task1")) + .build(); + + // Cache with snapshot ID 1 + scanPlanCache.put(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1), response1); + + // Query with snapshot ID 2 should result in cache miss + 
Optional<PlanTableScanResponse> cached = + scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request2)); + Assertions.assertFalse(cached.isPresent(), "Different snapshot should result in cache miss"); + + // Query with snapshot ID 1 should result in cache hit + Optional<PlanTableScanResponse> cachedHit = + scanPlanCache.get(ScanPlanCacheKey.create(tableIdentifier, mockTable, request1)); + Assertions.assertTrue(cachedHit.isPresent(), "Same snapshot should result in cache hit"); + Assertions.assertEquals(response1, cachedHit.get()); + } Review Comment: Test coverage is missing for cache eviction scenarios. Consider adding tests for: 1. Cache capacity limit - verify that when the cache reaches capacity (10 in setUp), adding new entries evicts the least-recently-used entries 2. Cache expiration - verify that entries expire after the configured time (60 minutes in setUp), though this may require test-friendly time manipulation These scenarios are critical for validating the cache behaves correctly under production conditions. ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/LocalScanPlanCache.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Local in-memory implementation of {@link ScanPlanCache} using Caffeine cache. + * + * <p>This cache is thread-safe and uses a LRU eviction policy. + * + * <p>Usage example: + * + * <pre>{@code + * ScanPlanCache cache = new LocalScanPlanCache(100, 60); + * Optional<PlanTableScanResponse> response = cache.get(key); + * response.ifPresent(r -> processResponse(r)); + * }</pre> + */ +public class LocalScanPlanCache implements ScanPlanCache { + + private static final Logger LOG = LoggerFactory.getLogger(LocalScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; + + /** + * Creates a new ScanPlanCache with the specified capacity and expiration time. + * + * @param capacity Maximum number of scan plan results to cache. When the cache reaches this size, + * least-recently-used entries will be evicted. + * @param expireMinutes Time in minutes after which cached scan plans expire if not accessed. + * Cached entries are automatically removed after this period of inactivity. 
+ */ + public LocalScanPlanCache(int capacity, int expireMinutes) { + if (capacity <= 0) { + throw new IllegalArgumentException("Cache capacity must be positive, got: " + capacity); + } + if (expireMinutes <= 0) { + throw new IllegalArgumentException( + "Cache expiration time must be positive, got: " + expireMinutes); + } + LOG.info( + "Initializing LocalScanPlanCache with capacity: {}, expireAfterAccess: {} minutes", + capacity, + expireMinutes); + + this.scanPlanCache = + Caffeine.newBuilder() + .maximumSize(capacity) + .expireAfterAccess(expireMinutes, TimeUnit.MINUTES) + .executor(Runnable::run) + .build(); + + LOG.info("LocalScanPlanCache initialized successfully"); + } + + /** + * Retrieves a cached scan plan for the given key. + * + * @param key the cache key containing table identifier and snapshot information + * @return an Optional containing the cached PlanTableScanResponse if present, or empty otherwise + */ + @Override + public Optional<PlanTableScanResponse> get(ScanPlanCacheKey key) { + if (key == null) { + throw new IllegalArgumentException("Cache key cannot be null"); + } + PlanTableScanResponse cachedResponse = scanPlanCache.getIfPresent(key); + if (cachedResponse != null) { + LOG.debug( + "Cache HIT for table: {}, snapshot: {}", key.getTableIdentifier(), key.getSnapshotId()); + } else { + LOG.debug( + "Cache MISS for table: {}, snapshot: {}", key.getTableIdentifier(), key.getSnapshotId()); + } + + return Optional.ofNullable(cachedResponse); + } + + /** + * Stores a scan plan in the cache with the given key. 
+ * + * @param key the cache key containing table identifier and snapshot information + * @param scanResponse the scan plan response to cache + */ + @Override + public void put(ScanPlanCacheKey key, PlanTableScanResponse scanResponse) { + if (key == null) { + throw new IllegalArgumentException("Cache key cannot be null"); + } + if (scanResponse == null) { + throw new IllegalArgumentException("Scan response cannot be null"); + } + scanPlanCache.put(key, scanResponse); + LOG.debug( + "Cached scan plan for table: {}, snapshot: {}", + key.getTableIdentifier(), + key.getSnapshotId()); + } + + /** + * Closes the cache and releases all cached resources. This method invalidates all cached entries + * and performs cleanup operations. + * + * @throws IOException if an I/O error occurs during cleanup + */ + @Override + public void close() throws IOException { Review Comment: The @throws IOException annotation is misleading. The current implementation never throws IOException during cleanup - it only performs Caffeine cache operations which don't throw checked exceptions. Consider removing the `throws IOException` from the method signature or document specific scenarios where IOException would be thrown. ```suggestion */ @Override public void close() { ``` ########## iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/cache/LocalScanPlanCache.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.iceberg.service.cache; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.iceberg.rest.responses.PlanTableScanResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Local in-memory implementation of {@link ScanPlanCache} using Caffeine cache. + * + * <p>This cache is thread-safe and uses a LRU eviction policy. + * + * <p>Usage example: + * + * <pre>{@code + * ScanPlanCache cache = new LocalScanPlanCache(100, 60); + * Optional<PlanTableScanResponse> response = cache.get(key); + * response.ifPresent(r -> processResponse(r)); + * }</pre> + */ +public class LocalScanPlanCache implements ScanPlanCache { + + private static final Logger LOG = LoggerFactory.getLogger(LocalScanPlanCache.class); + + private final Cache<ScanPlanCacheKey, PlanTableScanResponse> scanPlanCache; + + /** + * Creates a new ScanPlanCache with the specified capacity and expiration time. + * + * @param capacity Maximum number of scan plan results to cache. When the cache reaches this size, + * least-recently-used entries will be evicted. + * @param expireMinutes Time in minutes after which cached scan plans expire if not accessed. + * Cached entries are automatically removed after this period of inactivity. 
Review Comment: The documentation mentions that "A typical scan plan might be several KB to MB depending on table size" but doesn't provide guidance on how to size the capacity parameter based on available memory. Consider adding a note about memory considerations, e.g., "For a capacity of 200 and average scan plan size of 1MB, expect approximately 200MB of memory usage for the cache." ```suggestion * Cached entries are automatically removed after this period of inactivity. * <p> * <b>Memory considerations:</b> Memory usage is approximately {@code capacity} multiplied by the average scan plan size. * For example, with a capacity of 200 and an average scan plan size of 1MB, expect approximately 200MB of memory usage for the cache. * A typical scan plan might be several KB to MB depending on table size. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
