This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c1d73ecefb7 [chore](load) rm some load related redundant code (#27102)
c1d73ecefb7 is described below
commit c1d73ecefb724df21e01da2efbc32b63c1de762d
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Dec 1 09:29:28 2023 +0800
[chore](load) rm some load related redundant code (#27102)
---
.../org/apache/doris/analysis/DataDescription.java | 60 -----
.../org/apache/doris/load/BrokerFileGroup.java | 76 ------
.../src/main/java/org/apache/doris/load/Load.java | 289 +--------------------
.../main/java/org/apache/doris/load/LoadJob.java | 1 +
.../java/org/apache/doris/load/loadv2/LoadJob.java | 12 -
.../doris/load/loadv2/BrokerLoadJobTest.java | 81 +-----
6 files changed, 17 insertions(+), 502 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 9e7e2f656dd..22e9dca39c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -46,7 +46,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -636,10 +635,6 @@ public class DataDescription implements InsertStmt.DataDesc {
return lineDelimiter;
}
- public void setLineDelimiter(Separator lineDelimiter) {
- this.lineDelimiter = lineDelimiter;
- }
-
public byte getEnclose() {
return enclose;
}
@@ -716,14 +711,6 @@ public class DataDescription implements InsertStmt.DataDesc {
this.jsonRoot = jsonRoot;
}
- @Deprecated
- public void addColumnMapping(String functionName, Pair<String, List<String>> pair) {
- if (Strings.isNullOrEmpty(functionName) || pair == null) {
- return;
- }
- columnToHadoopFunction.put(functionName, pair);
- }
-
public Map<String, Pair<String, List<String>>> getColumnToHadoopFunction() {
return columnToHadoopFunction;
}
@@ -1143,53 +1130,6 @@ public class DataDescription implements InsertStmt.DataDesc {
}
}
- /*
- * If user does not specify COLUMNS in load stmt, we fill it here.
- * eg1:
- * both COLUMNS and SET clause is empty. after fill:
- * (k1,k2,k3)
- *
- * eg2:
- * COLUMNS is empty, SET is not empty
- * SET ( k2 = default_value("2") )
- * after fill:
- * (k1, k2, k3)
- * SET ( k2 = default_value("2") )
- *
- * eg3:
- * COLUMNS is empty, SET is not empty
- * SET (k2 = strftime("%Y-%m-%d %H:%M:%S", k2)
- * after fill:
- * (k1,k2,k3)
- * SET (k2 = strftime("%Y-%m-%d %H:%M:%S", k2)
- *
- */
- public void fillColumnInfoIfNotSpecified(List<Column> baseSchema) {
- if (fileFieldNames != null && !fileFieldNames.isEmpty()) {
- return;
- }
-
- fileFieldNames = Lists.newArrayList();
-
- Set<String> mappingColNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
- for (ImportColumnDesc importColumnDesc : parsedColumnExprList) {
- mappingColNames.add(importColumnDesc.getColumnName());
- }
-
- for (Column column : baseSchema) {
- if (!mappingColNames.contains(column.getName())) {
- parsedColumnExprList.add(new ImportColumnDesc(column.getName(), null));
- }
- if ("json".equals(this.fileFormat)) {
- fileFieldNames.add(column.getName());
- } else {
- fileFieldNames.add(column.getName().toLowerCase());
- }
- }
-
- LOG.debug("after fill column info. columns: {}, parsed column exprs:
{}", fileFieldNames, parsedColumnExprList);
- }
-
public String toSql() {
StringBuilder sb = new StringBuilder();
if (isMysqlLoad) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index c193ab8f773..332c82e3617 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -21,9 +21,7 @@ import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.PartitionNames;
-import org.apache.doris.analysis.Separator;
import org.apache.doris.catalog.AggregateType;
-import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.HiveTable;
@@ -32,14 +30,12 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
-import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.thrift.TFileCompressType;
-import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -95,9 +91,6 @@ public class BrokerFileGroup implements Writable {
private long srcTableId = -1;
private boolean isLoadFromTable = false;
- // for multi load
- private TNetworkAddress beAddr;
- private long backendID;
private boolean stripOuterArray = false;
private String jsonPaths = "";
private String jsonRoot = "";
@@ -115,41 +108,6 @@ public class BrokerFileGroup implements Writable {
private BrokerFileGroup() {
}
- // Used for broker table, no need to parse
- public BrokerFileGroup(BrokerTable table) throws AnalysisException {
- this.tableId = table.getId();
- this.columnSeparator = Separator.convertSeparator(table.getColumnSeparator());
- this.lineDelimiter = Separator.convertSeparator(table.getLineDelimiter());
- this.isNegative = false;
- this.filePaths = table.getPaths();
- this.fileFormat = table.getFileFormat();
- }
-
- /**
- * Should used for hive/iceberg/hudi external table.
- */
- public BrokerFileGroup(long tableId,
- String filePath,
- String fileFormat) throws AnalysisException {
- this(tableId, "|", "\n", filePath, fileFormat, null, null);
- }
-
- /**
- * Should used for hive/iceberg/hudi external table.
- */
- public BrokerFileGroup(long tableId, String columnSeparator, String lineDelimiter, String filePath,
- String fileFormat, List<String> columnNamesFromPath, List<ImportColumnDesc> columnExprList)
- throws AnalysisException {
- this.tableId = tableId;
- this.columnSeparator = Separator.convertSeparator(columnSeparator);
- this.lineDelimiter = Separator.convertSeparator(lineDelimiter);
- this.isNegative = false;
- this.filePaths = Lists.newArrayList(filePath);
- this.fileFormat = fileFormat;
- this.columnNamesFromPath = columnNamesFromPath;
- this.columnExprList = columnExprList;
- }
-
public BrokerFileGroup(DataDescription dataDescription) {
this.fileFieldNames = dataDescription.getFileFieldNames();
this.columnNamesFromPath = dataDescription.getColumnsFromPath();
@@ -251,8 +209,6 @@ public class BrokerFileGroup implements Writable {
srcTableId = srcTable.getId();
isLoadFromTable = true;
}
- beAddr = dataDescription.getBeAddr();
- backendID = dataDescription.getBackendId();
if (fileFormat != null && fileFormat.equalsIgnoreCase("json")) {
stripOuterArray = dataDescription.isStripOuterArray();
jsonPaths = dataDescription.getJsonPaths();
@@ -363,62 +319,30 @@ public class BrokerFileGroup implements Writable {
this.fileSize = fileSize;
}
- public TNetworkAddress getBeAddr() {
- return beAddr;
- }
-
- public long getBackendID() {
- return backendID;
- }
-
public boolean isStripOuterArray() {
return stripOuterArray;
}
- public void setStripOuterArray(boolean stripOuterArray) {
- this.stripOuterArray = stripOuterArray;
- }
-
public boolean isFuzzyParse() {
return fuzzyParse;
}
- public void setFuzzyParse(boolean fuzzyParse) {
- this.fuzzyParse = fuzzyParse;
- }
-
public boolean isReadJsonByLine() {
return readJsonByLine;
}
- public void setReadJsonByLine(boolean readJsonByLine) {
- this.readJsonByLine = readJsonByLine;
- }
-
public boolean isNumAsString() {
return numAsString;
}
- public void setNumAsString(boolean numAsString) {
- this.numAsString = numAsString;
- }
-
public String getJsonPaths() {
return jsonPaths;
}
- public void setJsonPaths(String jsonPaths) {
- this.jsonPaths = jsonPaths;
- }
-
public String getJsonRoot() {
return jsonRoot;
}
- public void setJsonRoot(String jsonRoot) {
- this.jsonRoot = jsonRoot;
- }
-
public boolean isBinaryFileFormat() {
if (fileFormat == null) {
// null means default: csv
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index f4bc0f089b2..a3e25cde6b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -30,21 +30,16 @@ import org.apache.doris.analysis.FunctionParams;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.NullLiteral;
-import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.ScalarType;
@@ -59,8 +54,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.ErrorCode;
-import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
@@ -80,7 +73,6 @@ import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TFileFormatType;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -91,7 +83,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -207,282 +198,6 @@ public class Load {
lock.writeLock().unlock();
}
- /*
- * This is only used for hadoop load
- */
- public static void checkAndCreateSource(Database db, DataDescription dataDescription,
- Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) throws DdlException {
- Source source = new Source(dataDescription.getFilePaths());
- long tableId = -1;
- Set<Long> sourcePartitionIds = Sets.newHashSet();
-
- // source column names and partitions
- String tableName = dataDescription.getTableName();
- Map<String, Pair<String, List<String>>> columnToFunction = null;
-
- OlapTable table = db.getOlapTableOrDdlException(tableName);
- tableId = table.getId();
-
- table.readLock();
- try {
- if (table.getPartitionInfo().isMultiColumnPartition() && jobType == EtlJobType.HADOOP) {
- throw new DdlException("Load by hadoop cluster does not support table with multi partition columns."
- + " Table: " + table.getName() + ". Try using broker load. See 'help broker load;'");
- }
-
- // check partition
- if (dataDescription.getPartitionNames() != null
- && table.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
- ErrorReport.reportDdlException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED);
- }
-
- if (table.getState() == OlapTableState.RESTORE) {
- throw new DdlException("Table [" + tableName + "] is under
restore");
- }
-
- if (table.getKeysType() != KeysType.AGG_KEYS && dataDescription.isNegative()) {
- throw new DdlException("Load for AGG_KEYS table should not specify NEGATIVE");
- }
-
- // get table schema
- List<Column> baseSchema = table.getBaseSchema(false);
- // fill the column info if user does not specify them
- dataDescription.fillColumnInfoIfNotSpecified(baseSchema);
-
- // source columns
- List<String> columnNames = Lists.newArrayList();
- List<String> assignColumnNames = Lists.newArrayList();
- if (dataDescription.getFileFieldNames() != null) {
- assignColumnNames.addAll(dataDescription.getFileFieldNames());
- if (dataDescription.getColumnsFromPath() != null) {
- assignColumnNames.addAll(dataDescription.getColumnsFromPath());
- }
- }
- if (assignColumnNames.isEmpty()) {
- // use table columns
- for (Column column : baseSchema) {
- columnNames.add(column.getName());
- }
- } else {
- // convert column to schema format
- for (String assignCol : assignColumnNames) {
- if (table.getColumn(assignCol) != null) {
- columnNames.add(table.getColumn(assignCol).getName());
- } else {
- columnNames.add(assignCol);
- }
- }
- }
- source.setColumnNames(columnNames);
-
- // check default value
- Map<String, Pair<String, List<String>>> columnToHadoopFunction = dataDescription.getColumnToHadoopFunction();
- List<ImportColumnDesc> parsedColumnExprList = dataDescription.getParsedColumnExprList();
- Map<String, Expr> parsedColumnExprMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
- for (ImportColumnDesc importColumnDesc : parsedColumnExprList) {
- parsedColumnExprMap.put(importColumnDesc.getColumnName(), importColumnDesc.getExpr());
- }
- for (Column column : baseSchema) {
- String columnName = column.getName();
- if (columnNames.contains(columnName)) {
- continue;
- }
-
- if (parsedColumnExprMap.containsKey(columnName)) {
- continue;
- }
-
- if (column.getDefaultValue() != null || column.isAllowNull()) {
- continue;
- }
-
- throw new DdlException("Column has no default value. column: "
+ columnName);
- }
-
- // check negative for sum aggregate type
- if (dataDescription.isNegative()) {
- for (Column column : baseSchema) {
- if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) {
- throw new DdlException("Column is not SUM AggregateType. column:" + column.getName());
- }
- }
- }
-
- // check hll
- for (Column column : baseSchema) {
- if (column.getDataType() == PrimitiveType.HLL) {
- if (columnToHadoopFunction != null && !columnToHadoopFunction.containsKey(column.getName())) {
- throw new DdlException("Hll column is not assigned. column:" + column.getName());
- }
- }
- }
-
- // check mapping column exist in table
- // check function
- // convert mapping column and func arg columns to schema format
-
- // When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in
- // their names. These columns are invisible to user, but we need to generate data for these columns.
- // So we add column mappings for these column.
- // eg1:
- // base schema is (A, B, C), and B is under schema change,
- // so there will be a shadow column: '__doris_shadow_B'
- // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B));
- for (Column column : table.getFullSchema()) {
- if (column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
- String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PREFIX);
- if (parsedColumnExprMap.containsKey(originCol)) {
- Expr mappingExpr = parsedColumnExprMap.get(originCol);
- if (mappingExpr != null) {
- /*
- * eg:
- * (A, C) SET (B = func(xx))
- * ->
- * (A, C) SET (B = func(xx), __doris_shadow_B =
func(xxx))
- */
- if (columnToHadoopFunction.containsKey(originCol)) {
- columnToHadoopFunction.put(column.getName(), columnToHadoopFunction.get(originCol));
- }
- ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr);
- parsedColumnExprList.add(importColumnDesc);
- } else {
- /*
- * eg:
- * (A, B, C)
- * ->
- * (A, B, C) SET (__doris_shadow_B = substitute(B))
- */
- columnToHadoopFunction.put(column.getName(),
- Pair.of("substitute",
Lists.newArrayList(originCol)));
- ImportColumnDesc importColumnDesc
- = new ImportColumnDesc(column.getName(), new SlotRef(null, originCol));
- parsedColumnExprList.add(importColumnDesc);
- }
- } else {
- /*
- * There is a case that if user does not specify the related origin column, eg:
- * COLUMNS (A, C), and B is not specified, but B is being modified
- * so there is a shadow column '__doris_shadow_B'.
- * We can not just add a mapping function "__doris_shadow_B = substitute(B)",
- * because Doris can not find column B.
- * In this case, __doris_shadow_B can use its default value,
- * so no need to add it to column mapping
- */
- // do nothing
- }
-
- } else if (!column.isVisible()) {
- /*
- * For batch delete table add hidden column __DORIS_DELETE_SIGN__ to columns
- * eg:
- * (A, B, C)
- * ->
- * (A, B, C) SET (__DORIS_DELETE_SIGN__ = 0)
- */
- columnToHadoopFunction.put(column.getName(), Pair.of("default_value",
- Lists.newArrayList(column.getDefaultValue())));
- ImportColumnDesc importColumnDesc = null;
- try {
- importColumnDesc = new ImportColumnDesc(column.getName(),
- new FunctionCallExpr("default_value", Arrays.asList(column.getDefaultValueExpr())));
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
- parsedColumnExprList.add(importColumnDesc);
- }
- }
-
- LOG.debug("after add shadow column. parsedColumnExprList: {},
columnToHadoopFunction: {}",
- parsedColumnExprList, columnToHadoopFunction);
-
- Map<String, String> columnNameMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
- for (String columnName : columnNames) {
- columnNameMap.put(columnName, columnName);
- }
-
- // validate hadoop functions
- if (columnToHadoopFunction != null) {
- columnToFunction = Maps.newHashMap();
- for (Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
- String mappingColumnName = entry.getKey();
- Column mappingColumn = table.getColumn(mappingColumnName);
- if (mappingColumn == null) {
- throw new DdlException("Mapping column is not in
table. column: " + mappingColumnName);
- }
-
- Pair<String, List<String>> function = entry.getValue();
- try {
- DataDescription.validateMappingFunction(function.first, function.second, columnNameMap,
- mappingColumn, dataDescription.isHadoopLoad());
- } catch (AnalysisException e) {
- throw new DdlException(e.getMessage());
- }
-
- columnToFunction.put(mappingColumn.getName(), function);
- }
- }
-
- // partitions of this source
- OlapTable olapTable = table;
- PartitionNames partitionNames = dataDescription.getPartitionNames();
- if (partitionNames == null) {
- for (Partition partition : olapTable.getPartitions()) {
- sourcePartitionIds.add(partition.getId());
- }
- } else {
- for (String partitionName : partitionNames.getPartitionNames()) {
- Partition partition = olapTable.getPartition(partitionName, partitionNames.isTemp());
- if (partition == null) {
- throw new DdlException("Partition [" + partitionName +
"] does not exist");
- }
- sourcePartitionIds.add(partition.getId());
- }
- }
- } finally {
- table.readUnlock();
- }
-
- // column separator
- String columnSeparator = dataDescription.getColumnSeparator();
- if (!Strings.isNullOrEmpty(columnSeparator)) {
- source.setColumnSeparator(columnSeparator);
- }
-
- // line delimiter
- String lineDelimiter = dataDescription.getLineDelimiter();
- if (!Strings.isNullOrEmpty(lineDelimiter)) {
- source.setLineDelimiter(lineDelimiter);
- }
-
- // source negative
- source.setNegative(dataDescription.isNegative());
-
- // column mapping functions
- if (columnToFunction != null) {
- source.setColumnToFunction(columnToFunction);
- }
-
- // add source to table partition map
- Map<Long, List<Source>> partitionToSources = null;
- if (tableToPartitionSources.containsKey(tableId)) {
- partitionToSources = tableToPartitionSources.get(tableId);
- } else {
- partitionToSources = Maps.newHashMap();
- tableToPartitionSources.put(tableId, partitionToSources);
- }
- for (long partitionId : sourcePartitionIds) {
- List<Source> sources = null;
- if (partitionToSources.containsKey(partitionId)) {
- sources = partitionToSources.get(partitionId);
- } else {
- sources = new ArrayList<Source>();
- partitionToSources.put(partitionId, sources);
- }
- sources.add(source);
- }
- }
-
/**
* When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in
* their names. These columns are invisible to user, but we need to generate data for these columns.
@@ -762,10 +477,10 @@ public class Load {
if (entry.getKey() != null) {
if (entry.getKey().equalsIgnoreCase(Column.DELETE_SIGN)) {
throw new UserException("unknown reference column
in DELETE ON clause:"
- + slot.getColumnName());
+ + slot.getColumnName());
} else if (entry.getKey().equalsIgnoreCase(Column.SEQUENCE_COL)) {
throw new UserException("unknown reference column
in ORDER BY clause:"
- + slot.getColumnName());
+ + slot.getColumnName());
}
}
throw new UserException("unknown reference column,
column=" + entry.getKey()
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
index cf5220edf3b..9a042ca44b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadJob.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+@Deprecated
public class LoadJob implements Writable {
private static final Logger LOG = LogManager.getLogger(LoadJob.class);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 946d116b516..6e5d56aa55e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -334,10 +334,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
this.loadStatistic.totalFileSizeB = fileSize;
}
- public TUniqueId getRequestId() {
- return requestId;
- }
-
/**
* Show table names for frontend
* If table name could not be found by id, the table id will be used instead.
@@ -398,14 +394,6 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
return userInfo;
}
- public void setUserInfo(UserIdentity userInfo) {
- this.userInfo = userInfo;
- }
-
- public String getComment() {
- return comment;
- }
-
public void setComment(String comment) {
this.comment = comment;
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 14cd4772db5..42e97db794a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -17,23 +17,17 @@
package org.apache.doris.load.loadv2;
-import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
-import org.apache.doris.analysis.UserIdentity;
-import org.apache.doris.catalog.BrokerTable;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.jmockit.Deencapsulation;
-import org.apache.doris.common.profile.Profile;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
@@ -43,10 +37,7 @@ import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.Load;
import org.apache.doris.load.Source;
import org.apache.doris.metric.MetricRepo;
-import org.apache.doris.planner.OlapTableSink;
-import org.apache.doris.planner.PlanFragment;
import org.apache.doris.task.MasterTaskExecutor;
-import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
import com.google.common.collect.Lists;
@@ -61,13 +52,11 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.UUID;
public class BrokerLoadJobTest {
@@ -78,8 +67,8 @@ public class BrokerLoadJobTest {
@Test
public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Injectable LabelName labelName,
- @Injectable DataDescription dataDescription, @Mocked Env env, @Mocked InternalCatalog catalog,
- @Injectable Database database) {
+ @Injectable DataDescription dataDescription, @Mocked Env env, @Mocked InternalCatalog catalog,
+ @Injectable Database database) {
List<DataDescription> dataDescriptionList = Lists.newArrayList();
dataDescriptionList.add(dataDescription);
@@ -122,8 +111,8 @@ public class BrokerLoadJobTest {
@Test
public void testFromLoadStmt2(@Injectable LoadStmt loadStmt, @Injectable DataDescription dataDescription,
- @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable,
- @Mocked Env env, @Mocked InternalCatalog catalog) {
+ @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable,
+ @Mocked Env env, @Mocked InternalCatalog catalog) {
String label = "label";
long dbId = 1;
@@ -182,7 +171,7 @@ public class BrokerLoadJobTest {
new MockUp<Load>() {
@Mock
public void checkAndCreateSource(Database db, DataDescription dataDescription,
- Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) {
+ Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) {
}
};
@@ -202,8 +191,8 @@ public class BrokerLoadJobTest {
@Test
public void testGetTableNames(@Injectable BrokerFileGroupAggInfo fileGroupAggInfo,
- @Injectable BrokerFileGroup brokerFileGroup, @Mocked Env env, @Mocked InternalCatalog catalog,
- @Injectable Database database, @Injectable Table table) throws MetaNotFoundException {
+ @Injectable BrokerFileGroup brokerFileGroup, @Mocked Env env, @Mocked InternalCatalog catalog,
+ @Injectable Database database, @Injectable Table table) throws MetaNotFoundException {
List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
brokerFileGroups.add(brokerFileGroup);
Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
@@ -281,11 +270,11 @@ public class BrokerLoadJobTest {
@Test
public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment attachment, @Mocked Env env,
- @Mocked InternalCatalog catalog, @Injectable Database database,
- @Injectable BrokerFileGroupAggInfo fileGroupAggInfo, @Injectable BrokerFileGroup brokerFileGroup1,
- @Injectable BrokerFileGroup brokerFileGroup2, @Injectable BrokerFileGroup brokerFileGroup3,
- @Mocked MasterTaskExecutor masterTaskExecutor, @Injectable OlapTable olapTable,
- @Mocked LoadingTaskPlanner loadingTaskPlanner) {
+ @Mocked InternalCatalog catalog, @Injectable Database database,
+ @Injectable BrokerFileGroupAggInfo fileGroupAggInfo, @Injectable BrokerFileGroup brokerFileGroup1,
+ @Injectable BrokerFileGroup brokerFileGroup2, @Injectable BrokerFileGroup brokerFileGroup3,
+ @Mocked MasterTaskExecutor masterTaskExecutor, @Injectable OlapTable olapTable,
+ @Mocked LoadingTaskPlanner loadingTaskPlanner) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
long taskId = 1L;
@@ -340,48 +329,6 @@ public class BrokerLoadJobTest {
Assert.assertEquals(3, idToTasks.size());
}
- @Test
- public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttachment attachment,
- @Mocked Env env,
- @Injectable BrokerDesc brokerDesc,
- @Injectable LoadTaskCallback callback,
- @Injectable Database database,
- @Injectable FileGroupAggKey aggKey,
- @Mocked OlapTable olapTable,
- @Mocked PlanFragment sinkFragment,
- @Mocked OlapTableSink olapTableSink) throws Exception {
- List<Column> schema = new ArrayList<>();
- schema.add(new Column("a", PrimitiveType.BIGINT));
- Map<String, String> properties = new HashMap<>();
- properties.put("broker_name", "test");
- properties.put("path", "hdfs://www.test.com");
- BrokerTable brokerTable = new BrokerTable(123L, "test", schema,
properties);
- BrokerFileGroup brokerFileGroup = new BrokerFileGroup(brokerTable);
- List<Long> partitionIds = new ArrayList<>();
- partitionIds.add(123L);
- Deencapsulation.setField(brokerFileGroup, "partitionIds",
partitionIds);
- List<BrokerFileGroup> fileGroups = Lists.newArrayList();
- fileGroups.add(brokerFileGroup);
- UUID uuid = UUID.randomUUID();
- TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
- Profile jobProfile = new Profile("test", false);
- LoadLoadingTask task = new LoadLoadingTask(database, olapTable, brokerDesc, fileGroups, 100, 100, false, false, 100,
- callback, "", 100, 1, 1, true, jobProfile, false, false, LoadTask.Priority.NORMAL);
- try {
- UserIdentity userInfo = new UserIdentity("root", "localhost");
- userInfo.setIsAnalyzed();
- task.init(loadId,
- attachment.getFileStatusByTable(aggKey),
- attachment.getFileNumByTable(aggKey),
- userInfo);
- LoadingTaskPlanner planner = Deencapsulation.getField(task, "planner");
- Analyzer al = Deencapsulation.getField(planner, "analyzer");
- Assert.assertFalse(al.isUDFAllowed());
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
@Test
public void testLoadingTaskOnFinishedWithUnfinishedTask(@Injectable BrokerLoadingTaskAttachment attachment,
@Injectable LoadTask loadTask1,
@@ -465,8 +412,8 @@ public class BrokerLoadJobTest {
@Test
public void testLoadingTaskOnFinished(@Injectable BrokerLoadingTaskAttachment attachment1,
- @Injectable LoadTask loadTask1, @Mocked Env env, @Mocked InternalCatalog catalog,
- @Injectable Database database) {
+ @Injectable LoadTask loadTask1, @Mocked Env env, @Mocked InternalCatalog catalog,
+ @Injectable Database database) {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob();
Deencapsulation.setField(brokerLoadJob, "state", JobState.LOADING);
Map<Long, LoadTask> idToTasks = Maps.newHashMap();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]