This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2a14fca2e7e35e1ddfc0b2ecc1d099abe6b678b3
Author: Siyang Tang <[email protected]>
AuthorDate: Sun Jun 25 17:45:31 2023 +0800

    [feature](load-refactor-with-tvf) S3 load with S3 tvf and native insert (#19937)
---
 fe/fe-core/pom.xml                                 |   8 +-
 .../org/apache/doris/analysis/DataDescription.java |  10 +
 .../apache/doris/analysis/ImportColumnDesc.java    |   4 +
 .../apache/doris/analysis/NativeInsertStmt.java    |  36 +-
 .../org/apache/doris/analysis/S3TvfLoadStmt.java   | 423 +++++++++++++++++++++
 .../java/org/apache/doris/analysis/SelectStmt.java |   5 +
 .../org/apache/doris/analysis/UnifiedLoadStmt.java |  13 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     |   7 +-
 .../ExternalFileTableValuedFunction.java           |   6 +-
 .../doris/tablefunction/S3TableValuedFunction.java |  27 +-
 .../apache/doris/analysis/S3TvfLoadStmtTest.java   | 245 ++++++++++++
 .../load_p2/broker_load/test_broker_load.groovy    |  63 ---
 .../broker_load/test_tvf_based_broker_load.groovy  | 328 ++++++++++++++++
 13 files changed, 1075 insertions(+), 100 deletions(-)
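
For context, this change rewrites an S3 broker load into an equivalent `INSERT INTO ... SELECT ... FROM S3(...)` statement. A minimal sketch of the translation, with hypothetical label, table, and column names (credential properties elided):

    -- original S3 load:
    LOAD LABEL db1.label1
    (
        DATA INFILE("s3://bucket/path/file.csv")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (k1, k2, v1)
    )
    WITH S3 (/* endpoint, access key, secret key, region */);

    -- roughly the tvf-based insert that S3TvfLoadStmt builds from it:
    INSERT INTO tbl1 (k1, k2, v1)
    SELECT c1 AS k1, c2 AS k2, c3 AS v1
    FROM S3("uri" = "s3://bucket/path/file.csv",
            "format" = "csv",
            "column_separator" = ",");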

diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index d2d933c08a..3d4e64b087 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -495,12 +495,12 @@ under the License.
             <artifactId>grpc-stub</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.apache.doris</groupId>
-            <artifactId>hive-catalog-shade</artifactId>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
         </dependency>
         <dependency>
-           <groupId>org.apache.httpcomponents</groupId>
-           <artifactId>httpclient</artifactId>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>hive-catalog-shade</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.velocity</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 04f5042d72..658605abc5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -154,6 +154,8 @@ public class DataDescription implements InsertStmt.DataDesc {
     private boolean isMysqlLoad = false;
     private int skipLines = 0;
 
+    private boolean isAnalyzed = false;
+
     public DataDescription(String tableName,
                            PartitionNames partitionNames,
                            List<String> filePaths,
@@ -572,6 +574,10 @@ public class DataDescription implements InsertStmt.DataDesc {
         return columnSeparator.getSeparator();
     }
 
+    public Separator getColumnSeparatorObj() {
+        return columnSeparator;
+    }
+
     public boolean isNegative() {
         return isNegative;
     }
@@ -1001,6 +1007,9 @@ public class DataDescription implements InsertStmt.DataDesc {
     }
 
     public void analyze(String fullDbName) throws AnalysisException {
+        if (isAnalyzed) {
+            return;
+        }
         if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) {
            throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE.");
         }
@@ -1015,6 +1024,7 @@ public class DataDescription implements InsertStmt.DataDesc {
         if (isNegative && mergeType != LoadTask.MergeType.APPEND) {
            throw new AnalysisException("Negative is only used when merge type is append.");
         }
+        isAnalyzed = true;
     }
 
    public void analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
index 5b93d31b9f..13040bdcb5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java
@@ -50,6 +50,10 @@ public class ImportColumnDesc {
         return columnName;
     }
 
+    public void setColumnName(String columnName) {
+        this.columnName = columnName;
+    }
+
     public Expr getExpr() {
         return expr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index d3956de245..fcd85fd80b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -98,11 +98,11 @@ public class NativeInsertStmt extends InsertStmt {
     private static final String SHUFFLE_HINT = "SHUFFLE";
     private static final String NOSHUFFLE_HINT = "NOSHUFFLE";
 
-    private final TableName tblName;
+    protected final TableName tblName;
     private final PartitionNames targetPartitionNames;
     // parsed from targetPartitionNames.
     private List<Long> targetPartitionIds;
-    private final List<String> targetColumnNames;
+    protected List<String> targetColumnNames;
     private QueryStmt queryStmt;
     private final List<String> planHints;
     private Boolean isRepartition;
@@ -113,7 +113,7 @@ public class NativeInsertStmt extends InsertStmt {
 
    private final Map<String, Expr> exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
 
-    private Table targetTable;
+    protected Table targetTable;
 
     private DatabaseIf db;
     private long transactionId;
@@ -254,8 +254,7 @@ public class NativeInsertStmt extends InsertStmt {
         return isTransactionBegin;
     }
 
-    @Override
-    public void analyze(Analyzer analyzer) throws UserException {
+    protected void preCheckAnalyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
 
         if (targetTable == null) {
@@ -279,6 +278,20 @@ public class NativeInsertStmt extends InsertStmt {
         if (targetPartitionNames != null) {
             targetPartitionNames.analyze(analyzer);
         }
+    }
+
+    /**
+     * translate a load-related stmt to `insert into xx select xx from tvf` semantics
+     */
+    protected void convertSemantic(Analyzer analyzer) throws UserException {
+        // do nothing
+    }
+
+    @Override
+    public void analyze(Analyzer analyzer) throws UserException {
+        preCheckAnalyze(analyzer);
+
+        convertSemantic(analyzer);
 
         // set target table and
         analyzeTargetTable(analyzer);
@@ -321,8 +334,7 @@ public class NativeInsertStmt extends InsertStmt {
         }
     }
 
-    private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
-        // Get table
+    protected void initTargetTable(Analyzer analyzer) throws AnalysisException {
         if (targetTable == null) {
            DatabaseIf db = analyzer.getEnv().getCatalogMgr()
                    .getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
@@ -335,6 +347,11 @@ public class NativeInsertStmt extends InsertStmt {
                throw new AnalysisException("Not support insert target table.");
             }
         }
+    }
+
+    private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
+        // Get table
+        initTargetTable(analyzer);
 
         if (targetTable instanceof OlapTable) {
             OlapTable olapTable = (OlapTable) targetTable;
@@ -889,9 +906,4 @@ public class NativeInsertStmt extends InsertStmt {
             return RedirectStatus.FORWARD_WITH_SYNC;
         }
     }
-
-    @Override
-    public String toSql() {
-        return null;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
new file mode 100644
index 0000000000..7c5eec7729
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java
@@ -0,0 +1,423 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.constants.S3Properties.Env;
+import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * S3 Load based on S3 TVF
+ */
+public class S3TvfLoadStmt extends NativeInsertStmt {
+
+    private static final Logger LOG = LogManager.getLogger(S3TvfLoadStmt.class);
+
+    private static final String FORMAT_CSV = "csv";
+
+    private static final String DEFAULT_FORMAT = FORMAT_CSV;
+
+    private final DataDescription dataDescription;
+
+    /**
+     * for csv format, we need some particular process
+     */
+    private final boolean isCsvFormat;
+
+    /**
+     * only used for loading from csv format tvf
+     * with mapping from col name to csv-format-tvf-style col name (c1, c2, c3...)
+     */
+    private Map<String, String> selectColNameToCsvColName;
+
+    @VisibleForTesting
+    private Set<String> functionGenTableColNames;
+
+    public S3TvfLoadStmt(LabelName label, List<DataDescription> dataDescList, BrokerDesc brokerDesc,
+            Map<String, String> properties, String comments) throws DdlException {
+        super(buildInsertTarget(dataDescList.get(0)),
+                label.getLabelName(), /*insert all columns by default*/null,
+                buildInsertSource(dataDescList.get(0), brokerDesc), null);
+        this.label = label;
+        this.dataDescription = dataDescList.get(0);
+        this.properties = properties;
+        this.comments = comments;
+        this.isCsvFormat = isCsvFormat(dataDescription.getFileFormat());
+    }
+
+    // ------------------------------------ init helpers ------------------------------------
+
+    private static InsertTarget buildInsertTarget(DataDescription dataDescription) {
+        final TableName tableName = new TableName(null, null, dataDescription.getTableName());
+        return new InsertTarget(tableName, dataDescription.getPartitionNames());
+    }
+
+    private static InsertSource buildInsertSource(DataDescription dataDescription, BrokerDesc brokerDesc)
+            throws DdlException {
+        final SelectList selectList = new SelectList();
+        // use `select *` by default
+        final SelectListItem item = new SelectListItem(SelectListItem.createStarItem(null));
+        selectList.addItem(item);
+
+        // build from
+        final FromClause fromClause = new FromClause(
+                Collections.singletonList(buildTvfRef(dataDescription, brokerDesc))
+        );
+
+        // translate the sequence column of the load stmt into an order by clause
+        final String sequenceCol = dataDescription.getSequenceCol();
+        final ArrayList<OrderByElement> orderByElementList = Lists.newArrayList();
+        if (!Strings.isNullOrEmpty(sequenceCol)) {
+            final OrderByElement orderByElement = new OrderByElement(
+                    new SlotRef(null, sequenceCol),
+                    true, null
+            );
+            orderByElementList.add(orderByElement);
+        }
+
+        // merge preceding filter and where expr
+        final BoolLiteral trueLiteral = new BoolLiteral(true);
+        final Expr whereExpr = Optional.ofNullable(dataDescription.getWhereExpr()).orElse(trueLiteral);
+        final Expr precdingFilterExpr =
+                Optional.ofNullable(dataDescription.getPrecdingFilterExpr()).orElse(trueLiteral);
+        final Expr compoundPredicate = new CompoundPredicate(Operator.AND, precdingFilterExpr, whereExpr);
+
+        final SelectStmt selectStmt = new SelectStmt(
+                selectList, fromClause, compoundPredicate,
+                null, null,
+                orderByElementList, LimitElement.NO_LIMIT
+        );
+        return new InsertSource(selectStmt);
+    }
+
+    private static TableRef buildTvfRef(DataDescription dataDescription, BrokerDesc brokerDesc) throws DdlException {
+        final Map<String, String> params = Maps.newHashMap();
+
+        final List<String> filePaths = dataDescription.getFilePaths();
+        Preconditions.checkState(filePaths.size() == 1, "there should be only one file path");
+        final String s3FilePath = filePaths.get(0);
+        params.put(S3TableValuedFunction.S3_URI, s3FilePath);
+
+        final Map<String, String> dataDescProp = dataDescription.getProperties();
+        if (dataDescProp != null) {
+            params.putAll(dataDescProp);
+        }
+
+        final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT);
+        params.put(ExternalFileTableValuedFunction.FORMAT, format);
+        if (isCsvFormat(format)) {
+            final Separator separator = dataDescription.getColumnSeparatorObj();
+            if (separator != null) {
+                try {
+                    separator.analyze();
+                } catch (AnalysisException e) {
+                    throw new DdlException("failed to create s3 tvf ref", e);
+                }
+                params.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, dataDescription.getColumnSeparator());
+            }
+        }
+
+        Preconditions.checkState(!brokerDesc.isMultiLoadBroker(), "do not support multi broker load currently");
+        Preconditions.checkState(brokerDesc.getStorageType() == StorageType.S3, "only support S3 load");
+
+        final Map<String, String> s3ResourceProp = brokerDesc.getProperties();
+        S3Properties.convertToStdProperties(s3ResourceProp);
+        s3ResourceProp.keySet().removeIf(Env.FS_KEYS::contains);
+        params.putAll(s3ResourceProp);
+
+        try {
+            return new TableValuedFunctionRef(S3TableValuedFunction.NAME, null, params);
+        } catch (AnalysisException e) {
+            throw new DdlException("failed to create s3 tvf ref", e);
+        }
+    }
+
+    private static boolean isCsvFormat(String format) {
+        return Strings.isNullOrEmpty(format) || StringUtils.equalsIgnoreCase(format, FORMAT_CSV);
+    }
+
+    // --------------------------------------------------------------------------------------
+
+    @Override
+    public void convertSemantic(Analyzer analyzer) throws UserException {
+        label.analyze(analyzer);
+        initTargetTable(analyzer);
+        analyzeColumns(analyzer);
+    }
+
+    @Override
+    public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String> parentViewNameSet)
+            throws AnalysisException {
+
+        super.getTables(analyzer, tableMap, parentViewNameSet);
+        final List<TableIf> tables = Lists.newArrayList(tableMap.values());
+        Preconditions.checkState(tables.size() == 2,
+                "table map should only contain the unique function generated table and the unique target table");
+        functionGenTableColNames = tables.get(0)
+                .getBaseSchema()
+                .stream()
+                .map(Column::getName)
+                .collect(Collectors.toSet());
+    }
+
+    // ------------------------------------ columns mapping ------------------------------------
+
+    private void analyzeColumns(Analyzer analyzer) throws AnalysisException {
+        final String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer);
+        dataDescription.analyze(fullDbName);
+        // copy a list for analyzing
+        List<ImportColumnDesc> columnExprList = Lists.newArrayList(dataDescription.getParsedColumnExprList());
+        if (CollectionUtils.isEmpty(columnExprList)) {
+            padExprListIfNeeded(columnExprList);
+        }
+        rewriteExpr(columnExprList);
+        boolean isFileFieldSpecified = columnExprList.stream().anyMatch(ImportColumnDesc::isColumn);
+        if (!isFileFieldSpecified) {
+            return;
+        }
+        if (isCsvFormat) {
+            // in the tvf, csv column names are like "c1, c2, c3"; record the mapping to keep the select list correct
+            recordCsvColNames(columnExprList);
+        }
+        columnExprList = filterColumns(columnExprList);
+        if (CollectionUtils.isEmpty(columnExprList)) {
+            return;
+        }
+        resetTargetColumnNames(columnExprList);
+        resetSelectList(columnExprList);
+    }
+
+    /**
+     * deal with the case that not all columns in table are in file
+     */
+    private void padExprListIfNeeded(List<ImportColumnDesc> columnExprList) {
+        if (isCsvFormat) {
+            return;
+        }
+        columnExprList.addAll(
+                functionGenTableColNames
+                        .stream()
+                        .map(ImportColumnDesc::new)
+                        .collect(Collectors.toList())
+        );
+    }
+
+    /**
+     * find and rewrite the derivative columns
+     * e.g. (v1,v2=v1+1,v3=v2+1) --> (v1, v2=v1+1, v3=v1+1+1)
+     */
+    private void rewriteExpr(List<ImportColumnDesc> columnDescList) {
+        Preconditions.checkNotNull(columnDescList, "columns should not be null");
+        Preconditions.checkNotNull(targetTable, "target table is unset");
+        LOG.debug("original columnExpr:{}", columnDescList);
+        Map<String, Expr> derivativeColumns = Maps.newHashMap();
+        columnDescList
+                .stream()
+                .filter(Predicates.not(ImportColumnDesc::isColumn))
+                .forEach(desc -> {
+                    final Expr expr = desc.getExpr();
+                    if (expr instanceof SlotRef) {
+                        final String columnName = ((SlotRef) expr).getColumnName();
+                        if (derivativeColumns.containsKey(columnName)) {
+                            desc.setExpr(derivativeColumns.get(columnName));
+                        }
+                    } else {
+                        recursiveRewrite(expr, derivativeColumns);
+                    }
+                    derivativeColumns.put(desc.getColumnName(), expr);
+                });
+        // `tmp` columns with expr can be removed after expr rewritten
+        columnDescList.removeIf(
+                Predicates.not(columnDesc ->
+                        columnDesc.isColumn() || Objects.nonNull(targetTable.getColumn(columnDesc.getColumnName()))
+                )
+        );
+        LOG.debug("rewrite result:{}", columnDescList);
+    }
+
+    private void recursiveRewrite(Expr expr, Map<String, Expr> derivativeColumns) {
+        final ArrayList<Expr> children = expr.getChildren();
+        if (CollectionUtils.isEmpty(children)) {
+            return;
+        }
+        for (int i = 0; i < children.size(); i++) {
+            Expr child = expr.getChild(i);
+            if (child instanceof SlotRef) {
+                final String columnName = ((SlotRef) child).getColumnName();
+                if (derivativeColumns.containsKey(columnName)) {
+                    expr.setChild(i, derivativeColumns.get(columnName));
+                }
+                continue;
+            }
+            recursiveRewrite(child, derivativeColumns);
+        }
+    }
+
+    /**
+     * record mapping from col name to csv-format-tvf-style col name
+     *
+     * @see selectColNameToCsvColName
+     */
+    private void recordCsvColNames(List<ImportColumnDesc> columnDescList) {
+        AtomicInteger counter = new AtomicInteger(1);
+        selectColNameToCsvColName = columnDescList.stream()
+                .filter(ImportColumnDesc::isColumn)
+                .collect(Collectors.toMap(
+                        ImportColumnDesc::getColumnName,
+                        name -> "c" + counter.getAndIncrement(),
+                        (v1, v2) -> v1,
+                        LinkedHashMap::new
+                ));
+        LOG.debug("select column name to csv column name:{}", selectColNameToCsvColName);
+    }
+
+    private List<ImportColumnDesc> filterColumns(List<ImportColumnDesc> columnExprList) {
+        Preconditions.checkNotNull(targetTable, "target table is unset");
+
+        // remove all `tmp` columns, which are not in target table
+        columnExprList.removeIf(
+                Predicates.and(
+                        ImportColumnDesc::isColumn,
+                        desc -> Objects.isNull(targetTable.getColumn(desc.getColumnName())),
+                        desc -> functionGenTableColNames.contains(desc.getColumnName())
+                )
+        );
+
+        // deal with the case like:
+        // (k1, k2) SET(k1 = `upper(k1)`)
+        columnExprList = Lists.newArrayList(columnExprList.stream()
+                .collect(Collectors.toMap(
+                        ImportColumnDesc::getColumnName,
+                        Function.identity(),
+                        (lhs, rhs) -> {
+                            if (lhs.getExpr() != null && rhs.getExpr() == null) {
+                                return lhs;
+                            }
+                            if (lhs.getExpr() == null && rhs.getExpr() != null) {
+                                return rhs;
+                            }
+                            throw new IllegalArgumentException(
+                                    String.format("column `%s` specified twice", lhs.getColumnName()));
+                        }
+                )).values());
+
+        // deal with the case that column in target table but not in tvf
+        columnExprList.removeIf(desc ->
+                !functionGenTableColNames.contains(desc.getColumnName())
+                        && Objects.nonNull(targetTable.getColumn(desc.getColumnName()))
+                        && desc.isColumn()
+        );
+
+        LOG.debug("filtered result:{}", columnExprList);
+        return columnExprList;
+    }
+
+    private void resetTargetColumnNames(List<ImportColumnDesc> columnExprList) {
+        targetColumnNames = columnExprList
+                .stream()
+                .map(ImportColumnDesc::getColumnName)
+                .collect(Collectors.toList());
+        LOG.debug("target cols:{}", targetColumnNames);
+    }
+
+    private void resetSelectList(List<ImportColumnDesc> columnExprList) {
+        if (isCsvFormat) {
+            rewriteExprColNameToCsvStyle(columnExprList);
+        }
+        LOG.debug("select list:{}", columnExprList);
+        final SelectList selectList = new SelectList();
+        columnExprList.forEach(desc -> {
+            if (!desc.isColumn()) {
+                selectList.addItem(new SelectListItem(desc.getExpr(), desc.getColumnName()));
+                return;
+            }
+
+            if (isCsvFormat) {
+                // use csv-style-column name and target column name as alias
+                final Expr slotRef = new SlotRef(null, selectColNameToCsvColName.get(desc.getColumnName()));
+                selectList.addItem(new SelectListItem(slotRef, desc.getColumnName()));
+            } else {
+                selectList.addItem(new SelectListItem(new SlotRef(null, desc.getColumnName()), null));
+            }
+        });
+        ((SelectStmt) getQueryStmt()).resetSelectList(selectList);
+    }
+
+    private void rewriteExprColNameToCsvStyle(List<ImportColumnDesc> columnExprList) {
+        Preconditions.checkNotNull(selectColNameToCsvColName,
+                "SelectColName To CsvColName is not recorded");
+        columnExprList
+                .stream()
+                .filter(Predicates.not(ImportColumnDesc::isColumn))
+                .forEach(desc -> rewriteSlotRefInExpr(desc.getExpr()));
+
+        // rewrite where predicate and order by elements
+        final SelectStmt selectStmt = (SelectStmt) getQueryStmt();
+        rewriteSlotRefInExpr(selectStmt.getWhereClause());
+        selectStmt.getOrderByElements().forEach(orderByElement -> rewriteSlotRefInExpr(orderByElement.getExpr()));
+    }
+
+    private void rewriteSlotRefInExpr(Expr expr) {
+        final Predicate<? super Expr> rewritePredicate = e -> e instanceof SlotRef
+                && selectColNameToCsvColName.containsKey(((SlotRef) e).getColumnName());
+        final Consumer<? super Expr> rewriteOperation = e -> {
+            final SlotRef slotRef = (SlotRef) e;
+            slotRef.setCol(selectColNameToCsvColName.get(slotRef.getColumnName()));
+        };
+
+        List<Expr> slotRefs = Lists.newArrayList();
+        expr.collect(rewritePredicate, slotRefs);
+        slotRefs.forEach(rewriteOperation);
+    }
+
+    // -----------------------------------------------------------------------------------------
+}
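
To make the csv handling above concrete: for csv the tvf exposes file columns as c1, c2, c3..., so recordCsvColNames/resetSelectList turn the load's column list into aliased slots, and rewriteExpr inlines derivative columns first. A hedged sketch for a hypothetical mapping (k1, k2, tmp_v, v1 = tmp_v + 1):

    -- tmp_v arrives as c3 and is dropped once its expr is inlined;
    -- the generated select list is roughly:
    SELECT c1 AS k1, c2 AS k2, c3 + 1 AS v1
    FROM S3("uri" = "s3://bucket/file.csv", "format" = "csv");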
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index 238ed47040..9720300ab2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -2653,4 +2653,9 @@ public class SelectStmt extends QueryStmt {
             return null;
         }
     }
+
+    public void resetSelectList(SelectList selectList) {
+        this.selectList = selectList;
+        this.originSelectList = selectList.clone();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java
index 8a58ee9523..351fa11688 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
@@ -43,7 +45,7 @@ public class UnifiedLoadStmt extends DdlStmt {
     }
 
     public static UnifiedLoadStmt buildMysqlLoadStmt(DataDescription dataDescription, Map<String, String> properties,
-                                                     String comment) {
+            String comment) {
         final ConnectContext connectContext = ConnectContext.get();
        if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) {
            return new UnifiedLoadStmt(new MysqlLoadStmt(dataDescription, properties, comment));
@@ -52,10 +54,15 @@ public class UnifiedLoadStmt extends DdlStmt {
     }
 
     public static UnifiedLoadStmt buildBrokerLoadStmt(LabelName label, List<DataDescription> dataDescriptions,
-                                                      BrokerDesc brokerDesc,
-                                                      Map<String, String> properties, String comment) {
+            BrokerDesc brokerDesc,
+            Map<String, String> properties, String comment) throws DdlException {
+
         final ConnectContext connectContext = ConnectContext.get();
        if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) {
+            if (brokerDesc != null && brokerDesc.getStorageType() == StorageType.S3) {
+                // for tvf solution validation
+                return new UnifiedLoadStmt(new S3TvfLoadStmt(label, dataDescriptions, brokerDesc, properties, comment));
+            }
             return new UnifiedLoadStmt(new BrokerLoadStmt(label, dataDescriptions, brokerDesc, properties, comment));
         }
        return new UnifiedLoadStmt(new LoadStmt(label, dataDescriptions, brokerDesc, properties, comment));
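
As a usage sketch (label and table are hypothetical), the new branch is taken only when the unified-load session variable is on:

    SET enable_unified_load = true;
    -- an S3 broker load issued now is parsed into a UnifiedLoadStmt whose
    -- proxy statement is an S3TvfLoadStmt rather than a BrokerLoadStmt:
    LOAD LABEL db1.test_label
    (
        DATA INFILE("s3://bucket/path/part*") INTO TABLE tbl1
    )
    WITH S3 (/* endpoint, access key, secret key, region */);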
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index c86da0daed..9cb5a7fe2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.ArrayLiteral;
+import org.apache.doris.analysis.CreateRoutineLoadStmt;
 import org.apache.doris.analysis.CreateTableAsSelectStmt;
 import org.apache.doris.analysis.CreateTableLikeStmt;
 import org.apache.doris.analysis.DdlStmt;
@@ -903,10 +904,10 @@ public class StmtExecutor {
             unifiedLoadStmt.init();
             final StatementBase proxyStmt = unifiedLoadStmt.getProxyStmt();
             parsedStmt = proxyStmt;
-            if (!(proxyStmt instanceof LoadStmt)) {
+            if (!(proxyStmt instanceof LoadStmt) && !(proxyStmt instanceof CreateRoutineLoadStmt)) {
                 Preconditions.checkState(
-                        parsedStmt instanceof InsertStmt && ((InsertStmt) parsedStmt).needLoadManager(),
-                        "enable_unified_load=true, should be external insert stmt");
+                        parsedStmt instanceof InsertStmt,
+                        "enable_unified_load=true, should be insert stmt");
             }
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 994353af37..1bd07d0cff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -85,9 +85,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class);
     protected static final String DEFAULT_COLUMN_SEPARATOR = ",";
     protected static final String DEFAULT_LINE_DELIMITER = "\n";
-    protected static final String FORMAT = "format";
-    protected static final String COLUMN_SEPARATOR = "column_separator";
-    protected static final String LINE_DELIMITER = "line_delimiter";
+    public static final String FORMAT = "format";
+    public static final String COLUMN_SEPARATOR = "column_separator";
+    public static final String LINE_DELIMITER = "line_delimiter";
     protected static final String JSON_ROOT = "json_root";
     protected static final String JSON_PATHS = "jsonpaths";
     protected static final String STRIP_OUTER_ARRAY = "strip_outer_array";
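
These constants are widened to public so that S3TvfLoadStmt can populate tvf parameters by key. A sketch of the resulting invocation (bucket and separator are hypothetical; credentials elided):

    SELECT * FROM S3(
        "uri" = "s3://bucket/file.csv",
        "format" = "csv",           -- ExternalFileTableValuedFunction.FORMAT
        "column_separator" = ","    -- ExternalFileTableValuedFunction.COLUMN_SEPARATOR
    );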
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
index 6f37fa5a5e..9b820fa185 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java
@@ -29,6 +29,7 @@ import org.apache.doris.datasource.property.constants.S3Properties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.thrift.TFileType;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
 import java.util.HashMap;
@@ -57,6 +58,7 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
 
     private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.<String>builder()
             .add(S3_URI)
+            .add(S3Properties.ENDPOINT)
             .addAll(DEPRECATED_KEYS)
             .addAll(S3Properties.TVF_REQUIRED_FIELDS)
             .addAll(OPTIONAL_KEYS)
@@ -70,7 +72,9 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
         Map<String, String> tvfParams = getValidParams(params);
         forceVirtualHosted = isVirtualHosted(tvfParams);
         s3uri = getS3Uri(tvfParams);
-        String endpoint = getEndpointFromUri();
+        final String endpoint = forceVirtualHosted
+                ? getEndpointAndSetVirtualBucket(params)
+                : s3uri.getBucketScheme();
         CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
                 tvfParams.getOrDefault(S3Properties.REGION, S3Properties.getRegionOfEndpoint(endpoint)),
                 tvfParams.get(S3Properties.ACCESS_KEY),
@@ -112,21 +116,20 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
         return S3Properties.requiredS3TVFProperties(validParams);
     }
 
-    private String getEndpointFromUri() throws AnalysisException {
-        if (forceVirtualHosted) {
-            // s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg:
+    private String getEndpointAndSetVirtualBucket(Map<String, String> params) throws AnalysisException {
+        Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted.");
+        String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
+        virtualBucket = fileds[0];
+        if (fileds.length > 1) {
+            // At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg:
             //          uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
             // s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com,
             // so we need separate virtualBucket and endpoint.
-            String[] fileds = s3uri.getVirtualBucket().split("\\.", 2);
-            virtualBucket = fileds[0];
-            if (fileds.length > 1) {
-                return fileds[1];
-            } else {
-                throw new AnalysisException("can not parse endpoint, please check uri.");
-            }
+            return fileds[1];
+        } else if (params.containsKey(S3Properties.ENDPOINT)) {
+            return params.get(S3Properties.ENDPOINT);
         } else {
-            return s3uri.getBucketScheme();
+            throw new AnalysisException("can not parse endpoint, please check uri.");
         }
     }
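
A worked example of the endpoint resolution this hunk changes, using the URI from the comment above: s3uri.getVirtualBucket() returns "my_bucket.cos.ap-beijing.myqcloud.com", which splits into virtualBucket "my_bucket" and endpoint "cos.ap-beijing.myqcloud.com". New here: when the host carries no endpoint part, the now-accepted S3Properties.ENDPOINT parameter is consulted before giving up. A hypothetical invocation relying on that fallback (assuming "s3.endpoint" is the key S3Properties.ENDPOINT maps to):

    SELECT * FROM S3(
        "uri" = "http://my_bucket/file.txt",            -- no endpoint in the host
        "s3.endpoint" = "cos.ap-beijing.myqcloud.com",  -- explicit endpoint property
        "format" = "csv"
    );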
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
new file mode 100644
index 0000000000..dd40ecb637
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.analysis.BinaryPredicate.Operator;
+import org.apache.doris.analysis.StorageBackend.StorageType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.constants.S3Properties.Env;
+import org.apache.doris.load.loadv2.LoadTask.MergeType;
+import org.apache.doris.tablefunction.S3TableValuedFunction;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import mockit.Expectations;
+import mockit.Injectable;
+import org.apache.hadoop.util.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class S3TvfLoadStmtTest {
+
+    private static final String ACCESS_KEY_VALUE = "ak";
+
+    private static final String SECRET_KEY_VALUE = "sk";
+
+    private static final String ENDPOINT_VALUE = "cos.ap-beijing.myqcloud.com";
+
+    private static final String REGION_VALUE = "ap-beijing";
+
+    private static final String DATA_URI = "s3://doris-build-1308700295/regression/load/data/part*";
+
+    private static final String FORMAT = "parquet";
+
+    private static final String TARGET_TABLE_NAME = "target";
+
+    private LabelName labelName;
+
+    private BrokerDesc brokerDesc;
+
+    private Set<String> colNames;
+
+    @Before
+    public void setUp() throws AnalysisException {
+        FeConstants.runningUnitTest = true;
+
+        labelName = new LabelName("testDb", "testTbl");
+
+        final Map<String, String> brokerProperties = Maps.newHashMap();
+        brokerProperties.put(Env.ACCESS_KEY, ACCESS_KEY_VALUE);
+        brokerProperties.put(Env.SECRET_KEY, SECRET_KEY_VALUE);
+        brokerProperties.put(Env.ENDPOINT, ENDPOINT_VALUE);
+        brokerProperties.put(Env.REGION, REGION_VALUE);
+        brokerDesc = new BrokerDesc("s3", StorageType.S3, brokerProperties);
+
+        colNames = Sets.newHashSet("k1", "k2", "k3", "k4");
+    }
+
+    @Test
+    public void testClauses() throws UserException {
+        final BinaryPredicate greater = new BinaryPredicate(Operator.GT, new IntLiteral(1), new IntLiteral(0));
+        final BinaryPredicate less = new BinaryPredicate(Operator.LT, new IntLiteral(1), new IntLiteral(0));
+        DataDescription dataDescription = buildDataDesc(
+                Lists.newArrayList(colNames),
+                greater,
+                less,
+                null
+        );
+        final S3TvfLoadStmt s3TvfLoadStmt = new S3TvfLoadStmt(labelName, Collections.singletonList(dataDescription),
+                brokerDesc,
+                Maps.newHashMap(), "comment");
+        final SelectStmt selectStmt = (SelectStmt) s3TvfLoadStmt.getQueryStmt();
+        final Expr whereClause = Deencapsulation.getField(selectStmt, "whereClause");
+        Assert.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less));
+    }
+
+    @Test
+    public void testTvfGeneration() {
+        DataDescription dataDescription = buildDataDesc(
+                Lists.newArrayList(colNames),
+                null,
+                null,
+                null
+        );
+        final TableRef tvfRef = Deencapsulation.invoke(S3TvfLoadStmt.class,
+                "buildTvfRef",
+                dataDescription, brokerDesc);
+        Assert.assertTrue(tvfRef instanceof TableValuedFunctionRef);
+        final S3TableValuedFunction tableFunction
+                = (S3TableValuedFunction) ((TableValuedFunctionRef) tvfRef).getTableFunction();
+        final Map<String, String> locationProperties = tableFunction.getLocationProperties();
+        Assert.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE);
+        Assert.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE);
+        Assert.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE);
+        Assert.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE);
+        Assert.assertEquals(tableFunction.getFilePath(), DATA_URI);
+    }
+
+    @Injectable
+    Table targetTable;
+
+    @Injectable
+    DataDescription dataDescription;
+
+    @Test
+    public void testColumnMappings() throws Exception {
+        // c1/c2/c3 in both file and table, and c5 is only in table
+        final List<ImportColumnDesc> columnsDescList = getColumnsDescList(
+                "c1,c2,c3,c1=upper(c1), tmp_c4=c1 + 1, c5 = tmp_c4+1");
+        //        DataDescription dataDescription = buildDataDesc(colNames, null, null, null);
+        new Expectations() {
+            {
+                dataDescription.getParsedColumnExprList();
+                minTimes = 0;
+                result = columnsDescList;
+
+                dataDescription.getFilePaths();
+                minTimes = 0;
+                result = Collections.singletonList(DATA_URI);
+
+                targetTable.getBaseSchema();
+                minTimes = 0;
+                result = getBaseSchema();
+
+                targetTable.getColumn("c1");
+                minTimes = 0;
+                result = new Column();
+
+                targetTable.getColumn("c2");
+                minTimes = 0;
+                result = new Column();
+
+                targetTable.getColumn("c3");
+                minTimes = 0;
+                result = new Column();
+
+                targetTable.getColumn("c5");
+                minTimes = 0;
+                result = new Column();
+
+                targetTable.getColumn("tmp_c4");
+                minTimes = 0;
+                result = null;
+            }
+        };
+        final S3TvfLoadStmt s3TvfLoadStmt = new S3TvfLoadStmt(labelName, Collections.singletonList(dataDescription),
+                brokerDesc, null, "comment");
+        s3TvfLoadStmt.setTargetTable(targetTable);
+        Deencapsulation.setField(s3TvfLoadStmt, "functionGenTableColNames", Sets.newHashSet("c1", "c2", "c3"));
+
+        Deencapsulation.invoke(s3TvfLoadStmt, "rewriteExpr", columnsDescList);
+        Assert.assertEquals(columnsDescList.size(), 5);
+        final String orig4 = "upper(`c1`) + 1 + 1";
+        Assert.assertEquals(orig4, columnsDescList.get(4).getExpr().toString());
+
+        final List<ImportColumnDesc> filterColumns = Deencapsulation.invoke(s3TvfLoadStmt,
+                "filterColumns", columnsDescList);
+        Assert.assertEquals(filterColumns.size(), 4);
+    }
+
+    private static DataDescription buildDataDesc(Iterable<String> columns, Expr fileFilter, Expr wherePredicate,
+            List<Expr> mappingList) {
+
+        return new DataDescription(
+                TARGET_TABLE_NAME,
+                null,
+                Collections.singletonList(DATA_URI),
+                Lists.newArrayList(columns),
+                null,
+                FORMAT,
+                null,
+                false,
+                mappingList,
+                fileFilter,
+                wherePredicate,
+                MergeType.APPEND,
+                null,
+                null,
+                null
+        );
+    }
+
+    private static List<ImportColumnDesc> getColumnsDescList(String columns) throws Exception {
+        String columnsSQL = "COLUMNS (" + columns + ")";
+        return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt(
+                new SqlParser(new SqlScanner(new StringReader(columnsSQL))))).getColumns();
+    }
+
+    private static List<Column> getBaseSchema() {
+        List<Column> columns = com.google.common.collect.Lists.newArrayList();
+
+        Column c1 = new Column("c1", PrimitiveType.BIGINT);
+        c1.setIsKey(true);
+        c1.setIsAllowNull(false);
+        columns.add(c1);
+
+        Column c2 = new Column("c2", ScalarType.createVarchar(25));
+        c2.setIsKey(true);
+        c2.setIsAllowNull(true);
+        columns.add(c2);
+
+        Column c3 = new Column("c3", PrimitiveType.BIGINT);
+        c3.setIsKey(true);
+        c3.setIsAllowNull(false);
+        columns.add(c3);
+
+        Column c5 = new Column("c5", PrimitiveType.BIGINT);
+        c5.setIsKey(true);
+        c5.setIsAllowNull(true);
+        columns.add(c5);
+
+        return columns;
+    }
+
+}
diff --git a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy
index 21cfd03cdb..d9b4cfea37 100644
--- a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy
+++ b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy
@@ -350,68 +350,5 @@ suite("test_broker_load_p2", "p2") {
             }
         }
     }
-
-    // test unified load
-    if (enabled != null && enabled.equalsIgnoreCase("true")) {
-        sql """ set enable_unified_load=true; """
-        def uuids = []
-        try {
-            def i = 0
-            for (String table in tables) {
-                sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
-                sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text
-
-                def uuid = UUID.randomUUID().toString().replace("-", "0")
-                uuids.add(uuid)
-                do_load_job.call(uuid, paths[i], table, columns_list[i], column_in_paths[i], preceding_filters[i],
-                        set_values[i], where_exprs[i])
-                i++
-            }
-
-            i = 0
-            for (String label in uuids) {
-                def max_try_milli_secs = 600000
-                while (max_try_milli_secs > 0) {
-                    String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """
-                    if (result[0][2].equals("FINISHED")) {
-                        logger.info("Load FINISHED " + label)
-                        assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5] + ", label: $label")
-                        break;
-                    }
-                    if (result[0][2].equals("CANCELLED")) {
-                        assertTrue(result[0][7].contains(error_msg[i]))
-                        break;
-                    }
-                    Thread.sleep(1000)
-                    max_try_milli_secs -= 1000
-                    if(max_try_milli_secs <= 0) {
-                        assertTrue(1 == 2, "load Timeout: $label")
-                    }
-                }
-                i++
-            }
-
-            def orc_expect_result = """[[20, 15901, 6025915247311731176, 1373910657, 8863282788606566657], [38, 15901, -9154375582268094750, 1373853561, 4923892366467329038], [38, 15901, -9154375582268094750, 1373853561, 8447995939656287502], [38, 15901, -9154375582268094750, 1373853565, 7451966001310881759], [38, 15901, -9154375582268094750, 1373853565, 7746521994248163870], [38, 15901, -9154375582268094750, 1373853577, 6795654975682437824], [38, 15901, -9154375582268094750, 1373853577, [...]
-            for (String table in tables) {
-                if (table.matches("orc_s3_case[23456789]")) {
-                    String[][] orc_actual_result = sql """select CounterID, EventDate, UserID, EventTime, WatchID from $table order by CounterID, EventDate, UserID, EventTime, WatchID limit 10;"""
-                    assertTrue("$orc_actual_result" == "$orc_expect_result")
-                }
-            }
-
-            order_qt_parquet_s3_case1 """select count(*) from parquet_s3_case1 where col1=10"""
-            order_qt_parquet_s3_case3 """select count(*) from parquet_s3_case3 where p_partkey < 100000"""
-            order_qt_parquet_s3_case6 """select count(*) from parquet_s3_case6 where p_partkey < 100000"""
-            order_qt_parquet_s3_case7 """select count(*) from parquet_s3_case7 where col4=4"""
-            order_qt_parquet_s3_case8 """ select count(*) from parquet_s3_case8 where p_partkey=1"""
-            order_qt_parquet_s3_case9 """ select * from parquet_s3_case9"""
-
-        } finally {
-            for (String table in tables) {
-                sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
-            }
-            sql """ set enable_unified_load=false; """
-        }
-    }
 }
 
diff --git a/regression-test/suites/load_p2/broker_load/test_tvf_based_broker_load.groovy b/regression-test/suites/load_p2/broker_load/test_tvf_based_broker_load.groovy
new file mode 100644
index 0000000000..964e2c926a
--- /dev/null
+++ b/regression-test/suites/load_p2/broker_load/test_tvf_based_broker_load.groovy
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_tvf_based_broker_load_p2", "p2") {
+
+    def tables = ["part",
+                  "upper_case",
+                  "reverse",
+                  "set1",
+                  "set2",
+                  "set3",
+                  "set4",
+                  "set5",
+                  "set6",
+                  "set7",
+                  "null_default",
+                  "filter",
+                  //  "path_column",
+                  "parquet_s3_case1", // col1 not in file but in table, will load default value for it.
+                  "parquet_s3_case2", // x1 not in file, not in table, will throw "col not found" error.
+                  "parquet_s3_case3", // p_comment not in table but in file, load normally.
+                  "parquet_s3_case4", // all columns are in table but not in file, will fill default values.
+                  "parquet_s3_case5", // x1 not in file, not in table, will throw "col not found" error.
+                  "parquet_s3_case6", // normal
+                  "parquet_s3_case7", // col5 will be ignored, load normally
+                  "parquet_s3_case8", // first column in table is not specified, will load default value for it.
+                  "parquet_s3_case9", // first column in table is not specified, will load default value for it.
+                  "orc_s3_case1", // table column capitalized first
+                  "orc_s3_case2", // table column lowercase * load column lowercase * orc file lowercase
+                  "orc_s3_case3", // table column lowercase * load column uppercase * orc file lowercase
+                  "orc_s3_case4", // table column lowercase * load column lowercase * orc file uppercase
+                  "orc_s3_case5", // table column lowercase * load column uppercase * orc file uppercase
+                  "orc_s3_case6", // table column uppercase * load column uppercase * orc file lowercase
+                  "orc_s3_case7", // table column uppercase * load column lowercase * orc file lowercase
+                  "orc_s3_case8", // table column uppercase * load column uppercase * orc file uppercase
+                  "orc_s3_case9", // table column uppercase * load column lowercase * orc file uppercase
+    ]
+    def paths = ["s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 //  "s3://doris-build-1308700295/regression/load/data/path/*/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/part*",
+                 "s3://doris-build-1308700295/regression/load/data/random_all_types/part*",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_100k_rows.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc",
+                 "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc",
+    ]
+    def columns_list = ["""p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""",
+                        """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""",
+                        """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""",
+                        """p_partkey, p_name, p_size""",
+                        """p_partkey""",
+                        """p_partkey""",
+                        """p_partkey,  p_size""",
+                        """p_partkey""",
+                        """p_partkey,  p_size""",
+                        """p_partkey,  p_size""",
+                        """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""",
+                        """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""",
+                        //  """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""",
+                        """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, col1""",
+                        """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, x1""",
+                        """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""",
+                        """col1, col2, col3, col4""",
+                        """p_partkey, p_name, p_mfgr, x1""",
+                        """p_partkey, p_name, p_mfgr, p_brand""",
+                        """p_partkey, p_name, p_mfgr, p_brand""",
+                        """p_name, p_mfgr""",
+                        """""",
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        //TODO: comment out the 8 rows below after jibing's fix
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...]
+                        //TODO: uncomment the 8 rows below after jibing's fix
+                        //    """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...]
+                        //    """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...]
+                        //    """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...]
+                        //    """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...]
+                        //    """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...]
+                        //    """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...]
+                        //    """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...]
+                        //    """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...]
+    ]
+    def column_in_paths = ["", "", "", "", "", "", "", "", "", "", "", ""/*, "COLUMNS FROM PATH AS (city)"*/, "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
+    def preceding_filters = ["", "", "", "", "", "", "", "", "", "", "", "preceding filter p_size < 10"/*, ""*/, "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
+    def set_values = ["",
+                      "",
+                      "SET(comment=p_comment, retailprice=p_retailprice, container=p_container, size=p_size, type=p_type, brand=p_brand, mfgr=p_mfgr, name=p_name, partkey=p_partkey)",
+                      "set(p_name=upper(p_name),p_greatest=greatest(cast(p_partkey as int), cast(p_size as int)))",
+                      "set(p_partkey = p_partkey + 100)",
+                      "set(partkey = p_partkey + 100)",
+                      "set(partkey = p_partkey + p_size)",
+                      "set(tmpk = p_partkey + 1, partkey = tmpk*2)",
+                      "set(partkey = p_partkey + 1, partsize = p_size*2)",
+                      "set(partsize = p_partkey + p_size)",
+                      "",
+                      "",
+                      // "",
+                      "",
+                      "",
+                      "",
+                      "",
+                      "set(col4 = x1)",
+                      "set(col4 = p_brand)",
+                      "set(col5 = p_brand)",
+                      "",
+                      "",
+                      "",
+                      "",
+                      "",
+                      "",
+                      "",
+                      "",
+                      "",
+                      "",
+                      ""
+    ]
+    def where_exprs = ["", "", "", "", "", "", "", "", "", "", "", "where p_partkey>10"/*, ""*/, "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""]
+
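+    // Expected EtlInfo column of SHOW LOAD for each case; "\\N" marks cases that are
+    // expected to fail, with the failure text kept in error_msg below.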
+    def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=163706; dpp.abnorm.ALL=0; dpp.norm.ALL=36294",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "\\N",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "\\N",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=4096",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=100000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+                    "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000",
+    ]
+
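+    // Expected TaskInfo column of SHOW LOAD for each case: the COS endpoint reported
+    // as the cluster, the 14400s (4h) load timeout, and a max_filter_ratio of 0.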
+    def task_info = ["cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+                     "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0",
+    ]
+
+    def error_msg = ["",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "[INTERNAL_ERROR]failed to find default value expr for slot: x1",
+                     "",
+                     "",
+                     "[INTERNAL_ERROR]failed to find default value expr for slot: x1",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+                     "",
+    ]
+
+    String ak = getS3AK()
+    String sk = getS3SK()
+    String enabled = context.config.otherConfigs.get("enableBrokerLoad")
+
+    def do_load_job = { uuid, path, table, columns, column_in_path, preceding_filter,
+                        set_value, where_expr ->
+        String columns_str = ("$columns" != "") ? "($columns)" : "";
+        String format_str = table.startsWith("orc_s3_case") ? "ORC" : "PARQUET"
+        sql """
+            LOAD LABEL $uuid (
+                DATA INFILE("$path")
+                INTO TABLE $table
+                FORMAT AS $format_str
+                $columns_str
+                $column_in_path
+                $preceding_filter
+                $set_value
+                $where_expr
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+                "AWS_REGION" = "ap-beijing"
+            )
+            """
+        logger.info("Submit load with label: $uuid, table: $table, path: $path")
+    }
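+    // For reference, do_load_job renders roughly the statement sketched below
+    // (<label> and <table> are placeholders; clause strings left empty simply
+    // disappear from the statement):
+    //
+    //   LOAD LABEL <label> (
+    //       DATA INFILE("s3://doris-build-1308700295/regression/load/data/part*")
+    //       INTO TABLE <table>
+    //       FORMAT AS PARQUET
+    //       (p_partkey, p_name, p_size)
+    //       where p_partkey>10
+    //   )
+    //   WITH S3 (
+    //       "AWS_ACCESS_KEY" = "...", "AWS_SECRET_KEY" = "...",
+    //       "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", "AWS_REGION" = "ap-beijing"
+    //   )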
+
+    // test unified load
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        sql """ set enable_unified_load=true; """
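+        // enable_unified_load routes the LOAD statement through the new
+        // S3-TVF + native INSERT path (UnifiedLoadStmt) instead of the legacy
+        // broker-load path.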
+        def uuids = []
+        try {
+            def i = 0
+            for (String table in tables) {
+                sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
+                sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text
+
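+                // Load labels must be unique within a database; derive a fresh one
+                // from a UUID, replacing '-' to keep the label a single
+                // alphanumeric token.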
+                def uuid = UUID.randomUUID().toString().replace("-", "0")
+                uuids.add(uuid)
+                do_load_job.call(uuid, paths[i], table, columns_list[i], column_in_paths[i], preceding_filters[i],
+                        set_values[i], where_exprs[i])
+                i++
+            }
+
+            def orc_expect_result = """[[20, 15901, 6025915247311731176, 1373910657, 8863282788606566657], [38, 15901, -9154375582268094750, 1373853561, 4923892366467329038], [38, 15901, -9154375582268094750, 1373853561, 8447995939656287502], [38, 15901, -9154375582268094750, 1373853565, 7451966001310881759], [38, 15901, -9154375582268094750, 1373853565, 7746521994248163870], [38, 15901, -9154375582268094750, 1373853577, 6795654975682437824], [38, 15901, -9154375582268094750, 1373853577, [...]
+            for (String table in tables) {
+                if (table.matches("orc_s3_case[23456789]")) {
+                    String[][] orc_actual_result = sql """select CounterID, EventDate, UserID, EventTime, WatchID from $table order by CounterID, EventDate, UserID, EventTime, WatchID limit 10;"""
+                    assertTrue("$orc_actual_result" == "$orc_expect_result")
+                }
+            }
+
+            order_qt_parquet_s3_case1 """select count(*) from parquet_s3_case1 where col1=10"""
+            order_qt_parquet_s3_case3 """select count(*) from parquet_s3_case3 where p_partkey < 100000"""
+            order_qt_parquet_s3_case6 """select count(*) from parquet_s3_case6 where p_partkey < 100000"""
+            order_qt_parquet_s3_case7 """select count(*) from parquet_s3_case7 where col4=4"""
+            order_qt_parquet_s3_case8 """ select count(*) from parquet_s3_case8 where p_partkey=1"""
+            order_qt_parquet_s3_case9 """ select * from parquet_s3_case9"""
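+            // Each order_qt_* statement sorts its query result and compares it with
+            // the matching section of this suite's .out file.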
+
+        } finally {
+            for (String table in tables) {
+                sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text
+            }
+            sql """ set enable_unified_load=false; """
+        }
+    }
+}

