github-actions[bot] commented on code in PR #61111:
URL: https://github.com/apache/doris/pull/61111#discussion_r2896202477
########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ##########

```diff
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.cloud.catalog;
+
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.cloud.proto.Cloud;
+import org.apache.doris.cloud.rpc.VersionHelper;
+import org.apache.doris.rpc.RpcException;
+import org.apache.doris.service.FrontendOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A request-coalescing version cache for point queries in cloud mode.
+ *
+ * <p>When {@code enable_snapshot_point_query=true}, every point query needs to fetch
+ * the partition's visible version from MetaService. Under high concurrency, this causes
+ * N RPCs for N concurrent point queries on the same partition.</p>
+ *
+ * <p>This cache optimizes the version fetching by:
+ * <ul>
+ *   <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
+ *       ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
+ *       concurrent queries reuse the cached version.</li>
+ *   <li><b>Request coalescing</b>: When the cache expires, only the first request issues
+ *       the MetaService RPC. Concurrent requests for the same partition wait on the inflight
+ *       result via a {@link CompletableFuture}.</li>
+ * </ul>
+ * </p>
+ */
+public class PointQueryVersionCache {
+    private static final Logger LOG = LogManager.getLogger(PointQueryVersionCache.class);
+
+    private static volatile PointQueryVersionCache instance;
+
+    /**
+     * Cache entry holding the version and the timestamp when it was cached.
+     */
+    static class VersionEntry {
+        final long version;
+        final long cachedTimeMs;
+
+        VersionEntry(long version, long cachedTimeMs) {
+            this.version = version;
+            this.cachedTimeMs = cachedTimeMs;
+        }
+
+        boolean isExpired(long ttlMs) {
+            if (ttlMs <= 0) {
+                return true;
+            }
+            return System.currentTimeMillis() - cachedTimeMs > ttlMs;
+        }
+    }
+
+    // partitionId -> cached VersionEntry
+    private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();
+
```

Review Comment:
**[High] Unbounded cache: potential memory leak**

This `ConcurrentHashMap` has no size limit and no eviction mechanism. Expired entries are never removed; they stay in the map indefinitely, and only `isExpired()` keeps them from being served. Every partition ever queried adds a permanent entry, and entries for dropped partitions are never cleaned up.

Every other FE cache in the codebase uses Caffeine or Guava `CacheBuilder` with `maximumSize` and/or `expireAfterWrite`. `EvictableCacheBuilder` even throws an exception for unbounded caches (`"Unbounded cache is not supported"`). In a cluster with thousands of tables and many partitions, this map will grow monotonically without bound.

**Suggestion:** Replace it with `Caffeine.newBuilder().maximumSize(N).expireAfterWrite(ttl, TimeUnit.MILLISECONDS).build()`. This also eliminates the need for the manual `VersionEntry.isExpired()` check. The request coalescing via the `inflightRequests` `ConcurrentHashMap` is fine, since its entries are self-cleaning (removed in a `finally` block).
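The suggestion above relies on the Caffeine library. As a dependency-free illustration of the same two bounds (a maximum size plus expire-after-write), here is a minimal sketch built on `LinkedHashMap`. The class name `BoundedVersionCache`, its methods, and the injectable clock are hypothetical and are not part of the PR or of Doris.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical, dependency-free stand-in for a cache built with
// maximumSize(maxSize) and expireAfterWrite(ttlMs). Not code from the PR.
class BoundedVersionCache {
    private static final class VersionEntry {
        final long version;
        final long writtenAtMs;

        VersionEntry(long version, long writtenAtMs) {
            this.version = version;
            this.writtenAtMs = writtenAtMs;
        }
    }

    private final int maxSize;
    private final long ttlMs; // assumed positive
    private final LongSupplier clock; // injectable so expiry can be tested deterministically
    private final LinkedHashMap<Long, VersionEntry> map;

    BoundedVersionCache(int maxSize, long ttlMs, LongSupplier clock) {
        this.maxSize = maxSize;
        this.ttlMs = ttlMs;
        this.clock = clock;
        // Insertion-ordered map; once size exceeds maxSize, the oldest write is evicted.
        this.map = new LinkedHashMap<Long, VersionEntry>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, VersionEntry> eldest) {
                return size() > BoundedVersionCache.this.maxSize;
            }
        };
    }

    /** Returns the cached version, or null if absent or written more than ttlMs ago. */
    synchronized Long getIfFresh(long partitionId) {
        VersionEntry e = map.get(partitionId);
        if (e == null) {
            return null;
        }
        if (clock.getAsLong() - e.writtenAtMs > ttlMs) {
            map.remove(partitionId); // evict instead of keeping the stale entry forever
            return null;
        }
        return e.version;
    }

    synchronized void put(long partitionId, long version) {
        map.remove(partitionId); // re-insert so this key counts as the newest write
        map.put(partitionId, new VersionEntry(version, clock.getAsLong()));
    }

    synchronized int size() {
        return map.size();
    }
}
```

The single lock keeps the sketch short but serializes every lookup on the hot path, which is one reason a purpose-built concurrent cache such as Caffeine is the better fit here. Also note that `getVersion()` receives `ttlMs` per call, whereas `expireAfterWrite` is configured once on the builder, so honoring a TTL config change at runtime would need extra handling.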
########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ##########

```diff
+ * <p>This cache optimizes the version fetching by:
+ * <ul>
+ *   <li><b>Short TTL caching</b>: Partition versions are cached for a configurable duration
+ *       ({@code point_query_version_cache_ttl_ms}, default 500ms). Within the TTL window,
+ *       concurrent queries reuse the cached version.</li>
```

Review Comment:
**[Low] Javadoc inconsistency:** The Javadoc says `default 500ms`, but the actual default value of `pointQueryVersionCacheTtlMs` is `0` (disabled). Please update the Javadoc to match.
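With the real default of `0`, caching is effectively off: `getVersion()` bypasses the cache when `ttlMs <= 0`, and `VersionEntry.isExpired()` reports every entry as expired for such TTLs. The snippet below copies `isExpired()` from the diff into a standalone class (the wrapper name `TtlCheckDemo` is hypothetical) so that behavior is easy to check.

```java
// Standalone copy of VersionEntry.isExpired() from the diff, for illustration only.
class TtlCheckDemo {
    static final class VersionEntry {
        final long version;
        final long cachedTimeMs;

        VersionEntry(long version, long cachedTimeMs) {
            this.version = version;
            this.cachedTimeMs = cachedTimeMs;
        }

        boolean isExpired(long ttlMs) {
            if (ttlMs <= 0) {
                return true; // TTL of 0 (the real default) or negative: never served from cache
            }
            return System.currentTimeMillis() - cachedTimeMs > ttlMs;
        }
    }
}
```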
########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ##########

```diff
+    // partitionId -> inflight RPC future (for request coalescing)
+    private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflightRequests = new ConcurrentHashMap<>();
+
+    @VisibleForTesting
+    public PointQueryVersionCache() {
+    }
+
+    public static PointQueryVersionCache getInstance() {
+        if (instance == null) {
+            synchronized (PointQueryVersionCache.class) {
+                if (instance == null) {
+                    instance = new PointQueryVersionCache();
+                }
+            }
+        }
+        return instance;
+    }
+
+    @VisibleForTesting
+    public static void setInstance(PointQueryVersionCache cache) {
+        instance = cache;
+    }
+
+    /**
+     * Get the visible version for a partition, using TTL-based caching and request coalescing.
+     *
+     * @param partition the cloud partition to get version for
+     * @param ttlMs TTL in milliseconds; 0 or negative disables caching
+     * @return the visible version
+     * @throws RpcException if the MetaService RPC fails
+     */
+    public long getVersion(CloudPartition partition, long ttlMs) throws RpcException {
+        long partitionId = partition.getId();
+
+        // If cache is disabled, fetch directly
+        if (ttlMs <= 0) {
+            return fetchVersionFromMs(partition);
+        }
+
+        // Check cache first
+        VersionEntry entry = cache.get(partitionId);
+        if (entry != null && !entry.isExpired(ttlMs)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("point query version cache hit, partition={}, version={}", partitionId, entry.version);
+            }
+            return entry.version;
+        }
+
+        // Cache miss or expired: use request coalescing
+        return getVersionWithCoalescing(partition, partitionId, ttlMs);
+    }
+
+    private long getVersionWithCoalescing(CloudPartition partition, long partitionId, long ttlMs)
+            throws RpcException {
+        // Try to become the leader request for this partition
+        CompletableFuture<Long> myFuture = new CompletableFuture<>();
+        CompletableFuture<Long> existingFuture = inflightRequests.putIfAbsent(partitionId, myFuture);
+
+        if (existingFuture != null) {
+            // Another request is already in flight — wait for its result
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("point query version coalescing, waiting for inflight request, partition={}",
+                        partitionId);
+            }
+            try {
+                return existingFuture.get();
+            } catch (InterruptedException e) {
```

Review Comment:
**[Medium] No timeout on `CompletableFuture.get()`: risk of indefinite blocking**

`existingFuture.get()` blocks without a timeout. If the leader thread terminates abnormally before completing the future (e.g., OOM, `Thread.stop()`), all follower threads will block forever. While `fetchVersionFromMs()` has its own retry/timeout logic inside `VersionHelper`, the overall wait here could be very long (retry_times × timeout_per_attempt), and in pathological cases infinite.

**Suggestion:** Use `existingFuture.get(timeout, TimeUnit.MILLISECONDS)` with a reasonable timeout (e.g., 30s, or a configurable value), and throw `RpcException` on `TimeoutException`.
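A minimal, self-contained sketch of the suggested fix. It assumes a functional interface `VersionRpc` stands in for `fetchVersionFromMs()` and a local `VersionFetchException` stands in for Doris's `RpcException`; `CoalescingFetcher` and all other names here are hypothetical, not the PR's API. Followers wait with `CompletableFuture.get(timeout, TimeUnit)` and turn a `TimeoutException` into an error instead of blocking indefinitely, while the leader fails the shared future on any throwable and unregisters it in a `finally` block.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for org.apache.doris.rpc.RpcException.
class VersionFetchException extends Exception {
    VersionFetchException(String message) {
        super(message);
    }
}

// Hypothetical stand-in for fetchVersionFromMs(partition).
interface VersionRpc {
    long fetch() throws VersionFetchException;
}

// Leader/follower request coalescing where followers wait at most waitTimeoutMs.
class CoalescingFetcher {
    private final ConcurrentHashMap<Long, CompletableFuture<Long>> inflight = new ConcurrentHashMap<>();
    private final long waitTimeoutMs;

    CoalescingFetcher(long waitTimeoutMs) {
        this.waitTimeoutMs = waitTimeoutMs;
    }

    long fetch(long partitionId, VersionRpc rpc) throws VersionFetchException {
        CompletableFuture<Long> mine = new CompletableFuture<>();
        CompletableFuture<Long> existing = inflight.putIfAbsent(partitionId, mine);
        if (existing != null) {
            return awaitLeader(existing); // follower: bounded wait on the leader's result
        }
        try {
            long version = rpc.fetch();
            mine.complete(version);
            return version;
        } catch (Throwable t) {
            // Fail the shared future on any error (including Errors) so followers wake up.
            mine.completeExceptionally(t);
            throw t; // precise rethrow: only VersionFetchException or unchecked types
        } finally {
            // Self-cleaning: unregister only if the map still points at our future.
            inflight.remove(partitionId, mine);
        }
    }

    long awaitLeader(CompletableFuture<Long> leader) throws VersionFetchException {
        try {
            return leader.get(waitTimeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new VersionFetchException(
                    "timed out after " + waitTimeoutMs + " ms waiting for coalesced request");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new VersionFetchException("interrupted while waiting for coalesced request");
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof VersionFetchException) {
                throw (VersionFetchException) cause;
            }
            throw new VersionFetchException(String.valueOf(cause != null ? cause.getMessage() : e.getMessage()));
        }
    }

    int inflightCount() {
        return inflight.size();
    }
}
```

Catching `Throwable` in the leader covers the abnormal-termination case the comment describes only partly (it cannot help if the thread is stopped outright), so the follower timeout remains the actual safety net.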
########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/PointQueryVersionCache.java: ##########

```diff
+            try {
+                return existingFuture.get();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RpcException("get version", "interrupted while waiting for coalesced request");
+            } catch (ExecutionException e) {
+                Throwable cause = e.getCause();
+                if (cause instanceof RpcException) {
+                    throw (RpcException) cause;
+                }
+                throw new RpcException("get version", cause != null ? cause.getMessage() : e.getMessage());
+            }
+        }
+
+        // We are the leader — fetch version from MetaService
+        try {
+            long version = fetchVersionFromMs(partition);
+            // Update cache
+            cache.put(partitionId, new VersionEntry(version, System.currentTimeMillis()));
+            // Also update the partition's cached version
```

Review Comment:
**[Low] No version monotonicity enforcement in cache**

The cache blindly stores whatever version MetaService returns, even if it is lower than a previously cached version. While `CloudPartition.setCachedVisibleVersion()` (line 164) enforces monotonicity for the partition object itself (via a lock and a version comparison), `PointQueryVersionCache` can serve a stale, lower version if MetaService transiently returns a lower value. This is unlikely in practice, but could happen during MetaService failover or network partitions.

**Suggestion:** Before `cache.put()`, check `if (existingEntry == null || version >= existingEntry.version)` so the cache never serves a version regression. Alternatively, use `cache.compute()` with a version comparison.
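One way to apply the `cache.compute()` alternative, sketched with a simplified copy of the diff's `VersionEntry`; the class `MonotonicVersionCache` and the method `putMonotonic` are hypothetical names. `ConcurrentHashMap.compute` runs the remapping function atomically for the key, so no other writer can slip in between the version comparison and the update, which a separate `get()` followed by `put()` would allow. On a regression, this sketch leaves the existing entry untouched, so it still expires on schedule and the next request refetches.

```java
import java.util.concurrent.ConcurrentHashMap;

class MonotonicVersionCache {
    // Simplified copy of the diff's VersionEntry.
    static final class VersionEntry {
        final long version;
        final long cachedTimeMs;

        VersionEntry(long version, long cachedTimeMs) {
            this.version = version;
            this.cachedTimeMs = cachedTimeMs;
        }
    }

    private final ConcurrentHashMap<Long, VersionEntry> cache = new ConcurrentHashMap<>();

    /**
     * Stores the fetched version unless a higher one is already cached.
     * Returns the version the cache holds after the call.
     */
    long putMonotonic(long partitionId, long fetchedVersion, long nowMs) {
        VersionEntry stored = cache.compute(partitionId, (id, existing) ->
                (existing == null || fetchedVersion >= existing.version)
                        ? new VersionEntry(fetchedVersion, nowMs)
                        : existing); // regression: keep the newer entry unchanged
        return stored.version;
    }

    VersionEntry get(long partitionId) {
        return cache.get(partitionId);
    }
}
```

Returning the version actually held in the cache, rather than the raw RPC result, keeps the leader's answer consistent with what followers will read during the TTL window; the review does not specify which to return, so treat this as a design choice.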
