This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c9bdd6be299 branch-3.1: [fix](broker-load) Fix the COLUMNS FROM PATH 
feature #57309 (#58351)
c9bdd6be299 is described below

commit c9bdd6be2992d947d851e795262c0660481427f9
Author: Refrain <[email protected]>
AuthorDate: Wed Nov 26 10:49:52 2025 +0800

    branch-3.1: [fix](broker-load) Fix the COLUMNS FROM PATH feature #57309 
(#58351)
    
    picked from #57309
---
 .../org/apache/doris/datasource/FileGroupInfo.java |  13 +-
 .../broker_load/test_load_data_from_path.groovy    | 205 +++++++++++++++++++++
 2 files changed, 214 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
index c563794dccb..d81ba7daa84 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileGroupInfo.java
@@ -268,7 +268,9 @@ public class FileGroupInfo {
                 context.params.setCompressType(compressType);
                 List<String> columnsFromPath = 
BrokerUtil.parseColumnsFromPath(fileStatus.path,
                         context.fileGroup.getColumnNamesFromPath());
-                TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, 
fileStatus.size, columnsFromPath);
+                List<String> columnsFromPathKeys = 
context.fileGroup.getColumnNamesFromPath();
+                TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, 
fileStatus.size, columnsFromPath,
+                        columnsFromPathKeys);
                 
locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
             }
             scanRangeLocations.add(locations);
@@ -312,12 +314,13 @@ public class FileGroupInfo {
             context.params.setCompressType(compressType);
             List<String> columnsFromPath = 
BrokerUtil.parseColumnsFromPath(fileStatus.path,
                     context.fileGroup.getColumnNamesFromPath());
+            List<String> columnsFromPathKeys = 
context.fileGroup.getColumnNamesFromPath();
             // Assign scan range locations only for broker load.
             // stream load has only one file, and no need to set multi scan 
ranges.
             if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) 
{
                 long rangeBytes = bytesPerInstance - curInstanceBytes;
                 TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, 
fileStatus, rangeBytes,
-                        columnsFromPath);
+                        columnsFromPath, columnsFromPathKeys);
                 
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset += rangeBytes;
 
@@ -326,7 +329,8 @@ public class FileGroupInfo {
                 curLocations = newLocations(context.params, brokerDesc, 
backendPolicy);
                 curInstanceBytes = 0;
             } else {
-                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, 
fileStatus, leftBytes, columnsFromPath);
+                TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, 
fileStatus, leftBytes, columnsFromPath,
+                        columnsFromPathKeys);
                 
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
                 curFileOffset = 0;
                 curInstanceBytes += leftBytes;
@@ -397,7 +401,7 @@ public class FileGroupInfo {
     }
 
     private TFileRangeDesc createFileRangeDesc(long curFileOffset, 
TBrokerFileStatus fileStatus, long rangeBytes,
-            List<String> columnsFromPath) {
+            List<String> columnsFromPath, List<String> columnsFromPathKeys) {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         if (jobType == JobType.BULK_LOAD) {
             rangeDesc.setPath(fileStatus.path);
@@ -405,6 +409,7 @@ public class FileGroupInfo {
             rangeDesc.setSize(rangeBytes);
             rangeDesc.setFileSize(fileStatus.size);
             rangeDesc.setColumnsFromPath(columnsFromPath);
+            rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
             if (getFileType() == TFileType.FILE_HDFS) {
                 URI fileUri = new Path(fileStatus.path).toUri();
                 rangeDesc.setFsName(fileUri.getScheme() + "://" + 
fileUri.getAuthority());
diff --git 
a/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy 
b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
new file mode 100644
index 00000000000..e4c6a5d2eed
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_load_data_from_path.groovy
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_load_columns_from_path", "load_p0") {
+    def s3BucketName = getS3BucketName()
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+    def ak = getS3AK()
+    def sk = getS3SK()
+    def tableName = "test_columns_from_path"
+    def label = UUID.randomUUID().toString().replace("-", "0")
+    def path = 
"s3://${s3BucketName}/load/product=p1/code=107020/dt=20250202/data.csv"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    sql """
+        CREATE TABLE ${tableName} (
+            k1 INT,
+            k2 INT,
+            pd VARCHAR(20) NULL,
+            code INT NULL,
+            dt DATE
+        )
+        DUPLICATE KEY(`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1"
+        );
+        """
+    // Case 1: take product, code and dt from the path and map all three in SET
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("${path}")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                (k1, k2)
+                COLUMNS FROM PATH AS (product, code, dt)
+                SET
+                (
+                    pd = product,
+                    code = code,
+                    dt = dt
+                )
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+            """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == 
"CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        // Verify columns from path are extracted correctly
+        def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
+        assertEquals("p1", pathData[0][0])
+        assertEquals(107020, pathData[0][1])
+        assertEquals("2025-02-02", pathData[0][2].toString())
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+    
+    // Case 2: take all three path columns, but SET only maps pd = product
+    label = UUID.randomUUID().toString().replace("-", "1")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("${path}")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                (k1, k2)
+                COLUMNS FROM PATH AS (product, code, dt)
+                SET (
+                    pd = product
+                )
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+            """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0].State == 
"CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        // Verify columns from path are extracted correctly
+        def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
+        assertEquals("p1", pathData[0][0])
+        assertEquals(107020, pathData[0][1])
+        assertEquals("2025-02-02", pathData[0][2].toString())
+
+    } finally {
+        sql """ TRUNCATE TABLE ${tableName} """
+    }
+
+    // test extracting only one column from path (only product)
+    label = UUID.randomUUID().toString().replace("-", "2")
+    try {
+        sql """
+            LOAD LABEL ${label}
+            (
+                DATA INFILE("${path}")
+                INTO TABLE ${tableName}
+                COLUMNS TERMINATED BY ","
+                FORMAT AS "CSV"
+                (k1, k2)
+                COLUMNS FROM PATH AS (product)
+                SET
+                (
+                    pd = product
+                )
+            )
+            WITH S3
+            (
+                "s3.access_key" = "${ak}",
+                "s3.secret_key" = "${sk}",
+                "s3.endpoint" = "${s3Endpoint}",
+                "s3.region" = "${s3Region}"
+            )
+            """
+
+        // Wait for load job to finish
+        def maxRetry = 60
+        def result = ""
+        for (int i = 0; i < maxRetry; i++) {
+            result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
+            if (result[0].State == "FINISHED" || result[0]. State == 
"CANCELLED") {
+                break
+            }
+            sleep(1000)
+        }
+
+        // Check load job state
+        assertEquals("FINISHED", result[0].State)
+
+        // Verify the loaded data
+        def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
+        assertTrue(rowCount[0][0] > 0, "No data was loaded")
+
+        // Verify that pd is extracted from the path; code and dt are not 
listed in COLUMNS FROM PATH for this case
+        def pathData = sql "SELECT pd FROM ${tableName} LIMIT 1"
+        assertEquals("p1", pathData[0][0])
+        // NOTE(review): code and dt are not asserted here. The load lists only (k1, k2)
+        // as file columns, so they presumably stay NULL rather than come from the CSV - confirm.
+
+    } finally {
+        sql """ DROP TABLE ${tableName} """
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to