This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c7b96a411f9 branch-4.0: [fix](cloud)Fix read from peer use thread pool not asyncio #57587 (#59000)
c7b96a411f9 is described below
commit c7b96a411f99995de11a3170f373e2330472b320
Author: deardeng <[email protected]>
AuthorDate: Mon Dec 22 14:37:33 2025 +0800
    branch-4.0: [fix](cloud)Fix read from peer use thread pool not asyncio #57587 (#59000)
cherry pick from #57587
---
be/src/cloud/config.cpp | 2 +-
be/src/io/cache/block_file_cache_downloader.cpp | 8 ++
.../main/java/org/apache/doris/common/Config.java | 14 +--
.../doris/cloud/catalog/CloudTabletRebalancer.java | 4 +-
.../cloud_p0/balance/test_balance_metrics.groovy | 110 +++++++++++++++++++++
.../balance/test_peer_read_async_warmup.groovy | 1 +
.../test_drop_cluster_clean_metrics.groovy | 108 ++++++++++++++++++++
.../test_fe_tablet_same_backend.groovy | 2 +-
.../cloud_p0/multi_cluster/test_rebalance.groovy | 6 +-
9 files changed, 241 insertions(+), 14 deletions(-)
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index ab31b9868f5..b915c1e0034 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -135,7 +135,7 @@ DEFINE_mBool(enable_standby_passive_compaction, "true");
DEFINE_mDouble(standby_compaction_version_ratio, "0.8");
-DEFINE_mBool(enable_cache_read_from_peer, "false");
+DEFINE_mBool(enable_cache_read_from_peer, "true");
// Cache the expiration time of the peer address.
// This can be configured to be less than the `rehash_tablet_after_be_dead_seconds` setting in the `fe` configuration.
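
This default flip enables cache-read-from-peer on every BE. A regression suite that depends on a specific value can still pin it explicitly through ClusterOptions, as the updated test_peer_read_async_warmup suite further down does. A minimal Groovy sketch under that assumption (the suite name and placeholder workload are illustrative, not part of this commit):

    import org.apache.doris.regression.suite.ClusterOptions

    suite('example_pin_peer_read_flag', 'docker') {
        if (!isCloudMode()) {
            return
        }
        def options = new ClusterOptions()
        options.cloudMode = true
        // pin the BE config so the test does not rely on the new default
        options.beConfigs += ['enable_cache_read_from_peer=true']
        options.setFeNum(1)
        options.setBeNum(1)
        docker(options) {
            sql 'select 1'  // placeholder workload
        }
    }
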
diff --git a/be/src/io/cache/block_file_cache_downloader.cpp b/be/src/io/cache/block_file_cache_downloader.cpp
index ba28d9f3479..567eab3ad98 100644
--- a/be/src/io/cache/block_file_cache_downloader.cpp
+++ b/be/src/io/cache/block_file_cache_downloader.cpp
@@ -319,6 +319,14 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
std::unique_ptr<char[]> buffer(new char[one_single_task_size]);
+    DBUG_EXECUTE_IF("FileCacheBlockDownloader::download_segment_file_sleep", {
+        auto sleep_time = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+                "FileCacheBlockDownloader::download_segment_file_sleep", "sleep_time", 3);
+        LOG(INFO) << "FileCacheBlockDownloader::download_segment_file_sleep: sleep_time="
+                  << sleep_time;
+        sleep(sleep_time);
+    });
+
size_t task_offset = 0;
for (size_t i = 0; i < task_num; i++) {
size_t offset = meta.offset + task_offset;
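
The DBUG_EXECUTE_IF block above adds a debug point that delays each segment download by a configurable number of seconds. A hedged Groovy sketch of how a docker suite could drive it, assuming the regression framework's GetDebugPoint() helpers; the 10-second value and surrounding workload are illustrative, not part of this commit:

    // requires options.enableDebugPoints() before entering docker(options) { ... }
    GetDebugPoint().enableDebugPointForAllBEs(
            'FileCacheBlockDownloader::download_segment_file_sleep', [sleep_time: 10])
    try {
        // run the warm-up / read-from-peer workload while downloads are delayed
    } finally {
        GetDebugPoint().disableDebugPointForAllBEs(
                'FileCacheBlockDownloader::download_segment_file_sleep')
    }
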
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 0ca1f271a7a..eb33df5003c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3375,13 +3375,13 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int cloud_min_balance_tablet_num_per_run = 2;
-    @ConfField(description = {"指定存算分离模式下所有 Compute group 的扩缩容预热方式。"
-            + "without_warmup: 直接修改 tablet 分片映射,首次读从 S3 拉取,均衡最快但性能波动最大;"
-            + "async_warmup: 异步预热,尽力而为拉取 cache,均衡较快但可能 cache miss;"
-            + "sync_warmup: 同步预热,确保 cache 迁移完成,均衡较慢但无 cache miss;"
-            + "peer_read_async_warmup: 直接修改 tablet 分片映射,首次读从 Peer BE 拉取,均衡最快可能会影响同计算组中其他 BE 性能。"
-            + "注意:此为全局 FE 配置,也可通过 SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
-            + "设置 compute group 维度的 balance 类型,compute group 维度配置优先级更高",
+    @ConfField(mutable = true, masterOnly = true, description = {"指定存算分离模式下所有Compute group的扩缩容预热方式。"
+            + "without_warmup: 直接修改tablet分片映射,首次读从S3拉取,均衡最快但性能波动最大;"
+            + "async_warmup: 异步预热,尽力而为拉取cache,均衡较快但可能cache miss;"
+            + "sync_warmup: 同步预热,确保cache迁移完成,均衡较慢但无cache miss;"
+            + "peer_read_async_warmup: 直接修改tablet分片映射,首次读从Peer BE拉取,均衡最快可能会影响同计算组中其他BE性能。"
+            + "注意:此为全局FE配置,也可通过SQL(ALTER COMPUTE GROUP cg PROPERTIES)"
+            + "设置compute group维度的balance类型,compute group维度配置优先级更高",
             "Specify the scaling and warming methods for all Compute groups in a cloud mode. "
                     + "without_warmup: Directly modify shard mapping, first read from S3,"
                     + "fastest re-balance but largest fluctuation; "
diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
index 654eb91cf78..019fb33aeff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java
@@ -988,7 +988,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
sendPreHeatingRpc(task.pickedTablet, task.srcBe, task.destBe);
} catch (Exception e) {
LOG.warn("Failed to preheat tablet {} from {} to {}, "
- + "help msg turn off fe config
enable_cloud_warm_up_for_rebalance",
+ + "help msg change fe config
cloud_warm_up_for_rebalance_type to without_warmup, ",
task.pickedTablet.getId(), task.srcBe, task.destBe, e);
}
}
@@ -1286,7 +1286,7 @@ public class CloudTabletRebalancer extends MasterDaemon {
sendPreHeatingRpc(pickedTablet, srcBe, destBe);
} catch (Exception e) {
LOG.warn("Failed to preheat tablet {} from {} to {}, "
- + "help msg turn off fe config
enable_cloud_warm_up_for_rebalance",
+ + "help msg change fe config
cloud_warm_up_for_rebalance_type to without_warmup ",
pickedTablet.getId(), srcBe, destBe, e);
return;
}
diff --git a/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
new file mode 100644
index 00000000000..eb81ad69524
--- /dev/null
+++ b/regression-test/suites/cloud_p0/balance/test_balance_metrics.groovy
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+
+
+suite('test_balance_metrics', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=2',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=without_warmup'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def getFEMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/metrics"
+ logger.info("getFEMetrics1, url: ${url}, name: ${name}")
+ def metrics = new URL(url).text
+ def pattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name) + "\\s+(\\d+)")
+ def matcher = pattern.matcher(metrics)
+ if (matcher.find()) {
+ def ret = matcher[0][1] as long
+ logger.info("getFEMetrics2, ${url}, name:${name}, value:${ret}")
+ return ret
+ } else {
+ throw new RuntimeException("${name} not found for ${ip}:${port}")
+ }
+ }
+
+ def testCase = { table ->
+ def master = cluster.getMasterFe()
+ def allEditlogNum = 0;
+ def future = thread {
+ awaitUntil(300) {
+ def name = """doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"}"""
+ def value = getFEMetrics(master.host, master.httpPort, name)
+ allEditlogNum += value
+ logger.info("balance metrics value: ${value}, allEditlogNum:
${allEditlogNum}")
+ return value == 0 && allEditlogNum > 0
+ }
+ }
+ sql """CREATE TABLE $table (
+ `k1` int(11) NULL,
+ `v1` VARCHAR(2048)
+ )
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 200
+ PROPERTIES (
+ "replication_num"="1"
+ );
+ """
+ // generate some balance tasks
+ cluster.addBackend(1)
+ future.get()
+ // wait for rebalancer to do its job
+ assertTrue(allEditlogNum > 0, "balance metrics not increased")
+
+ allEditlogNum = 0
+ for (i in 0..30) {
+ sleep(1000)
+ def name = """doris_fe_cloud_partition_balance_num{cluster_id="compute_cluster_id", cluster_name="compute_cluster"}"""
+ def value = getFEMetrics(master.host, master.httpPort, name)
+ allEditlogNum += value
+ logger.info("Final balance metrics value: ${value}, allEditlogNum:
${allEditlogNum}")
+ }
+ // after all balance tasks done, the metric should not increase
+ assertTrue(allEditlogNum == 0, "final balance metrics not increased")
+
+ cluster.addBackend(1, "other_cluster")
+ sleep(5000)
+ def name = """doris_fe_cloud_partition_balance_num{cluster_id="other_cluster_id", cluster_name="other_cluster"}"""
+ def value = getFEMetrics(master.host, master.httpPort, name)
+ logger.info("other cluster balance metrics value: ${value}")
+ }
+
+ docker(options) {
+ testCase("test_balance_metrics_tbl")
+ }
+}
diff --git a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
index 38aa4367878..ba9a61409bb 100644
--- a/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
+++ b/regression-test/suites/cloud_p0/balance/test_peer_read_async_warmup.groovy
@@ -38,6 +38,7 @@ suite('test_peer_read_async_warmup', 'docker') {
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
+ 'enable_cache_read_from_peer=true',
]
options.setFeNum(1)
options.setBeNum(1)
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
new file mode 100644
index 00000000000..d5f2e24c50c
--- /dev/null
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_drop_cluster_clean_metrics.groovy
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonOutput
+
+suite('test_drop_cluster_clean_metrics', 'docker') {
+ if (!isCloudMode()) {
+ return;
+ }
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ 'cloud_tablet_rebalancer_interval_second=2',
+ 'sys_log_verbose_modules=org',
+ 'heartbeat_interval_second=1',
+ 'rehash_tablet_after_be_dead_seconds=3600',
+ 'cloud_warm_up_for_rebalance_type=without_warmup'
+ ]
+ options.beConfigs += [
+ 'report_tablet_interval_seconds=1',
+ 'schedule_sync_tablets_interval_s=18000',
+ 'disable_auto_compaction=true',
+ 'sys_log_verbose_modules=*'
+ ]
+ options.setFeNum(2)
+ options.setBeNum(2)
+ options.cloudMode = true
+ options.enableDebugPoints()
+
+ def drop_cluster_api = { msHttpPort, request_body, check_func ->
+ httpTest {
+ endpoint msHttpPort
+ uri "/MetaService/http/drop_cluster?token=$token"
+ body request_body
+ check check_func
+ }
+ }
+
+ def getFEMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/metrics"
+ logger.info("getFEMetrics1, url: ${url}, name: ${name}")
+ def metrics = new URL(url).text
+
+ def metricLinePattern = java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name))
+
+ def matcher = metricLinePattern.matcher(metrics)
+ boolean found = false
+ while (matcher.find()) {
+ found = true
+ logger.info("getFEMetrics MATCH FOUND: ${matcher.group(0)}")
+ }
+
+ if (found) {
+ return true
+ } else {
+ def snippet = metrics.length() > 2000 ? metrics.substring(0, 2000) + "..." : metrics
+ logger.info("getFEMetrics NO MATCH for name=${name}, metrics snippet:\n${snippet}")
+ return false
+ }
+ }
+
+ def testCase = { ->
+ def ms = cluster.getAllMetaservices().get(0)
+ def msHttpPort = ms.host + ":" + ms.httpPort
+ def fe = cluster.getOneFollowerFe();
+ sleep(3000) // wait for metrics ready
+ def metrics1 = """cluster_id="compute_cluster_id","""
+ assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+ // drop compute cluster
+ def beClusterMap = [cluster_id:"compute_cluster_id"]
+ def instance = [instance_id: "default_instance_id", cluster: beClusterMap]
+ def jsonOutput = new JsonOutput()
+ def dropFeClusterBody = jsonOutput.toJson(instance)
+ drop_cluster_api.call(msHttpPort, dropFeClusterBody) {
+ respCode, body ->
+ log.info("drop fe cluster http cli result: ${body}
${respCode}".toString())
+ def json = parseJson(body)
+ }
+ sleep(3000) // wait for metrics cleaned
+ assertFalse(getFEMetrics(fe.host, fe.httpPort, metrics1))
+
+ cluster.addBackend(2, "new_cluster")
+
+ sleep(3000) // wait for metrics cleaned
+ def metrics2 = """cluster_id="new_cluster_id","""
+ assertTrue(getFEMetrics(fe.host, fe.httpPort, metrics2))
+ }
+
+ docker(options) {
+ testCase()
+ }
+}
\ No newline at end of file
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
index 5ecc610bf5a..9f24d1b2dbf 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_fe_tablet_same_backend.groovy
@@ -107,7 +107,7 @@ suite('test_fe_tablet_same_backend', 'multi_cluster,docker') {
def options = new ClusterOptions()
options.feConfigs += [
'cloud_cluster_check_interval_second=1',
- 'enable_cloud_warm_up_for_rebalance=true',
+ 'cloud_warm_up_for_rebalance_type=async_warmup',
'cloud_tablet_rebalancer_interval_second=1',
'cloud_balance_tablet_percent_per_run=1.0',
]
diff --git a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
index 83a8dc336de..81f2227ce44 100644
--- a/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
+++ b/regression-test/suites/cloud_p0/multi_cluster/test_rebalance.groovy
@@ -40,8 +40,8 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
'sys_log_verbose_modules=org',
]
}
-    clusterOptions[0].feConfigs += ['enable_cloud_warm_up_for_rebalance=true', 'cloud_pre_heating_time_limit_sec=300']
-    clusterOptions[1].feConfigs += ['enable_cloud_warm_up_for_rebalance=false']
+    clusterOptions[0].feConfigs += ['cloud_warm_up_for_rebalance_type=sync_warmup','cloud_pre_heating_time_limit_sec=300']
+    clusterOptions[1].feConfigs += ['cloud_warm_up_for_rebalance_type=without_warmup']
for (int i = 0; i < clusterOptions.size(); i++) {
@@ -178,7 +178,7 @@ suite('test_rebalance_in_cloud', 'multi_cluster,docker') {
// add a be
cluster.addBackend(1, null)
// warm up
- sql """admin set frontend
config("enable_cloud_warm_up_for_rebalance"="true")"""
+ sql """admin set frontend
config("cloud_warm_up_for_rebalance_type"="sync_warmup")"""
// test rebalance thread still work
awaitUntil(30) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]