This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 1cd6e862e97 branch-4.0: [improvement](load & cache) Set disable_file_cache to true for load data into disposable LRU queue #56637 (#56882)
1cd6e862e97 is described below
commit 1cd6e862e9728a8ee4877164026ad9de7f0b418d
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 14 11:18:05 2025 +0800
branch-4.0: [improvement](load & cache) Set disable_file_cache to true for load data into disposable LRU queue #56637 (#56882)
Cherry-picked from #56637
Co-authored-by: Refrain <[email protected]>
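
Note (editor's illustration, not part of the patch): the BE-side effect of this change is easiest to see as LRU-queue routing. The patch propagates the disable_file_cache query option into the scanner's IO context (see file_scanner.cpp below), so data read by a load's scanner is cached in the disposable queue rather than the normal one. The sketch below is a minimal, self-contained C++ example of that routing idea only; all names in it (SketchIOContext, SketchFileCache) are hypothetical stand-ins, not the real Doris classes.

    // Sketch only: hypothetical types modeling how a file cache could route
    // cached blocks into separate LRU queues based on an "is_disposable" flag.
    #include <iostream>
    #include <list>
    #include <string>

    struct SketchIOContext {
        // Mirrors the new assignment in FileScanner::init():
        //   _io_ctx->is_disposable = _state->query_options().disable_file_cache;
        bool is_disposable = false;
    };

    struct SketchFileCache {
        std::list<std::string> normal_queue;      // long-lived query data
        std::list<std::string> disposable_queue;  // short-lived load/scan data

        void put(const std::string& block, const SketchIOContext& ctx) {
            // Blocks cached on behalf of a load scan (is_disposable == true) go to
            // the disposable LRU queue, so they cannot evict hot query data.
            if (ctx.is_disposable) {
                disposable_queue.push_back(block);
            } else {
                normal_queue.push_back(block);
            }
        }
    };

    int main() {
        SketchFileCache cache;

        SketchIOContext load_ctx;
        load_ctx.is_disposable = true;   // load path (what this patch enables)
        SketchIOContext query_ctx;       // ordinary query path

        cache.put("csv_block_from_load_scan", load_ctx);
        cache.put("segment_block_from_query", query_ctx);

        std::cout << "normal=" << cache.normal_queue.size()
                  << " disposable=" << cache.disposable_queue.size() << std::endl;
        return 0;
    }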
---
be/src/vec/exec/scan/file_scanner.cpp | 1 +
.../org/apache/doris/qe/NereidsCoordinator.java | 36 ++-
.../suites/cloud_p0/cache/test_load_cache.groovy | 350 +++++++++++++++++++++
3 files changed, 376 insertions(+), 11 deletions(-)
diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index 82fe714b55d..ce5e0b88e7e 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -170,6 +170,7 @@ Status FileScanner::init(RuntimeState* state, const VExprContextSPtrs& conjuncts
RETURN_IF_ERROR(_init_io_ctx());
_io_ctx->file_cache_stats = _file_cache_statistics.get();
_io_ctx->file_reader_stats = _file_reader_stats.get();
+ _io_ctx->is_disposable = _state->query_options().disable_file_cache;
if (_is_load) {
_src_row_desc.reset(new RowDescriptor(_state->desc_tbl(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
index 858b3de08a9..04455f11248 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/NereidsCoordinator.java
@@ -91,21 +91,30 @@ public class NereidsCoordinator extends Coordinator {
super(context, planner, statsErrorEstimator);
this.coordinatorContext = CoordinatorContext.buildForSql(planner, this);
- this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext, -1L));
this.needEnqueue = true;
+ DataSink dataSink = coordinatorContext.dataSink;
+ // output to mysql or to file
+ if ((dataSink instanceof ResultSink || dataSink instanceof ResultFileSink)) {
+ setForQuery();
+ } else {
+ setForInsert(-1L);
+ }
+
Preconditions.checkState(!planner.getFragments().isEmpty()
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
}
+ // load, with JobId
public NereidsCoordinator(ConnectContext context, NereidsPlanner planner, StatsErrorEstimator statsErrorEstimator, long jobId) {
super(context, planner, statsErrorEstimator);
this.coordinatorContext = CoordinatorContext.buildForSql(planner, this);
- this.coordinatorContext.setJobProcessor(buildJobProcessor(coordinatorContext, jobId));
this.needEnqueue = true;
+ // we don't need to check the dataSink, Because setting jobId means this must be a load operation
+ setForInsert(jobId);
Preconditions.checkState(!planner.getFragments().isEmpty()
&& coordinatorContext.instanceNum.get() > 0, "Fragment and Instance can not be empty˚");
}
@@ -120,6 +129,8 @@ public class NereidsCoordinator extends Coordinator {
this, jobId, queryId, fragments, distributedPlans, scanNodes,
descTable, timezone, loadZeroTolerance, enableProfile
);
+ // same reason in `setForInsert`
+ this.coordinatorContext.queryOptions.setDisableFileCache(true);
this.needEnqueue = false;
Preconditions.checkState(!fragments.isEmpty()
@@ -471,6 +482,18 @@ public class NereidsCoordinator extends Coordinator {
}
}
+ private void setForInsert(long jobId) {
+ JobProcessor jobProc = new LoadProcessor(this.coordinatorContext, jobId);
+ this.coordinatorContext.setJobProcessor(jobProc);
+ // Set this field to true to avoid data entering the normal cache LRU queue
+ this.coordinatorContext.queryOptions.setDisableFileCache(true);
+ }
+
+ private void setForQuery() {
+ JobProcessor jobProc = QueryProcessor.build(this.coordinatorContext);
+ this.coordinatorContext.setJobProcessor(jobProc);
+ }
+
private void setForBroker(
CoordinatorContext coordinatorContext, PipelineDistributedPlan topPlan) throws AnalysisException {
DataSink dataSink = coordinatorContext.dataSink;
@@ -528,15 +551,6 @@ public class NereidsCoordinator extends Coordinator {
return false;
}
- private JobProcessor buildJobProcessor(CoordinatorContext coordinatorContext, long jobId) {
- DataSink dataSink = coordinatorContext.dataSink;
- if ((dataSink instanceof ResultSink || dataSink instanceof ResultFileSink)) {
- return QueryProcessor.build(coordinatorContext);
- } else {
- return new LoadProcessor(coordinatorContext, jobId);
- }
- }
-
@Override
public void setIsProfileSafeStmt(boolean isSafe) {
coordinatorContext.queryOptions.setEnableProfile(isSafe && coordinatorContext.queryOptions.isEnableProfile());
diff --git a/regression-test/suites/cloud_p0/cache/test_load_cache.groovy b/regression-test/suites/cloud_p0/cache/test_load_cache.groovy
new file mode 100644
index 00000000000..25f9929aaf7
--- /dev/null
+++ b/regression-test/suites/cloud_p0/cache/test_load_cache.groovy
@@ -0,0 +1,350 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import groovy.json.JsonSlurper
+
+/*
+Test Description:
+
+1. When disable_file_cache = true and enable_file_cache = true, it is expected that the S3 TVF load (import phase) will NOT enter the cache, while the query phase will enter the Disposable queue.
+   Specifically: Normal queue size should be 0, Disposable queue size should be 91163 bytes.
+2. When disable_file_cache = false and enable_file_cache = true, it is expected that the S3 TVF load (import phase) will enter the Normal queue, and the query phase will still enter the Disposable queue.
+   Specifically: Normal queue size should be 236988 bytes, Disposable queue size should still be 91163 bytes.
+
+Explanation: The query phase caches the compressed file, so the Disposable queue size is checked for an exact value; for the import phase cache, since future changes to statistics are possible, only a reasonable range is required.
+*/
+
+suite('test_load_cache', 'docker') {
+ def options = new ClusterOptions()
+ options.feConfigs += [
+ 'cloud_cluster_check_interval_second=1',
+ ]
+ options.beConfigs += [
+ 'file_cache_enter_disk_resource_limit_mode_percent=99',
+ 'enable_evict_file_cache_in_advance=false',
+ 'file_cache_background_monitor_interval_ms=1000',
+ ]
+ options.cloudMode = true
+ options.beNum = 1
+
+ def clearFileCache = {ip, port ->
+ def url = "http://${ip}:${port}/api/file_cache?op=clear&sync=true"
+ def response = new URL(url).text
+ def json = new JsonSlurper().parseText(response)
+
+ // Check the status
+ if (json.status != "OK") {
+ throw new RuntimeException("Clear cache on ${ip}:${port} failed: ${json.status}")
+ }
+ }
+
+ def clearFileCacheOnAllBackends = {
+ def backends = sql """SHOW BACKENDS"""
+
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[4]
+ clearFileCache(ip, port)
+ }
+
+ // clear file cache is async, wait it done
+ sleep(5000)
+ }
+
+ def getBrpcMetrics = {ip, port, name ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def matcher = metrics =~ ~"${name}\\s+(\\d+)"
+ if (matcher.find()) {
+ return matcher[0][1] as long
+ } else {
+ return 0
+ }
+ }
+
+ def getAllCacheMetrics = {ip, port ->
+ def url = "http://${ip}:${port}/brpc_metrics"
+ def metrics = new URL(url).text
+ def cacheMetrics = [:]
+ metrics.eachLine { line ->
+ if (line.contains("file_cache") && !line.startsWith("#")) {
+ def parts = line.split()
+ if (parts.size() >= 2) {
+ cacheMetrics[parts[0]] = parts[1]
+ }
+ }
+ }
+ return cacheMetrics
+ }
+
+ def getCacheFileList = {ip, port ->
+ try {
+ def url = "http://${ip}:${port}/api/file_cache?op=list"
+ def response = new URL(url).text
+ return response
+ } catch (Exception e) {
+ logger.warn("Failed to get cache file list: ${e.message}")
+ return "Error: ${e.message}"
+ }
+ }
+
+ def getNormalQueueSize = { ip, port ->
+ return getBrpcMetrics(ip, port, "file_cache_normal_queue_cache_size")
+ }
+
+ def getDisposableQueueSize = { ip, port ->
+ return getBrpcMetrics(ip, port, "file_cache_disposable_queue_cache_size")
+ }
+
+ def getDisposableQueueElementCount = { ip, port ->
+ return getBrpcMetrics(ip, port, "file_cache_disposable_queue_element_count")
+ }
+
+ def getTotalNormalQueueSize = {
+ def backends = sql """SHOW BACKENDS"""
+ long sum = 0
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[5]
+ def size = getNormalQueueSize(ip, port)
+ sum += size
+ logger.info("BE ${ip}:${port} normal_queue_size = ${size}")
+ }
+ return sum
+ }
+
+ def getTotalDisposableQueueSize = {
+ def backends = sql """SHOW BACKENDS"""
+ long sum = 0
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[5]
+ def size = getDisposableQueueSize(ip, port)
+ sum += size
+ logger.info("BE ${ip}:${port} disposable_queue_size = ${size}")
+ }
+ return sum
+ }
+
+ def getTotalDisposableQueueElementCount = {
+ def backends = sql """SHOW BACKENDS"""
+ long sum = 0
+ for (be in backends) {
+ def ip = be[1]
+ def port = be[5]
+ def count = getDisposableQueueElementCount(ip, port)
+ sum += count
+ logger.info("BE ${ip}:${port} disposable_queue_element_count =
${count}")
+ }
+ return sum
+ }
+
+ docker(options) {
+ def ak = getS3AK()
+ def sk = getS3SK()
+ def s3_endpoint = getS3Endpoint()
+ def s3_region = getS3Region()
+ def s3_bucket = getS3BucketName()
+ def s3_provider = getS3Provider()
+
+ def s3_tvf_uri = "s3://${s3_bucket}/regression/tpch/sf0.01/customer.csv.gz"
+
+ // ============================================================================
+ // SCENARIO 1: disable_file_cache = true
+ // ============================================================================
+
+ // Clear file cache before test
+ clearFileCacheOnAllBackends()
+
+ // Set session variables for Scenario 1
+ sql "set disable_file_cache = true;"
+ sql "set enable_file_cache = true;"
+
+ // Create test table
+ sql """DROP TABLE IF EXISTS load_test_table"""
+ sql """
+ CREATE TABLE load_test_table (
+ C_CUSTKEY INTEGER NOT NULL,
+ C_NAME VARCHAR(25) NOT NULL,
+ C_ADDRESS VARCHAR(40) NOT NULL,
+ C_NATIONKEY INTEGER NOT NULL,
+ C_PHONE CHAR(15) NOT NULL,
+ C_ACCTBAL DECIMAL(15,2) NOT NULL,
+ C_MKTSEGMENT CHAR(10) NOT NULL,
+ C_COMMENT VARCHAR(117) NOT NULL
+ ) DUPLICATE KEY(C_CUSTKEY, C_NAME)
+ DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ // Record initial cache sizes
+ long normalQueueSizeBefore1 = getTotalNormalQueueSize()
+ long disposableQueueSizeBefore1 = getTotalDisposableQueueSize()
+ long disposableQueueElementCountBefore1 = getTotalDisposableQueueElementCount()
+
+ // Execute S3 TVF Load
+ sql """
+ INSERT INTO load_test_table
+ SELECT
+ CAST(c1 AS INT) AS C_CUSTKEY,
+ CAST(c2 AS VARCHAR(25)) AS C_NAME,
+ CAST(c3 AS VARCHAR(40)) AS C_ADDRESS,
+ CAST(c4 AS INT) AS C_NATIONKEY,
+ CAST(c5 AS CHAR(15)) AS C_PHONE,
+ CAST(c6 AS DECIMAL(15,2)) AS C_ACCTBAL,
+ CAST(c7 AS CHAR(10)) AS C_MKTSEGMENT,
+ CAST(c8 AS VARCHAR(117)) AS C_COMMENT
+ FROM S3(
+ "uri" = "${s3_tvf_uri}",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${s3_region}",
+ "format" = "csv",
+ "column_separator" = "|"
+ )
+ """
+
+ def rowCount1 = sql """SELECT COUNT(*) FROM load_test_table"""
+ logger.info("Loaded ${rowCount1[0][0]} rows")
+
+ // Wait for cache metrics to update
+ sleep(5000)
+
+ // Record cache sizes after load
+ long normalQueueSizeAfter1 = getTotalNormalQueueSize()
+ long disposableQueueSizeAfter1 = getTotalDisposableQueueSize()
+ long disposableQueueElementCountAfter1 = getTotalDisposableQueueElementCount()
+
+ long normalQueueIncrease1 = normalQueueSizeAfter1 - normalQueueSizeBefore1
+ long disposableQueueIncrease1 = disposableQueueSizeAfter1 - disposableQueueSizeBefore1
+ long disposableElementIncrease1 = disposableQueueElementCountAfter1 - disposableQueueElementCountBefore1
+
+ logger.info("Expected: Normal=0 bytes, Disposable=91163 bytes")
+ logger.info("Actual: Normal=${normalQueueIncrease1} bytes,
Disposable=${disposableQueueIncrease1} bytes")
+
+ // Verify Scenario 1
+ def expectedDisposableSize = 91163
+ assertTrue(disposableQueueIncrease1 == expectedDisposableSize,
+ "Scenario 1: Disposable queue should be exactly
${expectedDisposableSize} bytes, but got ${disposableQueueIncrease1} bytes")
+ assertTrue(disposableElementIncrease1 > 0,
+ "Scenario 1: Disposable queue elements should increase, but got
${disposableElementIncrease1}")
+ assertTrue(normalQueueIncrease1 == 0,
+ "Scenario 1: Normal queue should be 0 bytes, but got
${normalQueueIncrease1} bytes")
+
+ // Clean up
+ sql """DROP TABLE IF EXISTS load_test_table"""
+
+ // Wait between tests
+ sleep(3000)
+
+ // ============================================================================
+ // SCENARIO 2: disable_file_cache = false
+ // ============================================================================
+
+ // Clear file cache before test
+ clearFileCacheOnAllBackends()
+
+ // Set session variables for Scenario 2
+ sql "set disable_file_cache = false;"
+ sql "set enable_file_cache = true;"
+
+ // Create test table
+ sql """DROP TABLE IF EXISTS load_test_table"""
+ sql """
+ CREATE TABLE load_test_table (
+ C_CUSTKEY INTEGER NOT NULL,
+ C_NAME VARCHAR(25) NOT NULL,
+ C_ADDRESS VARCHAR(40) NOT NULL,
+ C_NATIONKEY INTEGER NOT NULL,
+ C_PHONE CHAR(15) NOT NULL,
+ C_ACCTBAL DECIMAL(15,2) NOT NULL,
+ C_MKTSEGMENT CHAR(10) NOT NULL,
+ C_COMMENT VARCHAR(117) NOT NULL
+ ) DUPLICATE KEY(C_CUSTKEY, C_NAME)
+ DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ )
+ """
+
+ // Record initial cache sizes
+ long normalQueueSizeBefore2 = getTotalNormalQueueSize()
+ long disposableQueueSizeBefore2 = getTotalDisposableQueueSize()
+ long disposableQueueElementCountBefore2 = getTotalDisposableQueueElementCount()
+
+ // Execute S3 TVF Load
+ sql """
+ INSERT INTO load_test_table
+ SELECT
+ CAST(c1 AS INT) AS C_CUSTKEY,
+ CAST(c2 AS VARCHAR(25)) AS C_NAME,
+ CAST(c3 AS VARCHAR(40)) AS C_ADDRESS,
+ CAST(c4 AS INT) AS C_NATIONKEY,
+ CAST(c5 AS CHAR(15)) AS C_PHONE,
+ CAST(c6 AS DECIMAL(15,2)) AS C_ACCTBAL,
+ CAST(c7 AS CHAR(10)) AS C_MKTSEGMENT,
+ CAST(c8 AS VARCHAR(117)) AS C_COMMENT
+ FROM S3(
+ "uri" = "${s3_tvf_uri}",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3_endpoint}",
+ "s3.region" = "${s3_region}",
+ "format" = "csv",
+ "column_separator" = "|"
+ )
+ """
+
+ def rowCount2 = sql """SELECT COUNT(*) FROM load_test_table"""
+ logger.info("Loaded ${rowCount2[0][0]} rows")
+
+ // Wait for cache metrics to update
+ sleep(5000)
+
+ // Record cache sizes after load
+ long normalQueueSizeAfter2 = getTotalNormalQueueSize()
+ long disposableQueueSizeAfter2 = getTotalDisposableQueueSize()
+ long disposableQueueElementCountAfter2 = getTotalDisposableQueueElementCount()
+
+ long normalQueueIncrease2 = normalQueueSizeAfter2 - normalQueueSizeBefore2
+ long disposableQueueIncrease2 = disposableQueueSizeAfter2 - disposableQueueSizeBefore2
+ long disposableElementIncrease2 = disposableQueueElementCountAfter2 - disposableQueueElementCountBefore2
+
+ logger.info("Expected: Normal=~237KB (range), Disposable=91163 bytes
(exact)")
+ logger.info("Actual: Normal=${normalQueueIncrease2} bytes
(${String.format("%.2f", normalQueueIncrease2 / 1024.0)} KB),
Disposable=${disposableQueueIncrease2} bytes")
+
+ // Verify Scenario 2
+ assertTrue(disposableQueueIncrease2 == expectedDisposableSize,
+ "Scenario 2: Disposable queue should be exactly ${expectedDisposableSize} bytes, but got ${disposableQueueIncrease2} bytes")
+ assertTrue(disposableElementIncrease2 > 0,
+ "Scenario 2: Disposable queue elements should increase, but got ${disposableElementIncrease2}")
+ def normalMinThreshold = 200 * 1024
+ def normalMaxThreshold = 280 * 1024
+ assertTrue(normalQueueIncrease2 >= normalMinThreshold && normalQueueIncrease2 <= normalMaxThreshold,
+ "Scenario 2: Normal queue should be in range [${normalMinThreshold}, ${normalMaxThreshold}] bytes, but got ${normalQueueIncrease2} bytes")
+
+ // Clean up
+ sql """DROP TABLE IF EXISTS load_test_table"""
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]