This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 2a14fca2e7e35e1ddfc0b2ecc1d099abe6b678b3 Author: Siyang Tang <[email protected]> AuthorDate: Sun Jun 25 17:45:31 2023 +0800 [feature](load-refactor-with-tvf) S3 load with S3 tvf and native insert (#19937) --- fe/fe-core/pom.xml | 8 +- .../org/apache/doris/analysis/DataDescription.java | 10 + .../apache/doris/analysis/ImportColumnDesc.java | 4 + .../apache/doris/analysis/NativeInsertStmt.java | 36 +- .../org/apache/doris/analysis/S3TvfLoadStmt.java | 423 +++++++++++++++++++++ .../java/org/apache/doris/analysis/SelectStmt.java | 5 + .../org/apache/doris/analysis/UnifiedLoadStmt.java | 13 +- .../java/org/apache/doris/qe/StmtExecutor.java | 7 +- .../ExternalFileTableValuedFunction.java | 6 +- .../doris/tablefunction/S3TableValuedFunction.java | 27 +- .../apache/doris/analysis/S3TvfLoadStmtTest.java | 245 ++++++++++++ .../load_p2/broker_load/test_broker_load.groovy | 63 --- .../broker_load/test_tvf_based_broker_load.groovy | 328 ++++++++++++++++ 13 files changed, 1075 insertions(+), 100 deletions(-) diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index d2d933c08a..3d4e64b087 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -495,12 +495,12 @@ under the License. 
<artifactId>grpc-stub</artifactId> </dependency> <dependency> - <groupId>org.apache.doris</groupId> - <artifactId>hive-catalog-shade</artifactId> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> </dependency> <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> + <groupId>org.apache.doris</groupId> + <artifactId>hive-catalog-shade</artifactId> </dependency> <dependency> <groupId>org.apache.velocity</groupId> diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 04f5042d72..658605abc5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -154,6 +154,8 @@ public class DataDescription implements InsertStmt.DataDesc { private boolean isMysqlLoad = false; private int skipLines = 0; + private boolean isAnalyzed = false; + public DataDescription(String tableName, PartitionNames partitionNames, List<String> filePaths, @@ -572,6 +574,10 @@ public class DataDescription implements InsertStmt.DataDesc { return columnSeparator.getSeparator(); } + public Separator getColumnSeparatorObj() { + return columnSeparator; + } + public boolean isNegative() { return isNegative; } @@ -1001,6 +1007,9 @@ public class DataDescription implements InsertStmt.DataDesc { } public void analyze(String fullDbName) throws AnalysisException { + if (isAnalyzed) { + return; + } if (mergeType != LoadTask.MergeType.MERGE && deleteCondition != null) { throw new AnalysisException("not support DELETE ON clause when merge type is not MERGE."); } @@ -1015,6 +1024,7 @@ public class DataDescription implements InsertStmt.DataDesc { if (isNegative && mergeType != LoadTask.MergeType.APPEND) { throw new AnalysisException("Negative is only used when merge type is append."); } + isAnalyzed = true; } public void 
analyzeWithoutCheckPriv(String fullDbName) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java index 5b93d31b9f..13040bdcb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ImportColumnDesc.java @@ -50,6 +50,10 @@ public class ImportColumnDesc { return columnName; } + public void setColumnName(String columnName) { + this.columnName = columnName; + } + public Expr getExpr() { return expr; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index d3956de245..fcd85fd80b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -98,11 +98,11 @@ public class NativeInsertStmt extends InsertStmt { private static final String SHUFFLE_HINT = "SHUFFLE"; private static final String NOSHUFFLE_HINT = "NOSHUFFLE"; - private final TableName tblName; + protected final TableName tblName; private final PartitionNames targetPartitionNames; // parsed from targetPartitionNames. 
private List<Long> targetPartitionIds; - private final List<String> targetColumnNames; + protected List<String> targetColumnNames; private QueryStmt queryStmt; private final List<String> planHints; private Boolean isRepartition; @@ -113,7 +113,7 @@ public class NativeInsertStmt extends InsertStmt { private final Map<String, Expr> exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - private Table targetTable; + protected Table targetTable; private DatabaseIf db; private long transactionId; @@ -254,8 +254,7 @@ public class NativeInsertStmt extends InsertStmt { return isTransactionBegin; } - @Override - public void analyze(Analyzer analyzer) throws UserException { + protected void preCheckAnalyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); if (targetTable == null) { @@ -279,6 +278,20 @@ public class NativeInsertStmt extends InsertStmt { if (targetPartitionNames != null) { targetPartitionNames.analyze(analyzer); } + } + + /** + * translate load related stmt to`insert into xx select xx from tvf` semantic + */ + protected void convertSemantic(Analyzer analyzer) throws UserException { + // do nothing + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + preCheckAnalyze(analyzer); + + convertSemantic(analyzer); // set target table and analyzeTargetTable(analyzer); @@ -321,8 +334,7 @@ public class NativeInsertStmt extends InsertStmt { } } - private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { - // Get table + protected void initTargetTable(Analyzer analyzer) throws AnalysisException { if (targetTable == null) { DatabaseIf db = analyzer.getEnv().getCatalogMgr() .getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); @@ -335,6 +347,11 @@ public class NativeInsertStmt extends InsertStmt { throw new AnalysisException("Not support insert target table."); } } + } + + private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { + // Get table + 
initTargetTable(analyzer); if (targetTable instanceof OlapTable) { OlapTable olapTable = (OlapTable) targetTable; @@ -889,9 +906,4 @@ public class NativeInsertStmt extends InsertStmt { return RedirectStatus.FORWARD_WITH_SYNC; } } - - @Override - public String toSql() { - return null; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java new file mode 100644 index 0000000000..7c5eec7729 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/S3TvfLoadStmt.java @@ -0,0 +1,423 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.analysis.CompoundPredicate.Operator; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.datasource.property.constants.S3Properties.Env; +import org.apache.doris.tablefunction.ExternalFileTableValuedFunction; +import org.apache.doris.tablefunction.S3TableValuedFunction; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * S3 Load based on S3 TVF + */ +public class S3TvfLoadStmt extends NativeInsertStmt { + + private static final Logger LOG = LogManager.getLogger(S3TvfLoadStmt.class); + + private static final String FORMAT_CSV = "csv"; + + private static final String DEFAULT_FORMAT = FORMAT_CSV; + + private final DataDescription dataDescription; + + /** + * for csv format, we need some particular process + */ + private final boolean isCsvFormat; + + /** + 
* only used for loading from csv format tvf + * with mapping from col name to csv-format-tvf-style col name (c1, c2, c3...) + */ + private Map<String, String> selectColNameToCsvColName; + + @VisibleForTesting + private Set<String> functionGenTableColNames; + + public S3TvfLoadStmt(LabelName label, List<DataDescription> dataDescList, BrokerDesc brokerDesc, + Map<String, String> properties, String comments) throws DdlException { + super(buildInsertTarget(dataDescList.get(0)), + label.getLabelName(), /*insert all columns by default*/null, + buildInsertSource(dataDescList.get(0), brokerDesc), null); + this.label = label; + this.dataDescription = dataDescList.get(0); + this.properties = properties; + this.comments = comments; + this.isCsvFormat = isCsvFormat(dataDescription.getFileFormat()); + } + + // ------------------------------------ init helpers ------------------------------------ + + private static InsertTarget buildInsertTarget(DataDescription dataDescription) { + final TableName tableName = new TableName(null, null, dataDescription.getTableName()); + return new InsertTarget(tableName, dataDescription.getPartitionNames()); + } + + private static InsertSource buildInsertSource(DataDescription dataDescription, BrokerDesc brokerDesc) + throws DdlException { + final SelectList selectList = new SelectList(); + // use `select *` by default + final SelectListItem item = new SelectListItem(SelectListItem.createStarItem(null)); + selectList.addItem(item); + + // build from + final FromClause fromClause = new FromClause( + Collections.singletonList(buildTvfRef(dataDescription, brokerDesc)) + ); + + // trans order by in load stmt + final String sequenceCol = dataDescription.getSequenceCol(); + final ArrayList<OrderByElement> orderByElementList = Lists.newArrayList(); + if (!Strings.isNullOrEmpty(sequenceCol)) { + final OrderByElement orderByElement = new OrderByElement( + new SlotRef(null, sequenceCol), + true, null + ); + orderByElementList.add(orderByElement); + } + + + 
// merge preceding filter and where expr + final BoolLiteral trueLiteral = new BoolLiteral(true); + final Expr whereExpr = Optional.ofNullable(dataDescription.getWhereExpr()).orElse(trueLiteral); + final Expr precdingFilterExpr = + Optional.ofNullable(dataDescription.getPrecdingFilterExpr()).orElse(trueLiteral); + final Expr compoundPredicate = new CompoundPredicate(Operator.AND, precdingFilterExpr, whereExpr); + + final SelectStmt selectStmt = new SelectStmt( + selectList, fromClause, compoundPredicate, + null, null, + orderByElementList, LimitElement.NO_LIMIT + ); + return new InsertSource(selectStmt); + } + + private static TableRef buildTvfRef(DataDescription dataDescription, BrokerDesc brokerDesc) throws DdlException { + final Map<String, String> params = Maps.newHashMap(); + + final List<String> filePaths = dataDescription.getFilePaths(); + Preconditions.checkState(filePaths.size() == 1, "there should be only one file path"); + final String s3FilePath = filePaths.get(0); + params.put(S3TableValuedFunction.S3_URI, s3FilePath); + + final Map<String, String> dataDescProp = dataDescription.getProperties(); + if (dataDescProp != null) { + params.putAll(dataDescProp); + } + + final String format = Optional.ofNullable(dataDescription.getFileFormat()).orElse(DEFAULT_FORMAT); + params.put(ExternalFileTableValuedFunction.FORMAT, format); + if (isCsvFormat(format)) { + final Separator separator = dataDescription.getColumnSeparatorObj(); + if (separator != null) { + try { + separator.analyze(); + } catch (AnalysisException e) { + throw new DdlException("failed to create s3 tvf ref", e); + } + params.put(ExternalFileTableValuedFunction.COLUMN_SEPARATOR, dataDescription.getColumnSeparator()); + } + } + + Preconditions.checkState(!brokerDesc.isMultiLoadBroker(), "do not support multi broker load currently"); + Preconditions.checkState(brokerDesc.getStorageType() == StorageType.S3, "only support S3 load"); + + final Map<String, String> s3ResourceProp = 
brokerDesc.getProperties(); + S3Properties.convertToStdProperties(s3ResourceProp); + s3ResourceProp.keySet().removeIf(Env.FS_KEYS::contains); + params.putAll(s3ResourceProp); + + try { + return new TableValuedFunctionRef(S3TableValuedFunction.NAME, null, params); + } catch (AnalysisException e) { + throw new DdlException("failed to create s3 tvf ref", e); + } + } + + private static boolean isCsvFormat(String format) { + return Strings.isNullOrEmpty(format) || StringUtils.equalsIgnoreCase(format, FORMAT_CSV); + } + + // -------------------------------------------------------------------------------------- + + @Override + public void convertSemantic(Analyzer analyzer) throws UserException { + label.analyze(analyzer); + initTargetTable(analyzer); + analyzeColumns(analyzer); + } + + @Override + public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String> parentViewNameSet) + throws AnalysisException { + + super.getTables(analyzer, tableMap, parentViewNameSet); + final List<TableIf> tables = Lists.newArrayList(tableMap.values()); + Preconditions.checkState(tables.size() == 2, + "table map should only contain the unique function generated table and the unique target table"); + functionGenTableColNames = tables.get(0) + .getBaseSchema() + .stream() + .map(Column::getName) + .collect(Collectors.toSet()); + } + + // ------------------------------------ columns mapping ------------------------------------ + + private void analyzeColumns(Analyzer analyzer) throws AnalysisException { + final String fullDbName = dataDescription.analyzeFullDbName(label.getDbName(), analyzer); + dataDescription.analyze(fullDbName); + // copy a list for analyzing + List<ImportColumnDesc> columnExprList = Lists.newArrayList(dataDescription.getParsedColumnExprList()); + if (CollectionUtils.isEmpty(columnExprList)) { + padExprListIfNeeded(columnExprList); + } + rewriteExpr(columnExprList); + boolean isFileFieldSpecified = 
columnExprList.stream().anyMatch(ImportColumnDesc::isColumn); + if (!isFileFieldSpecified) { + return; + } + if (isCsvFormat) { + // in tvf, csv format column names are like "c1, c2, c3", record for correctness of select list + recordCsvColNames(columnExprList); + } + columnExprList = filterColumns(columnExprList); + if (CollectionUtils.isEmpty(columnExprList)) { + return; + } + resetTargetColumnNames(columnExprList); + resetSelectList(columnExprList); + } + + /** + * deal with the case that not all columns in table are in file + */ + private void padExprListIfNeeded(List<ImportColumnDesc> columnExprList) { + if (isCsvFormat) { + return; + } + columnExprList.addAll( + functionGenTableColNames + .stream() + .map(ImportColumnDesc::new) + .collect(Collectors.toList()) + ); + } + + /** + * find and rewrite the derivative columns + * e.g. (v1,v2=v1+1,v3=v2+1) --> (v1, v2=v1+1, v3=v1+1+1) + */ + private void rewriteExpr(List<ImportColumnDesc> columnDescList) { + Preconditions.checkNotNull(columnDescList, "columns should be not null"); + Preconditions.checkNotNull(targetTable, "target table is unset"); + LOG.debug("original columnExpr:{}", columnDescList); + Map<String, Expr> derivativeColumns = Maps.newHashMap(); + columnDescList + .stream() + .filter(Predicates.not(ImportColumnDesc::isColumn)) + .forEach(desc -> { + final Expr expr = desc.getExpr(); + if (expr instanceof SlotRef) { + final String columnName = ((SlotRef) expr).getColumnName(); + if (derivativeColumns.containsKey(columnName)) { + desc.setExpr(derivativeColumns.get(columnName)); + } + } else { + recursiveRewrite(expr, derivativeColumns); + } + derivativeColumns.put(desc.getColumnName(), expr); + }); + // `tmp` columns with expr can be removed after expr rewritten + columnDescList.removeIf( + Predicates.not(columnDesc -> + columnDesc.isColumn() || Objects.nonNull(targetTable.getColumn(columnDesc.getColumnName())) + ) + ); + LOG.debug("rewrite result:{}", columnDescList); + } + + private void 
recursiveRewrite(Expr expr, Map<String, Expr> derivativeColumns) { + final ArrayList<Expr> children = expr.getChildren(); + if (CollectionUtils.isEmpty(children)) { + return; + } + for (int i = 0; i < children.size(); i++) { + Expr child = expr.getChild(i); + if (child instanceof SlotRef) { + final String columnName = ((SlotRef) child).getColumnName(); + if (derivativeColumns.containsKey(columnName)) { + expr.setChild(i, derivativeColumns.get(columnName)); + } + continue; + } + recursiveRewrite(child, derivativeColumns); + } + } + + /** + * record mapping from col name to csv-format-tvf-style col name + * + * @see selectColNameToCsvColName + */ + private void recordCsvColNames(List<ImportColumnDesc> columnDescList) { + AtomicInteger counter = new AtomicInteger(1); + selectColNameToCsvColName = columnDescList.stream() + .filter(ImportColumnDesc::isColumn) + .collect(Collectors.toMap( + ImportColumnDesc::getColumnName, + name -> "c" + counter.getAndIncrement(), + (v1, v2) -> v1, + LinkedHashMap::new + )); + LOG.debug("select column name to csv colum name:{}", selectColNameToCsvColName); + } + + private List<ImportColumnDesc> filterColumns(List<ImportColumnDesc> columnExprList) { + Preconditions.checkNotNull(targetTable, "target table is unset"); + + // remove all `tmp` columns, which are not in target table + columnExprList.removeIf( + Predicates.and( + ImportColumnDesc::isColumn, + desc -> Objects.isNull(targetTable.getColumn(desc.getColumnName())), + desc -> functionGenTableColNames.contains(desc.getColumnName()) + ) + ); + + // deal with the case like: + // (k1, k2) SET(k1 = `upper(k1)`) + columnExprList = Lists.newArrayList(columnExprList.stream() + .collect(Collectors.toMap( + ImportColumnDesc::getColumnName, + Function.identity(), + (lhs, rhs) -> { + if (lhs.getExpr() != null && rhs.getExpr() == null) { + return lhs; + } + if (lhs.getExpr() == null && rhs.getExpr() != null) { + return rhs; + } + throw new IllegalArgumentException( + String.format("column `%s` 
specified twice", lhs.getColumnName())); + } + )).values()); + + // deal with the case that column in target table but not in tvf + columnExprList.removeIf(desc -> + !functionGenTableColNames.contains(desc.getColumnName()) + && Objects.nonNull(targetTable.getColumn(desc.getColumnName())) + && desc.isColumn() + ); + + LOG.debug("filtered result:{}", columnExprList); + return columnExprList; + } + + private void resetTargetColumnNames(List<ImportColumnDesc> columnExprList) { + targetColumnNames = columnExprList + .stream() + .map(ImportColumnDesc::getColumnName) + .collect(Collectors.toList()); + LOG.debug("target cols:{}", targetColumnNames); + } + + private void resetSelectList(List<ImportColumnDesc> columnExprList) { + if (isCsvFormat) { + rewriteExprColNameToCsvStyle(columnExprList); + } + LOG.debug("select list:{}", columnExprList); + final SelectList selectList = new SelectList(); + columnExprList.forEach(desc -> { + if (!desc.isColumn()) { + selectList.addItem(new SelectListItem(desc.getExpr(), desc.getColumnName())); + return; + } + + if (isCsvFormat) { + // use csv-style-column name and target column name as alias + final Expr slotRef = new SlotRef(null, selectColNameToCsvColName.get(desc.getColumnName())); + selectList.addItem(new SelectListItem(slotRef, desc.getColumnName())); + } else { + selectList.addItem(new SelectListItem(new SlotRef(null, desc.getColumnName()), null)); + } + }); + ((SelectStmt) getQueryStmt()).resetSelectList(selectList); + } + + private void rewriteExprColNameToCsvStyle(List<ImportColumnDesc> columnExprList) { + Preconditions.checkNotNull(selectColNameToCsvColName, + "SelectColName To CsvColName is not recorded"); + columnExprList + .stream() + .filter(Predicates.not(ImportColumnDesc::isColumn)) + .forEach(desc -> rewriteSlotRefInExpr(desc.getExpr())); + + // rewrite where predicate and order by elements + final SelectStmt selectStmt = (SelectStmt) getQueryStmt(); + rewriteSlotRefInExpr(selectStmt.getWhereClause()); + 
selectStmt.getOrderByElements().forEach(orderByElement -> rewriteSlotRefInExpr(orderByElement.getExpr())); + } + + private void rewriteSlotRefInExpr(Expr expr) { + final Predicate<? super Expr> rewritePredicate = e -> e instanceof SlotRef + && selectColNameToCsvColName.containsKey(((SlotRef) e).getColumnName()); + final Consumer<? super Expr> rewriteOperation = e -> { + final SlotRef slotRef = (SlotRef) e; + slotRef.setCol(selectColNameToCsvColName.get(slotRef.getColumnName())); + }; + + List<Expr> slotRefs = Lists.newArrayList(); + expr.collect(rewritePredicate, slotRefs); + slotRefs.forEach(rewriteOperation); + } + + // ----------------------------------------------------------------------------------------- +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 238ed47040..9720300ab2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -2653,4 +2653,9 @@ public class SelectStmt extends QueryStmt { return null; } } + + public void resetSelectList(SelectList selectList) { + this.selectList = selectList; + this.originSelectList = selectList.clone(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java index 8a58ee9523..351fa11688 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UnifiedLoadStmt.java @@ -17,6 +17,8 @@ package org.apache.doris.analysis; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.common.DdlException; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; @@ -43,7 +45,7 @@ public class UnifiedLoadStmt extends DdlStmt { } public static UnifiedLoadStmt 
buildMysqlLoadStmt(DataDescription dataDescription, Map<String, String> properties, - String comment) { + String comment) { final ConnectContext connectContext = ConnectContext.get(); if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) { return new UnifiedLoadStmt(new MysqlLoadStmt(dataDescription, properties, comment)); @@ -52,10 +54,15 @@ public class UnifiedLoadStmt extends DdlStmt { } public static UnifiedLoadStmt buildBrokerLoadStmt(LabelName label, List<DataDescription> dataDescriptions, - BrokerDesc brokerDesc, - Map<String, String> properties, String comment) { + BrokerDesc brokerDesc, + Map<String, String> properties, String comment) throws DdlException { + final ConnectContext connectContext = ConnectContext.get(); if (connectContext != null && connectContext.getSessionVariable().isEnableUnifiedLoad()) { + if (brokerDesc != null && brokerDesc.getStorageType() == StorageType.S3) { + // for tvf solution validation + return new UnifiedLoadStmt(new S3TvfLoadStmt(label, dataDescriptions, brokerDesc, properties, comment)); + } return new UnifiedLoadStmt(new BrokerLoadStmt(label, dataDescriptions, brokerDesc, properties, comment)); } return new UnifiedLoadStmt(new LoadStmt(label, dataDescriptions, brokerDesc, properties, comment)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c86da0daed..9cb5a7fe2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -24,6 +24,7 @@ import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.AnalyzeTblStmt; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.ArrayLiteral; +import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.CreateTableAsSelectStmt; import org.apache.doris.analysis.CreateTableLikeStmt; import 
org.apache.doris.analysis.DdlStmt; @@ -903,10 +904,10 @@ public class StmtExecutor { unifiedLoadStmt.init(); final StatementBase proxyStmt = unifiedLoadStmt.getProxyStmt(); parsedStmt = proxyStmt; - if (!(proxyStmt instanceof LoadStmt)) { + if (!(proxyStmt instanceof LoadStmt) && !(proxyStmt instanceof CreateRoutineLoadStmt)) { Preconditions.checkState( - parsedStmt instanceof InsertStmt && ((InsertStmt) parsedStmt).needLoadManager(), - "enable_unified_load=true, should be external insert stmt"); + parsedStmt instanceof InsertStmt, + "enable_unified_load=true, should be insert stmt"); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 994353af37..1bd07d0cff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -85,9 +85,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class); protected static final String DEFAULT_COLUMN_SEPARATOR = ","; protected static final String DEFAULT_LINE_DELIMITER = "\n"; - protected static final String FORMAT = "format"; - protected static final String COLUMN_SEPARATOR = "column_separator"; - protected static final String LINE_DELIMITER = "line_delimiter"; + public static final String FORMAT = "format"; + public static final String COLUMN_SEPARATOR = "column_separator"; + public static final String LINE_DELIMITER = "line_delimiter"; protected static final String JSON_ROOT = "json_root"; protected static final String JSON_PATHS = "jsonpaths"; protected static final String STRIP_OUTER_ARRAY = "strip_outer_array"; diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java index 6f37fa5a5e..9b820fa185 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -29,6 +29,7 @@ import org.apache.doris.datasource.property.constants.S3Properties; import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.thrift.TFileType; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import java.util.HashMap; @@ -57,6 +58,7 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.<String>builder() .add(S3_URI) + .add(S3Properties.ENDPOINT) .addAll(DEPRECATED_KEYS) .addAll(S3Properties.TVF_REQUIRED_FIELDS) .addAll(OPTIONAL_KEYS) @@ -70,7 +72,9 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { Map<String, String> tvfParams = getValidParams(params); forceVirtualHosted = isVirtualHosted(tvfParams); s3uri = getS3Uri(tvfParams); - String endpoint = getEndpointFromUri(); + final String endpoint = forceVirtualHosted + ? 
getEndpointAndSetVirtualBucket(params) + : s3uri.getBucketScheme(); CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint, tvfParams.getOrDefault(S3Properties.REGION, S3Properties.getRegionOfEndpoint(endpoint)), tvfParams.get(S3Properties.ACCESS_KEY), @@ -112,21 +116,20 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { return S3Properties.requiredS3TVFProperties(validParams); } - private String getEndpointFromUri() throws AnalysisException { - if (forceVirtualHosted) { - // s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg: + private String getEndpointAndSetVirtualBucket(Map<String, String> params) throws AnalysisException { + Preconditions.checkState(forceVirtualHosted, "only invoked when force virtual hosted."); + String[] fileds = s3uri.getVirtualBucket().split("\\.", 2); + virtualBucket = fileds[0]; + if (fileds.length > 1) { + // At this point, s3uri.getVirtualBucket() is: virtualBucket.endpoint, Eg: // uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt // s3uri.getVirtualBucket() = my_bucket.cos.ap-beijing.myqcloud.com, // so we need separate virtualBucket and endpoint. 
- String[] fileds = s3uri.getVirtualBucket().split("\\.", 2); - virtualBucket = fileds[0]; - if (fileds.length > 1) { - return fileds[1]; - } else { - throw new AnalysisException("can not parse endpoint, please check uri."); - } + return fileds[1]; + } else if (params.containsKey(S3Properties.ENDPOINT)) { + return params.get(S3Properties.ENDPOINT); } else { - return s3uri.getBucketScheme(); + throw new AnalysisException("can not parse endpoint, please check uri."); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java new file mode 100644 index 0000000000..dd40ecb637 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/S3TvfLoadStmtTest.java @@ -0,0 +1,245 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.analysis.BinaryPredicate.Operator; +import org.apache.doris.analysis.StorageBackend.StorageType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.datasource.property.constants.S3Properties.Env; +import org.apache.doris.load.loadv2.LoadTask.MergeType; +import org.apache.doris.tablefunction.S3TableValuedFunction; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import mockit.Expectations; +import mockit.Injectable; +import org.apache.hadoop.util.Lists; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.StringReader; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class S3TvfLoadStmtTest { + + private static final String ACCESS_KEY_VALUE = "ak"; + + private static final String SECRET_KEY_VALUE = "sk"; + + private static final String ENDPOINT_VALUE = "cos.ap-beijing.myqcloud.com"; + + private static final String REGION_VALUE = "ap-beijing"; + + private static final String DATA_URI = "s3://doris-build-1308700295/regression/load/data/part*"; + + private static final String FORMAT = "parquet"; + + private static final String TARGET_TABLE_NAME = "target"; + + private LabelName labelName; + + private BrokerDesc brokerDesc; + + private Set<String> colNames; + + @Before + public void setUp() throws AnalysisException { + FeConstants.runningUnitTest = true; + + labelName = new LabelName("testDb", "testTbl"); + + 
final Map<String, String> brokerProperties = Maps.newHashMap(); + brokerProperties.put(Env.ACCESS_KEY, ACCESS_KEY_VALUE); + brokerProperties.put(Env.SECRET_KEY, SECRET_KEY_VALUE); + brokerProperties.put(Env.ENDPOINT, ENDPOINT_VALUE); + brokerProperties.put(Env.REGION, REGION_VALUE); + brokerDesc = new BrokerDesc("s3", StorageType.S3, brokerProperties); + + colNames = Sets.newHashSet("k1", "k2", "k3", "k4"); + } + + @Test + public void testClauses() throws UserException { + final BinaryPredicate greater = new BinaryPredicate(Operator.GT, new IntLiteral(1), new IntLiteral(0)); + final BinaryPredicate less = new BinaryPredicate(Operator.LT, new IntLiteral(1), new IntLiteral(0)); + DataDescription dataDescription = buildDataDesc( + Lists.newArrayList(colNames), + greater, + less, + null + ); + final S3TvfLoadStmt s3TvfLoadStmt = new S3TvfLoadStmt(labelName, Collections.singletonList(dataDescription), + brokerDesc, + Maps.newHashMap(), "comment"); + final SelectStmt selectStmt = (SelectStmt) s3TvfLoadStmt.getQueryStmt(); + final Expr whereClause = Deencapsulation.getField(selectStmt, "whereClause"); + Assert.assertEquals(whereClause, new CompoundPredicate(CompoundPredicate.Operator.AND, greater, less)); + } + + @Test + public void testTvfGeneration() { + DataDescription dataDescription = buildDataDesc( + Lists.newArrayList(colNames), + null, + null, + null + ); + final TableRef tvfRef = Deencapsulation.invoke(S3TvfLoadStmt.class, + "buildTvfRef", + dataDescription, brokerDesc); + Assert.assertTrue(tvfRef instanceof TableValuedFunctionRef); + final S3TableValuedFunction tableFunction + = (S3TableValuedFunction) ((TableValuedFunctionRef) tvfRef).getTableFunction(); + final Map<String, String> locationProperties = tableFunction.getLocationProperties(); + Assert.assertEquals(locationProperties.get(S3Properties.ENDPOINT), ENDPOINT_VALUE); + Assert.assertEquals(locationProperties.get(S3Properties.ACCESS_KEY), ACCESS_KEY_VALUE); + 
Assert.assertEquals(locationProperties.get(S3Properties.SECRET_KEY), SECRET_KEY_VALUE); + Assert.assertEquals(locationProperties.get(S3Properties.REGION), REGION_VALUE); + Assert.assertEquals(tableFunction.getFilePath(), DATA_URI); + } + + @Injectable + Table targetTable; + + @Injectable + DataDescription dataDescription; + + @Test + public void testColumnMappings() throws Exception { + // c1/c2/c3 in both file and table, and c5 is only in table + final List<ImportColumnDesc> columnsDescList = getColumnsDescList( + "c1,c2,c3,c1=upper(c1), tmp_c4=c1 + 1, c5 = tmp_c4+1"); + // DataDescription dataDescription = buildDataDesc(colNames, null, null, null); + new Expectations() { + { + dataDescription.getParsedColumnExprList(); + minTimes = 0; + result = columnsDescList; + + dataDescription.getFilePaths(); + minTimes = 0; + result = Collections.singletonList(DATA_URI); + + targetTable.getBaseSchema(); + minTimes = 0; + result = getBaseSchema(); + + targetTable.getColumn("c1"); + minTimes = 0; + result = new Column(); + + targetTable.getColumn("c2"); + minTimes = 0; + result = new Column(); + + targetTable.getColumn("c3"); + minTimes = 0; + result = new Column(); + + targetTable.getColumn("c5"); + minTimes = 0; + result = new Column(); + + targetTable.getColumn("tmp_c4"); + minTimes = 0; + result = null; + } + }; + final S3TvfLoadStmt s3TvfLoadStmt = new S3TvfLoadStmt(labelName, Collections.singletonList(dataDescription), + brokerDesc, null, "comment"); + s3TvfLoadStmt.setTargetTable(targetTable); + Deencapsulation.setField(s3TvfLoadStmt, "functionGenTableColNames", Sets.newHashSet("c1", "c2", "c3")); + + Deencapsulation.invoke(s3TvfLoadStmt, "rewriteExpr", columnsDescList); + Assert.assertEquals(columnsDescList.size(), 5); + final String orig4 = "upper(`c1`) + 1 + 1"; + Assert.assertEquals(orig4, columnsDescList.get(4).getExpr().toString()); + + final List<ImportColumnDesc> filterColumns = Deencapsulation.invoke(s3TvfLoadStmt, + "filterColumns", columnsDescList); + 
Assert.assertEquals(filterColumns.size(), 4); + } + + private static DataDescription buildDataDesc(Iterable<String> columns, Expr fileFilter, Expr wherePredicate, + List<Expr> mappingList) { + + return new DataDescription( + TARGET_TABLE_NAME, + null, + Collections.singletonList(DATA_URI), + Lists.newArrayList(columns), + null, + FORMAT, + null, + false, + mappingList, + fileFilter, + wherePredicate, + MergeType.APPEND, + null, + null, + null + ); + } + + private static List<ImportColumnDesc> getColumnsDescList(String columns) throws Exception { + String columnsSQL = "COLUMNS (" + columns + ")"; + return ((ImportColumnsStmt) SqlParserUtils.getFirstStmt( + new SqlParser(new SqlScanner(new StringReader(columnsSQL))))).getColumns(); + } + + private static List<Column> getBaseSchema() { + List<Column> columns = com.google.common.collect.Lists.newArrayList(); + + Column c1 = new Column("c1", PrimitiveType.BIGINT); + c1.setIsKey(true); + c1.setIsAllowNull(false); + columns.add(c1); + + Column c2 = new Column("c2", ScalarType.createVarchar(25)); + c2.setIsKey(true); + c2.setIsAllowNull(true); + columns.add(c2); + + Column c3 = new Column("c3", PrimitiveType.BIGINT); + c3.setIsKey(true); + c3.setIsAllowNull(false); + columns.add(c3); + + Column c5 = new Column("c5", PrimitiveType.BIGINT); + c5.setIsKey(true); + c5.setIsAllowNull(true); + columns.add(c5); + + return columns; + } + +} diff --git a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy index 21cfd03cdb..d9b4cfea37 100644 --- a/regression-test/suites/load_p2/broker_load/test_broker_load.groovy +++ b/regression-test/suites/load_p2/broker_load/test_broker_load.groovy @@ -350,68 +350,5 @@ suite("test_broker_load_p2", "p2") { } } } - - // test unified load - if (enabled != null && enabled.equalsIgnoreCase("true")) { - sql """ set enable_unified_load=true; """ - def uuids = [] - try { - def i = 0 - for (String table in tables) { - sql 
new File("""${context.file.parent}/ddl/${table}_drop.sql""").text - sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text - - def uuid = UUID.randomUUID().toString().replace("-", "0") - uuids.add(uuid) - do_load_job.call(uuid, paths[i], table, columns_list[i], column_in_paths[i], preceding_filters[i], - set_values[i], where_exprs[i]) - i++ - } - - i = 0 - for (String label in uuids) { - def max_try_milli_secs = 600000 - while (max_try_milli_secs > 0) { - String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """ - if (result[0][2].equals("FINISHED")) { - logger.info("Load FINISHED " + label) - assertTrue(etl_info[i] == result[0][5], "expected: " + etl_info[i] + ", actual: " + result[0][5] + ", label: $label") - break; - } - if (result[0][2].equals("CANCELLED")) { - assertTrue(result[0][7].contains(error_msg[i])) - break; - } - Thread.sleep(1000) - max_try_milli_secs -= 1000 - if(max_try_milli_secs <= 0) { - assertTrue(1 == 2, "load Timeout: $label") - } - } - i++ - } - - def orc_expect_result = """[[20, 15901, 6025915247311731176, 1373910657, 8863282788606566657], [38, 15901, -9154375582268094750, 1373853561, 4923892366467329038], [38, 15901, -9154375582268094750, 1373853561, 8447995939656287502], [38, 15901, -9154375582268094750, 1373853565, 7451966001310881759], [38, 15901, -9154375582268094750, 1373853565, 7746521994248163870], [38, 15901, -9154375582268094750, 1373853577, 6795654975682437824], [38, 15901, -9154375582268094750, 1373853577, [...] 
- for (String table in tables) { - if (table.matches("orc_s3_case[23456789]")) { - String[][] orc_actual_result = sql """select CounterID, EventDate, UserID, EventTime, WatchID from $table order by CounterID, EventDate, UserID, EventTime, WatchID limit 10;""" - assertTrue("$orc_actual_result" == "$orc_expect_result") - } - } - - order_qt_parquet_s3_case1 """select count(*) from parquet_s3_case1 where col1=10""" - order_qt_parquet_s3_case3 """select count(*) from parquet_s3_case3 where p_partkey < 100000""" - order_qt_parquet_s3_case6 """select count(*) from parquet_s3_case6 where p_partkey < 100000""" - order_qt_parquet_s3_case7 """select count(*) from parquet_s3_case7 where col4=4""" - order_qt_parquet_s3_case8 """ select count(*) from parquet_s3_case8 where p_partkey=1""" - order_qt_parquet_s3_case9 """ select * from parquet_s3_case9""" - - } finally { - for (String table in tables) { - sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text - } - sql """ set enable_unified_load=false; """ - } - } } diff --git a/regression-test/suites/load_p2/broker_load/test_tvf_based_broker_load.groovy b/regression-test/suites/load_p2/broker_load/test_tvf_based_broker_load.groovy new file mode 100644 index 0000000000..964e2c926a --- /dev/null +++ b/regression-test/suites/load_p2/broker_load/test_tvf_based_broker_load.groovy @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_tvf_based_broker_load_p2", "p2") { + + def tables = ["part", + "upper_case", + "reverse", + "set1", + "set2", + "set3", + "set4", + "set5", + "set6", + "set7", + "null_default", + "filter", + // "path_column", + "parquet_s3_case1", // col1 not in file but in table, will load default value for it. + "parquet_s3_case2", // x1 not in file, not in table, will throw "col not found" error. + "parquet_s3_case3", // p_comment not in table but in file, load normally. + "parquet_s3_case4", // all columns are in table but not in file, will fill default values. + "parquet_s3_case5", // x1 not in file, not in table, will throw "col not found" error. + "parquet_s3_case6", // normal + "parquet_s3_case7", // col5 will be ignored, load normally + "parquet_s3_case8", // first column in table is not specified, will load default value for it. + "parquet_s3_case9", // first column in table is not specified, will load default value for it. 
+ "orc_s3_case1", // table column capitalize first + "orc_s3_case2", // table column lowercase * load column lowercase * orc file lowercase + "orc_s3_case3", // table column lowercase * load column uppercase * orc file lowercase + "orc_s3_case4", // table column lowercase * load column lowercase * orc file uppercase + "orc_s3_case5", // table column lowercase * load column uppercase * orc file uppercase + "orc_s3_case6", // table column uppercase * load column uppercase * orc file lowercase + "orc_s3_case7", // table column uppercase * load column lowercase * orc file lowercase + "orc_s3_case8", // table column uppercase * load column uppercase * orc file uppercase + "orc_s3_case9", // table column uppercase * load column lowercase * orc file uppercase + ] + def paths = ["s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + // "s3://doris-build-1308700295/regression/load/data/path/*/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/part*", + 
"s3://doris-build-1308700295/regression/load/data/part*", + "s3://doris-build-1308700295/regression/load/data/random_all_types/part*", + "s3://doris-build-1308700295/regression/load/data/orc/hits_100k_rows.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_lowercase.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc", + "s3://doris-build-1308700295/regression/load/data/orc/hits_10k_rows_uppercase.orc", + ] + def columns_list = ["""p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""", + """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""", + """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""", + """p_partkey, p_name, p_size""", + """p_partkey""", + """p_partkey""", + """p_partkey, p_size""", + """p_partkey""", + """p_partkey, p_size""", + """p_partkey, p_size""", + """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""", + """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""", + // """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment""", + """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, col1""", + """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, p_comment, x1""", + """p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice, 
p_comment""", + """col1, col2, col3, col4""", + """p_partkey, p_name, p_mfgr, x1""", + """p_partkey, p_name, p_mfgr, p_brand""", + """p_partkey, p_name, p_mfgr, p_brand""", + """p_name, p_mfgr""", + """""", + """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] + //TODO: comment out the 8 rows below after jibing fix + """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] + """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] 
+ """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] + """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] + """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] + """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] 
+ """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] + """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase,adveng [...] + //TODO: uncomment the 8 rows below after jibing fix + // """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...] + // """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...] 
+ // """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...] + // """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...] + // """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...] + // """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...] 
+ // """WATCHID,JAVAENABLE,TITLE,GOODEVENT,EVENTTIME,EVENTDATE,COUNTERID,CLIENTIP,REGIONID,USERID,COUNTERCLASS,OS,USERAGENT,URL,REFERER,ISREFRESH,REFERERCATEGORYID,REFERERREGIONID,URLCATEGORYID,URLREGIONID,RESOLUTIONWIDTH,RESOLUTIONHEIGHT,RESOLUTIONDEPTH,FLASHMAJOR,FLASHMINOR,FLASHMINOR2,NETMAJOR,NETMINOR,USERAGENTMAJOR,USERAGENTMINOR,COOKIEENABLE,JAVASCRIPTENABLE,ISMOBILE,MOBILEPHONE,MOBILEPHONEMODEL,PARAMS,IPNETWORKID,TRAFICSOURCEID,SEARCHENGINEID,SEARCHPHRASE, [...] + // """watchid,javaenable,title,goodevent,eventtime,eventdate,counterid,clientip,regionid,userid,counterclass,os,useragent,url,referer,isrefresh,referercategoryid,refererregionid,urlcategoryid,urlregionid,resolutionwidth,resolutionheight,resolutiondepth,flashmajor,flashminor,flashminor2,netmajor,netminor,useragentmajor,useragentminor,cookieenable,javascriptenable,ismobile,mobilephone,mobilephonemodel,params,ipnetworkid,traficsourceid,searchengineid,searchphrase, [...] + ] + def column_in_paths = ["", "", "", "", "", "", "", "", "", "", "", ""/*, "COLUMNS FROM PATH AS (city)"*/, "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""] + def preceding_filters = ["", "", "", "", "", "", "", "", "", "", "", "preceding filter p_size < 10"/*, ""*/, "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""] + def set_values = ["", + "", + "SET(comment=p_comment, retailprice=p_retailprice, container=p_container, size=p_size, type=p_type, brand=p_brand, mfgr=p_mfgr, name=p_name, partkey=p_partkey)", + "set(p_name=upper(p_name),p_greatest=greatest(cast(p_partkey as int), cast(p_size as int)))", + "set(p_partkey = p_partkey + 100)", + "set(partkey = p_partkey + 100)", + "set(partkey = p_partkey + p_size)", + "set(tmpk = p_partkey + 1, partkey = tmpk*2)", + "set(partkey = p_partkey + 1, partsize = p_size*2)", + "set(partsize = p_partkey + p_size)", + "", + "", + // "", + "", + "", + "", + "", + "set(col4 = x1)", + "set(col4 = p_brand)", + "set(col5 = p_brand)", + "", + "", + "", + 
"", + "", + "", + "", + "", + "", + "", + "" + ] + def where_exprs = ["", "", "", "", "", "", "", "", "", "", "", "where p_partkey>10"/*, ""*/, "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", ""] + + def etl_info = ["unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=163706; dpp.abnorm.ALL=0; dpp.norm.ALL=36294", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "\\N", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "\\N", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=200000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=4096", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=100000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + "unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + 
"unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=10000", + ] + + def task_info = ["cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; 
timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + "cluster:cos.ap-beijing.myqcloud.com; timeout(s):14400; max_filter_ratio:0.0", + ] + + def error_msg = ["", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "[INTERNAL_ERROR]failed to find default value expr for slot: x1", + "", + "", + "[INTERNAL_ERROR]failed to find default value expr for slot: x1", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + "", + ] + + String ak = getS3AK() + String sk = getS3SK() + String enabled = context.config.otherConfigs.get("enableBrokerLoad") + + def do_load_job = { uuid, path, table, columns, column_in_path, preceding_filter, + set_value, where_expr -> + String columns_str = ("$columns" != "") ? "($columns)" : ""; + String format_str = table.startsWith("orc_s3_case") ? 
"ORC" : "PARQUET" + sql """ + LOAD LABEL $uuid ( + DATA INFILE("$path") + INTO TABLE $table + FORMAT AS $format_str + $columns_str + $column_in_path + $preceding_filter + $set_value + $where_expr + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", + "AWS_REGION" = "ap-beijing" + ) + """ + logger.info("Submit load with lable: $uuid, table: $table, path: $path") + } + + // test unified load + if (enabled != null && enabled.equalsIgnoreCase("true")) { + sql """ set enable_unified_load=true; """ + def uuids = [] + try { + def i = 0 + for (String table in tables) { + sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text + sql new File("""${context.file.parent}/ddl/${table}_create.sql""").text + + def uuid = UUID.randomUUID().toString().replace("-", "0") + uuids.add(uuid) + do_load_job.call(uuid, paths[i], table, columns_list[i], column_in_paths[i], preceding_filters[i], + set_values[i], where_exprs[i]) + i++ + } + + def orc_expect_result = """[[20, 15901, 6025915247311731176, 1373910657, 8863282788606566657], [38, 15901, -9154375582268094750, 1373853561, 4923892366467329038], [38, 15901, -9154375582268094750, 1373853561, 8447995939656287502], [38, 15901, -9154375582268094750, 1373853565, 7451966001310881759], [38, 15901, -9154375582268094750, 1373853565, 7746521994248163870], [38, 15901, -9154375582268094750, 1373853577, 6795654975682437824], [38, 15901, -9154375582268094750, 1373853577, [...] 
+ for (String table in tables) { + if (table.matches("orc_s3_case[23456789]")) { + String[][] orc_actual_result = sql """select CounterID, EventDate, UserID, EventTime, WatchID from $table order by CounterID, EventDate, UserID, EventTime, WatchID limit 10;""" + assertTrue("$orc_actual_result" == "$orc_expect_result") + } + } + + order_qt_parquet_s3_case1 """select count(*) from parquet_s3_case1 where col1=10""" + order_qt_parquet_s3_case3 """select count(*) from parquet_s3_case3 where p_partkey < 100000""" + order_qt_parquet_s3_case6 """select count(*) from parquet_s3_case6 where p_partkey < 100000""" + order_qt_parquet_s3_case7 """select count(*) from parquet_s3_case7 where col4=4""" + order_qt_parquet_s3_case8 """ select count(*) from parquet_s3_case8 where p_partkey=1""" + order_qt_parquet_s3_case9 """ select * from parquet_s3_case9""" + + } finally { + for (String table in tables) { + sql new File("""${context.file.parent}/ddl/${table}_drop.sql""").text + } + sql """ set enable_unified_load=false; """ + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
