This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c0591389cc706ea46dac72438e48e1ab42bd587e
Author: zhengyu <[email protected]>
AuthorDate: Fri Jun 28 19:05:58 2024 +0800

    [enhancement](cloud) batching get visible version from MetaService (#34615)
    
    Getting visible versions one by one from MetaService is costly. Batching
    them into a single RPC not only reduces the load on the RPC service but
    also reduces latency.
---
 cloud/src/meta-service/meta_service.cpp            |  3 +++
 cloud/src/meta-service/meta_service_txn.cpp        |  6 +++++-
 .../java/org/apache/doris/catalog/OlapTable.java   | 15 ++++++++++++++
 .../java/org/apache/doris/catalog/Partition.java   |  7 +++++++
 .../apache/doris/cloud/catalog/CloudPartition.java | 24 ++++++++++++++++------
 .../cloud/datasource/CloudInternalCatalog.java     |  4 ----
 .../transaction/CloudGlobalTransactionMgr.java     |  4 +++-
 .../doris/common/NereidsSqlCacheManager.java       |  9 ++++----
 .../org/apache/doris/nereids/SqlCacheContext.java  |  1 -
 .../org/apache/doris/qe/cache/CacheAnalyzer.java   | 11 ++++++----
 gensrc/proto/cloud.proto                           |  3 +++
 11 files changed, 66 insertions(+), 21 deletions(-)

diff --git a/cloud/src/meta-service/meta_service.cpp 
b/cloud/src/meta-service/meta_service.cpp
index b234a48d88a..0bb1fb5f277 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -269,6 +269,7 @@ void 
MetaServiceImpl::get_version(::google::protobuf::RpcController* controller,
                 return;
             }
             response->set_version(version_pb.version());
+            response->add_version_update_time_ms(version_pb.update_time_ms());
         }
         { TEST_SYNC_POINT_CALLBACK("get_version_code", &code); }
         return;
@@ -373,6 +374,7 @@ void 
MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
                 if (!value.has_value()) {
                     // return -1 if the target version is not exists.
                     response->add_versions(-1);
+                    response->add_version_update_time_ms(-1);
                 } else if (is_table_version) {
                     int64_t version = 0;
                     if (!txn->decode_atomic_int(*value, &version)) {
@@ -389,6 +391,7 @@ void 
MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr
                         break;
                     }
                     response->add_versions(version_pb.version());
+                    
response->add_version_update_time_ms(version_pb.update_time_ms());
                 }
             }
         }
diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 729696f8e2f..69fecc80788 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -1046,10 +1046,14 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     }
 
     // Save versions
+    int64_t version_update_time_ms =
+            
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+    response->set_version_update_time_ms(version_update_time_ms);
     for (auto& i : new_versions) {
         std::string ver_val;
         VersionPB version_pb;
         version_pb.set_version(i.second);
+        version_pb.set_update_time_ms(version_update_time_ms);
         if (!version_pb.SerializeToString(&ver_val)) {
             code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
             ss << "failed to serialize version_pb when saving, txn_id=" << 
txn_id;
@@ -1059,7 +1063,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
 
         txn->put(i.first, ver_val);
         LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " 
version:" << i.second
-                  << " txn_id=" << txn_id;
+                  << " txn_id=" << txn_id << " update_time=" << 
version_update_time_ms;
 
         std::string_view ver_key = i.first;
         ver_key.remove_prefix(1); // Remove key space
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index f5e3cc90e85..d5b1c258c5d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1149,6 +1149,21 @@ public class OlapTable extends Table implements 
MTMVRelatedTableIf, GsonPostProc
         return partition;
     }
 
+    public void getVersionInBatchForCloudMode(Collection<Long> partitionIds) {
+        if (Config.isCloudMode()) { // do nothing for non-cloud mode
+            List<CloudPartition> partitions = partitionIds.stream()
+                    .sorted()
+                    .map(this::getPartition)
+                    .map(partition -> (CloudPartition) partition)
+                    .collect(Collectors.toList());
+            try {
+                CloudPartition.getSnapshotVisibleVersion(partitions);
+            } catch (RpcException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     // select the non-empty partition ids belonging to this table.
     //
     // ATTN: partitions not belonging to this table will be filtered.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
index 775ee5530c5..ef94da902b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java
@@ -163,6 +163,13 @@ public class Partition extends MetaObject {
         this.setVisibleVersionAndTime(visibleVersion, visibleVersionTime);
     }
 
+    /* fromCache is only used in CloudPartition
+     * make it overrided here to avoid rewrite all the usages with ugly 
Config.isCloudConfig() branches
+     */
+    public long getVisibleVersion(Boolean fromCache) {
+        return visibleVersion;
+    }
+
     public long getVisibleVersion() {
         return visibleVersion;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
index b3f868ebe12..1246c5b640b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java
@@ -91,17 +91,22 @@ public class CloudPartition extends Partition {
         return;
     }
 
-    public void setCachedVisibleVersion(long version) {
+    public void setCachedVisibleVersion(long version, Long 
versionUpdateTimeMs) {
         // we only care the version should increase monotonically and ignore 
the readers
         LOG.debug("setCachedVisibleVersion use CloudPartition {}, version: {}, 
old version: {}",
                 super.getId(), version, super.getVisibleVersion());
         lock.lock();
         if (version > super.getVisibleVersion()) {
-            super.setVisibleVersion(version);
+            super.setVisibleVersionAndTime(version, versionUpdateTimeMs);
         }
         lock.unlock();
     }
 
+    @Override
+    public long getVisibleVersion(Boolean fromCache) {
+        return super.getVisibleVersion();
+    }
+
     @Override
     public long getVisibleVersion() {
         if (Env.isCheckpointThread() || 
Config.enable_check_compatibility_mode) {
@@ -125,7 +130,8 @@ public class CloudPartition extends Partition {
             if (resp.getStatus().getCode() == MetaServiceCode.OK) {
                 version = resp.getVersion();
                 // Cache visible version, see hasData() for details.
-                setCachedVisibleVersion(version);
+                assert resp.getVersionUpdateTimeMsList().size() == 1;
+                setCachedVisibleVersion(version, 
resp.getVersionUpdateTimeMs(0));
             } else {
                 assert resp.getStatus().getCode() == 
MetaServiceCode.VERSION_NOT_FOUND;
                 version = Partition.PARTITION_INIT_VERSION;
@@ -185,20 +191,21 @@ public class CloudPartition extends Partition {
         List<Long> dbIds = new ArrayList<>();
         List<Long> tableIds = new ArrayList<>();
         List<Long> partitionIds = new ArrayList<>();
+        List<Long> versionUpdateTimesMs = new ArrayList<>();
         for (CloudPartition partition : partitions) {
             dbIds.add(partition.getDbId());
             tableIds.add(partition.getTableId());
             partitionIds.add(partition.getId());
         }
 
-        List<Long> versions = getSnapshotVisibleVersion(dbIds, tableIds, 
partitionIds);
+        List<Long> versions = getSnapshotVisibleVersion(dbIds, tableIds, 
partitionIds, versionUpdateTimesMs);
 
         // Cache visible version, see hasData() for details.
         int size = versions.size();
         for (int i = 0; i < size; ++i) {
             Long version = versions.get(i);
             if (version > Partition.PARTITION_INIT_VERSION) {
-                partitions.get(i).setCachedVisibleVersion(versions.get(i));
+                partitions.get(i).setCachedVisibleVersion(versions.get(i), 
versionUpdateTimesMs.get(i));
             }
         }
 
@@ -208,7 +215,8 @@ public class CloudPartition extends Partition {
     // Get visible versions for the specified partitions.
     //
     // Return the visible version in order of the specified partition ids, -1 
means version NOT FOUND.
-    public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, 
List<Long> tableIds, List<Long> partitionIds)
+    public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, 
List<Long> tableIds, List<Long> partitionIds,
+            List<Long> versionUpdateTimesMs)
             throws RpcException {
         assert dbIds.size() == partitionIds.size() :
                 "partition ids size: " + partitionIds.size() + " should equals 
to db ids size: " + dbIds.size();
@@ -251,6 +259,10 @@ public class CloudPartition extends Partition {
             return news;
         }
 
+        if (versionUpdateTimesMs != null) {
+            versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList());
+        }
+
         return versions;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
index 879621faef9..3ebc9d13808 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java
@@ -125,10 +125,6 @@ public class CloudInternalCatalog extends InternalCatalog {
             indexMap.put(indexId, rollup);
         }
 
-        // version and version hash
-        if (versionInfo != null) {
-            partition.updateVisibleVersion(versionInfo);
-        }
         long version = partition.getVisibleVersion();
 
         final String storageVaultName = tbl.getStorageVaultName();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index d3cd92e52f5..b433d8cfbe7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -385,7 +385,9 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             if (partition == null) {
                 continue;
             }
-            partition.setCachedVisibleVersion(version);
+            partition.setCachedVisibleVersion(version, 
commitTxnResponse.getVersionUpdateTimeMs());
+            LOG.info("Update Partition. transactionId:{}, table_id:{}, 
partition_id:{}, version:{}, update time:{}",
+                    txnId, tableId, partition.getId(), version, 
commitTxnResponse.getVersionUpdateTimeMs());
         }
         env.getAnalysisManager().setNewPartitionLoaded(
                 
tablePartitionMap.keySet().stream().collect(Collectors.toList()));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
index cf6280650f0..b661196dd3b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java
@@ -62,6 +62,7 @@ import org.apache.commons.collections.CollectionUtils;
 
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -253,16 +254,16 @@ public class NereidsSqlCacheManager {
                 return true;
             }
             OlapTable olapTable = (OlapTable) tableIf;
-            long currentTableTime = olapTable.getVisibleVersionTime();
-            long cacheTableTime = scanTable.latestTimestamp;
             long currentTableVersion = olapTable.getVisibleVersion();
             long cacheTableVersion = scanTable.latestVersion;
             // some partitions have been dropped, or delete or updated or 
replaced, or insert rows into new partition?
-            if (currentTableTime > cacheTableTime
-                    || (currentTableTime == cacheTableTime && 
currentTableVersion > cacheTableVersion)) {
+            if (currentTableVersion != cacheTableVersion) {
                 return true;
             }
 
+            Collection<Long> partitionIds = scanTable.getScanPartitions();
+            olapTable.getVersionInBatchForCloudMode(partitionIds);
+
             for (Long scanPartitionId : scanTable.getScanPartitions()) {
                 Partition partition = olapTable.getPartition(scanPartitionId);
                 // partition == null: is this partition truncated?
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
index f3fa61cecaa..f8d5a00b9f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
@@ -422,7 +422,6 @@ public class SqlCacheContext {
     @lombok.AllArgsConstructor
     public static class ScanTable {
         public final FullTableName fullTableName;
-        public final long latestTimestamp;
         public final long latestVersion;
         public final List<Long> scanPartitions = Lists.newArrayList();
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
index c88f3bd13a1..17be5c42e22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java
@@ -693,15 +693,19 @@ public class CacheAnalyzer {
         CatalogIf catalog = database.getCatalog();
         ScanTable scanTable = new ScanTable(
                 new FullTableName(catalog.getName(), database.getFullName(), 
olapTable.getName()),
-                olapTable.getVisibleVersionTime(), 
olapTable.getVisibleVersion());
+                olapTable.getVisibleVersion());
         scanTables.add(scanTable);
+
+        Collection<Long> partitionIds = node.getSelectedPartitionIds();
+        olapTable.getVersionInBatchForCloudMode(partitionIds);
+
         for (Long partitionId : node.getSelectedPartitionIds()) {
             Partition partition = olapTable.getPartition(partitionId);
             scanTable.addScanPartition(partitionId);
             if (partition.getVisibleVersionTime() >= 
cacheTable.latestPartitionTime) {
                 cacheTable.latestPartitionId = partition.getId();
                 cacheTable.latestPartitionTime = 
partition.getVisibleVersionTime();
-                cacheTable.latestPartitionVersion = 
partition.getVisibleVersion();
+                cacheTable.latestPartitionVersion = 
partition.getVisibleVersion(true);
             }
         }
         return cacheTable;
@@ -716,8 +720,7 @@ public class CacheAnalyzer {
         DatabaseIf database = tableIf.getDatabase();
         CatalogIf catalog = database.getCatalog();
         ScanTable scanTable = new ScanTable(new FullTableName(
-                catalog.getName(), database.getFullName(), tableIf.getName()
-        ), cacheTable.latestPartitionTime, 0);
+                catalog.getName(), database.getFullName(), tableIf.getName()), 
0);
         scanTables.add(scanTable);
         return cacheTable;
     }
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index f285ad3f260..ab69deaa212 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -403,6 +403,7 @@ message TxnRunningPB {
 
 message VersionPB {
     optional int64 version = 1;
+    optional int64 update_time_ms = 2;
 }
 
 message RecycleTxnPB {
@@ -685,6 +686,7 @@ message CommitTxnResponse {
     repeated int64 partition_ids = 4;
     repeated int64 versions = 5;
     repeated TableStatsPB table_stats = 6;
+    optional int64 version_update_time_ms = 7;
 }
 
 message AbortTxnRequest {
@@ -818,6 +820,7 @@ message GetVersionResponse {
     repeated int64 table_ids = 4;
     repeated int64 partition_ids = 5;
     repeated int64 versions = 6;
+    repeated int64 version_update_time_ms = 7;
 };
 
 message GetObjStoreInfoRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to