This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new c9bdd6be299 branch-3.1: [fix](broker-load) Fix the COLUMNS FROM PATH feature #57309 (#58351)
c9bdd6be299 is described below
commit c9bdd6be2992d947d851e795262c0660481427f9
Author: Refrain <[email protected]>
AuthorDate: Wed Nov 26 10:49:52 2025 +0800
branch-3.1: [fix](broker-load) Fix the COLUMNS FROM PATH feature #57309 (#58351)
picked from #57309
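For context, COLUMNS FROM PATH extracts partition-style key=value segments from the
file path (e.g. product=p1/code=107020/dt=20250202) and exposes them as extra load
columns. The change below forwards the declared key names to the backend alongside the
parsed values, via the new columnsFromPathKeys argument and
TFileRangeDesc.setColumnsFromPathKeys(). A minimal, hypothetical sketch of how keys and
values are expected to pair up (not the actual BrokerUtil implementation; the class,
method name, and bucket are made up for illustration, with the path and key names taken
from the regression test added below):

    import java.util.ArrayList;
    import java.util.List;

    public class ColumnsFromPathSketch {
        // Hypothetical stand-in for BrokerUtil.parseColumnsFromPath: for each declared
        // key, pick the value of the matching key=value segment in the file path.
        static List<String> parseValuesFromPath(String path, List<String> keys) {
            List<String> values = new ArrayList<>();
            for (String key : keys) {
                for (String segment : path.split("/")) {
                    if (segment.startsWith(key + "=")) {
                        values.add(segment.substring(key.length() + 1));
                        break;
                    }
                }
            }
            return values;
        }

        public static void main(String[] args) {
            List<String> keys = List.of("product", "code", "dt");   // columnsFromPathKeys
            String path = "s3://bucket/load/product=p1/code=107020/dt=20250202/data.csv";
            // Prints [p1, 107020, 20250202]; values line up positionally with the keys,
            // which is why the fix ships both lists to the BE in the same range desc.
            System.out.println(parseValuesFromPath(path, keys));
        }
    }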
---
.../org/apache/doris/datasource/FileGroupInfo.java | 13 +-
.../broker_load/test_load_data_from_path.groovy | 205 +++++++++++++++++++++
2 files changed, 214 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index c563794dccb..d81ba7daa84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -268,7 +268,9 @@ public class FileGroupInfo {
context.params.setCompressType(compressType);
List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
        context.fileGroup.getColumnNamesFromPath());
- TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+ List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
+ TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath,
+         columnsFromPathKeys);
locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
}
scanRangeLocations.add(locations);
@@ -312,12 +314,13 @@ public class FileGroupInfo {
context.params.setCompressType(compressType);
List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
        context.fileGroup.getColumnNamesFromPath());
+ List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
// Assign scan range locations only for broker load.
// stream load has only one file, and no need to set multi scan ranges.
if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
- columnsFromPath);
+ columnsFromPath, columnsFromPathKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;
@@ -326,7 +329,8 @@ public class FileGroupInfo {
curLocations = newLocations(context.params, brokerDesc, backendPolicy);
curInstanceBytes = 0;
} else {
- TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
+ TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
+ columnsFromPathKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset = 0;
curInstanceBytes += leftBytes;
@@ -397,7 +401,7 @@ public class FileGroupInfo {
}
private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
- List<String> columnsFromPath) {
+ List<String> columnsFromPath, List<String> columnsFromPathKeys) {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
if (jobType == JobType.BULK_LOAD) {
rangeDesc.setPath(fileStatus.path);
@@ -405,6 +409,7 @@ public class FileGroupInfo {
rangeDesc.setSize(rangeBytes);
rangeDesc.setFileSize(fileStatus.size);
rangeDesc.setColumnsFromPath(columnsFromPath);
+ rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
if (getFileType() == TFileType.FILE_HDFS) {
URI fileUri = new Path(fileStatus.path).toUri();
rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
diff --git a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
new file mode 100644
index 00000000000..e4c6a5d2eed
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_load_columns_from_path", "load_p0") {
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+ def ak = getS3AK()
+ def sk = getS3SK()
+ def tableName = "test_columns_from_path"
+ def label = UUID.randomUUID().toString().replace("-", "0")
+ def path = "s3://${s3BucketName}/load/product=p1/code=107020/dt=20250202/data.csv"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+
+ sql """
+ CREATE TABLE ${tableName} (
+ k1 INT,
+ k2 INT,
+ pd VARCHAR(20) NULL,
+ code INT NULL,
+ dt DATE
+ )
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+ // test all three columns from path, with SET mapping all three of them
+ try {
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("${path}")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ","
+ FORMAT AS "CSV"
+ (k1, k2)
+ COLUMNS FROM PATH AS (product, code, dt)
+ SET
+ (
+ pd = product,
+ code = code,
+ dt = dt
+ )
+ )
+ WITH S3
+ (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ )
+ """
+
+ // Wait for load job to finish
+ def maxRetry = 60
+ def result = ""
+ for (int i = 0; i < maxRetry; i++) {
+ result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+ if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Check load job state
+ assertEquals("FINISHED", result[0].State)
+
+ // Verify the loaded data
+ def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ // Verify columns from path are extracted correctly
+ def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
+ assertEquals("p1", pathData[0][0])
+ assertEquals(107020, pathData[0][1])
+ assertEquals("2025-02-02", pathData[0][2].toString())
+
+ } finally {
+ sql """ TRUNCATE TABLE ${tableName} """
+ }
+
+ // test all three columns from path, with SET mapping only the differently named column (pd)
+ label = UUID.randomUUID().toString().replace("-", "1")
+ try {
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("${path}")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ","
+ FORMAT AS "CSV"
+ (k1, k2)
+ COLUMNS FROM PATH AS (product, code, dt)
+ SET (
+ pd = product
+ )
+ )
+ WITH S3
+ (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ )
+ """
+
+ // Wait for load job to finish
+ def maxRetry = 60
+ def result = ""
+ for (int i = 0; i < maxRetry; i++) {
+ result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+ if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Check load job state
+ assertEquals("FINISHED", result[0].State)
+
+ // Verify the loaded data
+ def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ // Verify columns from path are extracted correctly
+ def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
+ assertEquals("p1", pathData[0][0])
+ assertEquals(107020, pathData[0][1])
+ assertEquals("2025-02-02", pathData[0][2].toString())
+
+ } finally {
+ sql """ TRUNCATE TABLE ${tableName} """
+ }
+
+ // test extracting only one column from path (only product)
+ label = UUID.randomUUID().toString().replace("-", "2")
+ try {
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("${path}")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY ","
+ FORMAT AS "CSV"
+ (k1, k2)
+ COLUMNS FROM PATH AS (product)
+ SET
+ (
+ pd = product
+ )
+ )
+ WITH S3
+ (
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.region" = "${s3Region}"
+ )
+ """
+
+ // Wait for load job to finish
+ def maxRetry = 60
+ def result = ""
+ for (int i = 0; i < maxRetry; i++) {
+ result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+ if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
+ break
+ }
+ sleep(1000)
+ }
+
+ // Check load job state
+ assertEquals("FINISHED", result[0].State)
+
+ // Verify the loaded data
+ def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+ assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+ // Verify only pd column is extracted from path, code and dt are loaded from CSV file
+ def pathData = sql "SELECT pd FROM ${tableName} LIMIT 1"
+ assertEquals("p1", pathData[0][0])
+ // code and dt should be loaded from CSV file data, not from path
+ // The actual values depend on the CSV file content
+
+ } finally {
+ sql """ DROP TABLE ${tableName} """
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]