This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c0591389cc706ea46dac72438e48e1ab42bd587e Author: zhengyu <[email protected]> AuthorDate: Fri Jun 28 19:05:58 2024 +0800 [enhancement](cloud) batching get visible version from MetaService (#34615) Getting visible versions one by one from MetaService is costly. Batching them into one RPC will not only reduce the workload of the RPC service but also reduce the lag. --- cloud/src/meta-service/meta_service.cpp | 3 +++ cloud/src/meta-service/meta_service_txn.cpp | 6 +++++- .../java/org/apache/doris/catalog/OlapTable.java | 15 ++++++++++++++ .../java/org/apache/doris/catalog/Partition.java | 7 +++++++ .../apache/doris/cloud/catalog/CloudPartition.java | 24 ++++++++++++++++------ .../cloud/datasource/CloudInternalCatalog.java | 4 ---- .../transaction/CloudGlobalTransactionMgr.java | 4 +++- .../doris/common/NereidsSqlCacheManager.java | 9 ++++---- .../org/apache/doris/nereids/SqlCacheContext.java | 1 - .../org/apache/doris/qe/cache/CacheAnalyzer.java | 11 ++++++---- gensrc/proto/cloud.proto | 3 +++ 11 files changed, 66 insertions(+), 21 deletions(-) diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index b234a48d88a..0bb1fb5f277 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -269,6 +269,7 @@ void MetaServiceImpl::get_version(::google::protobuf::RpcController* controller, return; } response->set_version(version_pb.version()); + response->add_version_update_time_ms(version_pb.update_time_ms()); } { TEST_SYNC_POINT_CALLBACK("get_version_code", &code); } return; @@ -373,6 +374,7 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr if (!value.has_value()) { // return -1 if the target version is not exists.
response->add_versions(-1); + response->add_version_update_time_ms(-1); } else if (is_table_version) { int64_t version = 0; if (!txn->decode_atomic_int(*value, &version)) { @@ -389,6 +391,7 @@ void MetaServiceImpl::batch_get_version(::google::protobuf::RpcController* contr break; } response->add_versions(version_pb.version()); + response->add_version_update_time_ms(version_pb.update_time_ms()); } } } diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 729696f8e2f..69fecc80788 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1046,10 +1046,14 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, } // Save versions + int64_t version_update_time_ms = + duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); + response->set_version_update_time_ms(version_update_time_ms); for (auto& i : new_versions) { std::string ver_val; VersionPB version_pb; version_pb.set_version(i.second); + version_pb.set_update_time_ms(version_update_time_ms); if (!version_pb.SerializeToString(&ver_val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; ss << "failed to serialize version_pb when saving, txn_id=" << txn_id; @@ -1059,7 +1063,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, txn->put(i.first, ver_val); LOG(INFO) << "xxx put partition_version_key=" << hex(i.first) << " version:" << i.second - << " txn_id=" << txn_id; + << " txn_id=" << txn_id << " update_time=" << version_update_time_ms; std::string_view ver_key = i.first; ver_key.remove_prefix(1); // Remove key space diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index f5e3cc90e85..d5b1c258c5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1149,6 +1149,21 @@ public class OlapTable extends Table implements MTMVRelatedTableIf, GsonPostProc return partition; } + public void getVersionInBatchForCloudMode(Collection<Long> partitionIds) { + if (Config.isCloudMode()) { // do nothing for non-cloud mode + List<CloudPartition> partitions = partitionIds.stream() + .sorted() + .map(this::getPartition) + .map(partition -> (CloudPartition) partition) + .collect(Collectors.toList()); + try { + CloudPartition.getSnapshotVisibleVersion(partitions); + } catch (RpcException e) { + throw new RuntimeException(e); + } + } + } + // select the non-empty partition ids belonging to this table. // // ATTN: partitions not belonging to this table will be filtered. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java index 775ee5530c5..ef94da902b8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Partition.java @@ -163,6 +163,13 @@ public class Partition extends MetaObject { this.setVisibleVersionAndTime(visibleVersion, visibleVersionTime); } + /* fromCache is only used in CloudPartition + * make it overrided here to avoid rewrite all the usages with ugly Config.isCloudConfig() branches + */ + public long getVisibleVersion(Boolean fromCache) { + return visibleVersion; + } + public long getVisibleVersion() { return visibleVersion; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java index b3f868ebe12..1246c5b640b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudPartition.java @@ -91,17 +91,22 @@ public class CloudPartition extends Partition { return; } - public void 
setCachedVisibleVersion(long version) { + public void setCachedVisibleVersion(long version, Long versionUpdateTimeMs) { // we only care the version should increase monotonically and ignore the readers LOG.debug("setCachedVisibleVersion use CloudPartition {}, version: {}, old version: {}", super.getId(), version, super.getVisibleVersion()); lock.lock(); if (version > super.getVisibleVersion()) { - super.setVisibleVersion(version); + super.setVisibleVersionAndTime(version, versionUpdateTimeMs); } lock.unlock(); } + @Override + public long getVisibleVersion(Boolean fromCache) { + return super.getVisibleVersion(); + } + @Override public long getVisibleVersion() { if (Env.isCheckpointThread() || Config.enable_check_compatibility_mode) { @@ -125,7 +130,8 @@ public class CloudPartition extends Partition { if (resp.getStatus().getCode() == MetaServiceCode.OK) { version = resp.getVersion(); // Cache visible version, see hasData() for details. - setCachedVisibleVersion(version); + assert resp.getVersionUpdateTimeMsList().size() == 1; + setCachedVisibleVersion(version, resp.getVersionUpdateTimeMs(0)); } else { assert resp.getStatus().getCode() == MetaServiceCode.VERSION_NOT_FOUND; version = Partition.PARTITION_INIT_VERSION; @@ -185,20 +191,21 @@ public class CloudPartition extends Partition { List<Long> dbIds = new ArrayList<>(); List<Long> tableIds = new ArrayList<>(); List<Long> partitionIds = new ArrayList<>(); + List<Long> versionUpdateTimesMs = new ArrayList<>(); for (CloudPartition partition : partitions) { dbIds.add(partition.getDbId()); tableIds.add(partition.getTableId()); partitionIds.add(partition.getId()); } - List<Long> versions = getSnapshotVisibleVersion(dbIds, tableIds, partitionIds); + List<Long> versions = getSnapshotVisibleVersion(dbIds, tableIds, partitionIds, versionUpdateTimesMs); // Cache visible version, see hasData() for details. 
int size = versions.size(); for (int i = 0; i < size; ++i) { Long version = versions.get(i); if (version > Partition.PARTITION_INIT_VERSION) { - partitions.get(i).setCachedVisibleVersion(versions.get(i)); + partitions.get(i).setCachedVisibleVersion(versions.get(i), versionUpdateTimesMs.get(i)); } } @@ -208,7 +215,8 @@ public class CloudPartition extends Partition { // Get visible versions for the specified partitions. // // Return the visible version in order of the specified partition ids, -1 means version NOT FOUND. - public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, List<Long> tableIds, List<Long> partitionIds) + public static List<Long> getSnapshotVisibleVersion(List<Long> dbIds, List<Long> tableIds, List<Long> partitionIds, + List<Long> versionUpdateTimesMs) throws RpcException { assert dbIds.size() == partitionIds.size() : "partition ids size: " + partitionIds.size() + " should equals to db ids size: " + dbIds.size(); @@ -251,6 +259,10 @@ public class CloudPartition extends Partition { return news; } + if (versionUpdateTimesMs != null) { + versionUpdateTimesMs.addAll(resp.getVersionUpdateTimeMsList()); + } + return versions; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 879621faef9..3ebc9d13808 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -125,10 +125,6 @@ public class CloudInternalCatalog extends InternalCatalog { indexMap.put(indexId, rollup); } - // version and version hash - if (versionInfo != null) { - partition.updateVisibleVersion(versionInfo); - } long version = partition.getVisibleVersion(); final String storageVaultName = tbl.getStorageVaultName(); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java index d3cd92e52f5..b433d8cfbe7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java @@ -385,7 +385,9 @@ public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface { if (partition == null) { continue; } - partition.setCachedVisibleVersion(version); + partition.setCachedVisibleVersion(version, commitTxnResponse.getVersionUpdateTimeMs()); + LOG.info("Update Partition. transactionId:{}, table_id:{}, partition_id:{}, version:{}, update time:{}", + txnId, tableId, partition.getId(), version, commitTxnResponse.getVersionUpdateTimeMs()); } env.getAnalysisManager().setNewPartitionLoaded( tablePartitionMap.keySet().stream().collect(Collectors.toList())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java index cf6280650f0..b661196dd3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java @@ -62,6 +62,7 @@ import org.apache.commons.collections.CollectionUtils; import java.lang.reflect.Field; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Map.Entry; import java.util.Objects; @@ -253,16 +254,16 @@ public class NereidsSqlCacheManager { return true; } OlapTable olapTable = (OlapTable) tableIf; - long currentTableTime = olapTable.getVisibleVersionTime(); - long cacheTableTime = scanTable.latestTimestamp; long currentTableVersion = olapTable.getVisibleVersion(); long cacheTableVersion = scanTable.latestVersion; // some partitions have been dropped, or 
delete or updated or replaced, or insert rows into new partition? - if (currentTableTime > cacheTableTime - || (currentTableTime == cacheTableTime && currentTableVersion > cacheTableVersion)) { + if (currentTableVersion != cacheTableVersion) { return true; } + Collection<Long> partitionIds = scanTable.getScanPartitions(); + olapTable.getVersionInBatchForCloudMode(partitionIds); + for (Long scanPartitionId : scanTable.getScanPartitions()) { Partition partition = olapTable.getPartition(scanPartitionId); // partition == null: is this partition truncated? diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java index f3fa61cecaa..f8d5a00b9f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java @@ -422,7 +422,6 @@ public class SqlCacheContext { @lombok.AllArgsConstructor public static class ScanTable { public final FullTableName fullTableName; - public final long latestTimestamp; public final long latestVersion; public final List<Long> scanPartitions = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java index c88f3bd13a1..17be5c42e22 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/cache/CacheAnalyzer.java @@ -693,15 +693,19 @@ public class CacheAnalyzer { CatalogIf catalog = database.getCatalog(); ScanTable scanTable = new ScanTable( new FullTableName(catalog.getName(), database.getFullName(), olapTable.getName()), - olapTable.getVisibleVersionTime(), olapTable.getVisibleVersion()); + olapTable.getVisibleVersion()); scanTables.add(scanTable); + + Collection<Long> partitionIds = node.getSelectedPartitionIds(); + 
olapTable.getVersionInBatchForCloudMode(partitionIds); + for (Long partitionId : node.getSelectedPartitionIds()) { Partition partition = olapTable.getPartition(partitionId); scanTable.addScanPartition(partitionId); if (partition.getVisibleVersionTime() >= cacheTable.latestPartitionTime) { cacheTable.latestPartitionId = partition.getId(); cacheTable.latestPartitionTime = partition.getVisibleVersionTime(); - cacheTable.latestPartitionVersion = partition.getVisibleVersion(); + cacheTable.latestPartitionVersion = partition.getVisibleVersion(true); } } return cacheTable; @@ -716,8 +720,7 @@ public class CacheAnalyzer { DatabaseIf database = tableIf.getDatabase(); CatalogIf catalog = database.getCatalog(); ScanTable scanTable = new ScanTable(new FullTableName( - catalog.getName(), database.getFullName(), tableIf.getName() - ), cacheTable.latestPartitionTime, 0); + catalog.getName(), database.getFullName(), tableIf.getName()), 0); scanTables.add(scanTable); return cacheTable; } diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index f285ad3f260..ab69deaa212 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -403,6 +403,7 @@ message TxnRunningPB { message VersionPB { optional int64 version = 1; + optional int64 update_time_ms = 2; } message RecycleTxnPB { @@ -685,6 +686,7 @@ message CommitTxnResponse { repeated int64 partition_ids = 4; repeated int64 versions = 5; repeated TableStatsPB table_stats = 6; + optional int64 version_update_time_ms = 7; } message AbortTxnRequest { @@ -818,6 +820,7 @@ message GetVersionResponse { repeated int64 table_ids = 4; repeated int64 partition_ids = 5; repeated int64 versions = 6; + repeated int64 version_update_time_ms = 7; }; message GetObjStoreInfoRequest { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
