This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new f58d62ecd8f branch-4.0: [fix](cloud) fix table and partition
get_version #60064 (#60202)
f58d62ecd8f is described below
commit f58d62ecd8f091543afa91f37cde899951839069
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 26 09:52:11 2026 +0800
branch-4.0: [fix](cloud) fix table and partition get_version #60064 (#60202)
Cherry-picked from #60064
Co-authored-by: meiyi <[email protected]>
---
.../java/org/apache/doris/catalog/OlapTable.java | 36 +++++++++++++---------
.../apache/doris/cloud/catalog/CloudPartition.java | 32 ++++++++++++-------
.../java/org/apache/doris/qe/SessionVariable.java | 2 +-
.../doris/cloud/catalog/CloudPartitionTest.java | 23 ++++++++++----
4 files changed, 61 insertions(+), 32 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 1da1ab08b8d..1e69e9004fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -62,6 +62,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.proto.OlapFile.EncryptionAlgorithmPB;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroup;
import org.apache.doris.rpc.RpcException;
@@ -236,8 +237,8 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
// Cache for table version in cloud mode
// This value is set when get the table version from meta-service, 0 means
version is not cached yet
- private long lastTableVersionCachedTimeMs = 0;
- private long cachedTableVersion = -1;
+ private volatile long lastTableVersionCachedTimeMs = 0;
+ private volatile long cachedTableVersion = -1;
public OlapTable() {
// for persist
@@ -3287,24 +3288,24 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
}
}
- public boolean isCachedTableVersionExpired() {
+ @VisibleForTesting
+ protected boolean isCachedTableVersionExpired() {
// -1 means no cache yet, need to fetch from MS
if (cachedTableVersion == -1) {
return true;
}
ConnectContext ctx = ConnectContext.get();
- if (ctx == null) {
- return true;
- }
- long cacheExpirationMs =
ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
+ long cacheExpirationMs = ctx == null ?
VariableMgr.getDefaultSessionVariable().cloudTableVersionCacheTtlMs
+ : ctx.getSessionVariable().cloudTableVersionCacheTtlMs;
if (cacheExpirationMs <= 0) { // always expired
return true;
}
return System.currentTimeMillis() - lastTableVersionCachedTimeMs >
cacheExpirationMs;
}
- public void setCachedTableVersion(long version) {
- if (version > cachedTableVersion) {
+ @VisibleForTesting
+ protected void setCachedTableVersion(long version) {
+ if (version >= cachedTableVersion) {
cachedTableVersion = version;
lastTableVersionCachedTimeMs = System.currentTimeMillis();
}
@@ -3358,9 +3359,9 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
}
// Get the table versions in batch.
- public static List<Long> getVisibleVersionInBatch(Collection<OlapTable>
tables) {
+ public static List<Long> getVisibleVersionInBatch(List<OlapTable> tables) {
if (tables.isEmpty()) {
- return new ArrayList<>();
+ return Collections.emptyList();
}
if (Config.isNotCloudMode()) {
@@ -3369,14 +3370,21 @@ public class OlapTable extends Table implements
MTMVRelatedTableIf, GsonPostProc
.collect(Collectors.toList());
}
- List<Long> dbIds = new ArrayList<>();
- List<Long> tableIds = new ArrayList<>();
+ List<Long> dbIds = new ArrayList<>(tables.size());
+ List<Long> tableIds = new ArrayList<>(tables.size());
for (OlapTable table : tables) {
dbIds.add(table.getDatabase().getId());
tableIds.add(table.getId());
}
- return getVisibleVersionFromMeta(dbIds, tableIds);
+ List<Long> versions = getVisibleVersionFromMeta(dbIds, tableIds);
+
+ // update cache
+ Preconditions.checkState(tables.size() == versions.size());
+ for (int i = 0; i < tables.size(); i++) {
+ tables.get(i).setCachedTableVersion(versions.get(i));
+ }
+ return versions;
}
private static List<Long> getVisibleVersionFromMeta(List<Long> dbIds,
List<Long> tableIds) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
index 16c4a9f8bce..788b4c6dbfa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -29,16 +29,18 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.qe.ConnectContext;
-import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.qe.VariableMgr;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
+import com.google.common.annotations.VisibleForTesting;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -56,7 +58,7 @@ public class CloudPartition extends Partition {
private long tableId;
// This value is set when get the version from meta-service, 0 means
version is not cached yet
- private long lastVersionCachedTimeMs = 0;
+ private volatile long lastVersionCachedTimeMs = 0;
private ReentrantLock lock = new ReentrantLock(true);
@@ -96,7 +98,7 @@ public class CloudPartition extends Partition {
return;
}
- public void setCachedVisibleVersion(long version, Long
versionUpdateTimeMs) {
+ public void setCachedVisibleVersion(long version, long
versionUpdateTimeMs) {
// we only care the version should increase monotonically and ignore
the readers
LOG.debug("setCachedVisibleVersion use CloudPartition {}, version: {},
old version: {}",
super.getId(), version, super.getVisibleVersion());
@@ -115,8 +117,14 @@ public class CloudPartition extends Partition {
return super.getVisibleVersion();
}
- public boolean isCachedVersionExpired() {
- long cacheExpirationMs =
SessionVariable.cloudPartitionVersionCacheTtlMs;
+ @VisibleForTesting
+ protected boolean isCachedVersionExpired() {
+ if (lastVersionCachedTimeMs == 0) {
+ return true;
+ }
+ ConnectContext ctx = ConnectContext.get();
+ long cacheExpirationMs = ctx == null ?
VariableMgr.getDefaultSessionVariable().cloudPartitionVersionCacheTtlMs
+ : ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs;
if (cacheExpirationMs <= 0) { // always expired
return true;
}
@@ -148,6 +156,7 @@ public class CloudPartition extends Partition {
.setTableId(this.tableId)
.setPartitionId(super.getId())
.setBatchMode(false)
+ .setWaitForPendingTxn(waitForPendingTxns)
.build();
try {
@@ -251,16 +260,18 @@ public class CloudPartition extends Partition {
// Return the visible version in order of the specified partition ids, -1
means version NOT FOUND.
public static List<Long> getSnapshotVisibleVersion(List<CloudPartition>
partitions) throws RpcException {
if (partitions.isEmpty()) {
- return new ArrayList<>();
+ return Collections.emptyList();
}
- if (SessionVariable.cloudPartitionVersionCacheTtlMs <= 0) { // No
cached versions will be used
+ long cloudPartitionVersionCacheTtlMs = ConnectContext.get() == null ? 0
+ :
ConnectContext.get().getSessionVariable().cloudPartitionVersionCacheTtlMs;
+ if (cloudPartitionVersionCacheTtlMs <= 0) { // No cached versions will
be used
return getSnapshotVisibleVersionFromMs(partitions, false);
}
// partitionId -> cachedVersion
- List<Pair<Long, Long>> allVersions = new ArrayList<>();
- List<CloudPartition> expiredPartitions = new ArrayList<>();
+ List<Pair<Long, Long>> allVersions = new
ArrayList<>(partitions.size());
+ List<CloudPartition> expiredPartitions = new
ArrayList<>(partitions.size());
for (CloudPartition partition : partitions) {
long ver = partition.getCachedVisibleVersion();
if (partition.isCachedVersionExpired()) {
@@ -272,8 +283,7 @@ public class CloudPartition extends Partition {
if (LOG.isDebugEnabled()) {
LOG.debug("cloudPartitionVersionCacheTtlMs={}, numPartitions={},
numFilteredPartitions={}",
- SessionVariable.cloudPartitionVersionCacheTtlMs,
- partitions.size(), partitions.size() -
expiredPartitions.size());
+ cloudPartitionVersionCacheTtlMs, partitions.size(),
partitions.size() - expiredPartitions.size());
}
List<Long> versions = null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 7f875cd5e3d..e9378840101 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -2807,7 +2807,7 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = DISABLE_EMPTY_PARTITION_PRUNE)
public boolean disableEmptyPartitionPrune = false;
@VariableMgr.VarAttr(name = CLOUD_PARTITION_VERSION_CACHE_TTL_MS)
- public static long cloudPartitionVersionCacheTtlMs = 0;
+ public long cloudPartitionVersionCacheTtlMs = 0;
@VariableMgr.VarAttr(name = CLOUD_TABLE_VERSION_CACHE_TTL_MS)
public long cloudTableVersionCacheTtlMs = 0;
// CLOUD_VARIABLES_END
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
index 3618b581c77..8213cd6c307 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/catalog/CloudPartitionTest.java
@@ -19,6 +19,7 @@ package org.apache.doris.cloud.catalog;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.rpc.VersionHelper;
+import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rpc.RpcException;
@@ -44,13 +45,18 @@ public class CloudPartitionTest {
@Test
public void testIsCachedVersionExpired() {
+ // Create ConnectContext with SessionVariable
+ ConnectContext ctx = new ConnectContext();
+ ctx.setSessionVariable(new SessionVariable());
+ ctx.setThreadLocalInfo();
+
// test isCachedVersionExpired
CloudPartition part = createPartition(1, 2, 3);
- SessionVariable.cloudPartitionVersionCacheTtlMs = 0;
+ ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 0;
Assertions.assertTrue(part.isCachedVersionExpired());
- SessionVariable.cloudPartitionVersionCacheTtlMs = -10086;
+ ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -10086;
part.setCachedVisibleVersion(2, 10086L); // update version and last
cache time
- SessionVariable.cloudPartitionVersionCacheTtlMs = 10000;
+ ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 10000;
Assertions.assertFalse(part.isCachedVersionExpired()); // not expired
due to long expiration duration
Assertions.assertEquals(2, part.getCachedVisibleVersion());
@@ -58,6 +64,11 @@ public class CloudPartitionTest {
@Test
public void testCachedVersion() throws RpcException {
+ // Create ConnectContext with SessionVariable
+ ConnectContext ctx = new ConnectContext();
+ ctx.setSessionVariable(new SessionVariable());
+ ctx.setThreadLocalInfo();
+
CloudPartition part = createPartition(1, 2, 3);
List<CloudPartition> parts = new ArrayList<>();
for (long i = 0; i < 3; ++i) {
@@ -87,7 +98,7 @@ public class CloudPartitionTest {
};
// CHECKSTYLE ON
- SessionVariable.cloudPartitionVersionCacheTtlMs = -1; // disable cache
+ ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = -1; //
disable cache
{
// test single get version
Assertions.assertEquals(2, part.getVisibleVersion()); //
should not get from cache
@@ -106,7 +117,7 @@ public class CloudPartitionTest {
}
// enable change expiration and make it cached in long duration
- SessionVariable.cloudPartitionVersionCacheTtlMs = 100000;
+ ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 100000;
{
// test single get version
Assertions.assertEquals(2, part.getVisibleVersion()); //
cached version
@@ -125,7 +136,7 @@ public class CloudPartitionTest {
}
// enable change expiration and make it expired
- SessionVariable.cloudPartitionVersionCacheTtlMs = 500;
+ ctx.getSessionVariable().cloudPartitionVersionCacheTtlMs = 500;
try {
Thread.sleep(550);
} catch (InterruptedException e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]