This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 77b756f [Refactor] Remove redundant code of mini load and insert
(#4966)
77b756f is described below
commit 77b756fb87ce42986271d67080fb0cacadc8b903
Author: EmmyMiao87 <[email protected]>
AuthorDate: Wed Feb 3 22:19:20 2021 +0800
[Refactor] Remove redundant code of mini load and insert (#4966)
The content deleted by this PR includes mini load and insert in the old
framework.
The task and scheduling logic of old framework has been deleted.
---
.../java/org/apache/doris/catalog/Catalog.java | 7 -
.../src/main/java/org/apache/doris/load/Load.java | 43 ++--
.../java/org/apache/doris/load/LoadChecker.java | 12 --
.../java/org/apache/doris/planner/CsvScanNode.java | 208 ------------------
.../apache/doris/planner/DistributedPlanner.java | 13 +-
.../org/apache/doris/planner/OlapRewriteNode.java | 126 -----------
.../org/apache/doris/task/InsertLoadEtlTask.java | 38 ----
.../org/apache/doris/task/MiniLoadEtlTask.java | 198 -----------------
.../org/apache/doris/task/MiniLoadPendingTask.java | 216 -------------------
.../org/apache/doris/task/PullLoadEtlTask.java | 139 ------------
.../java/org/apache/doris/task/PullLoadJob.java | 110 ----------
.../java/org/apache/doris/task/PullLoadJobMgr.java | 200 -----------------
.../java/org/apache/doris/task/PullLoadTask.java | 237 ---------------------
.../org/apache/doris/task/PullLoadTaskPlanner.java | 160 --------------
14 files changed, 20 insertions(+), 1687 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 8715704..e9b643f 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -206,7 +206,6 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
-import org.apache.doris.task.PullLoadJobMgr;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@@ -367,7 +366,6 @@ public class Catalog {
private MetaReplayState metaReplayState;
- private PullLoadJobMgr pullLoadJobMgr;
private BrokerMgr brokerMgr;
private ResourceMgr resourceMgr;
@@ -520,7 +518,6 @@ public class Catalog {
this.isDefaultClusterCreated = false;
- this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog);
this.brokerMgr = new BrokerMgr();
this.resourceMgr = new ResourceMgr();
@@ -589,10 +586,6 @@ public class Catalog {
return SingletonHolder.INSTANCE;
}
- public PullLoadJobMgr getPullLoadJobMgr() {
- return pullLoadJobMgr;
- }
-
public BrokerMgr getBrokerMgr() {
return brokerMgr;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 9c4104f..315ba25 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -595,7 +595,7 @@ public class Load {
* This is only used for hadoop load
*/
public static void checkAndCreateSource(Database db, DataDescription
dataDescription,
- Map<Long, Map<Long, List<Source>>> tableToPartitionSources,
EtlJobType jobType) throws DdlException {
+ Map<Long, Map<Long, List<Source>>>
tableToPartitionSources, EtlJobType jobType) throws DdlException {
Source source = new Source(dataDescription.getFilePaths());
long tableId = -1;
Set<Long> sourcePartitionIds = Sets.newHashSet();
@@ -779,7 +779,7 @@ public class Load {
Pair<String, List<String>> function = entry.getValue();
try {
DataDescription.validateMappingFunction(function.first, function.second,
columnNameMap,
- mappingColumn,
dataDescription.isHadoopLoad());
+ mappingColumn, dataDescription.isHadoopLoad());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
@@ -883,7 +883,7 @@ public class Load {
* (A, B, C) SET (__doris_shadow_B = B)
*/
ImportColumnDesc importColumnDesc = new
ImportColumnDesc(column.getName(),
-
new SlotRef(null, originCol));
+ new SlotRef(null, originCol));
shadowColumnDescs.add(importColumnDesc);
}
} else {
@@ -918,7 +918,7 @@ public class Load {
Map<String, SlotDescriptor> slotDescByName,
TBrokerScanRangeParams params) throws UserException {
rewriteColumns(columnExprs);
initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName,
analyzer,
- srcTupleDesc, slotDescByName, params, true);
+ srcTupleDesc, slotDescByName, params, true);
}
/*
@@ -946,7 +946,7 @@ public class Load {
}
// check whether the OlapTable has sequenceCol
boolean hasSequenceCol = false;
- if (tbl instanceof OlapTable && ((OlapTable)tbl).hasSequenceCol()) {
+ if (tbl instanceof OlapTable && ((OlapTable) tbl).hasSequenceCol()) {
hasSequenceCol = true;
}
@@ -1771,12 +1771,7 @@ public class Load {
clearJob(job, srcState);
}
- if (job.getBrokerDesc() != null) {
- if (srcState == JobState.ETL) {
- // Cancel job id
-
Catalog.getCurrentCatalog().getPullLoadJobMgr().cancelJob(job.getId());
- }
- }
+ Preconditions.checkState(job.getBrokerDesc() == null);
LOG.info("cancel load job success. job: {}", job);
return true;
}
@@ -1925,14 +1920,14 @@ public class Load {
if (tableNames.isEmpty()) {
// forward compatibility
if
(!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(),
dbName,
-
PrivPredicate.LOAD)) {
+ PrivPredicate.LOAD)) {
continue;
}
} else {
boolean auth = true;
for (String tblName : tableNames) {
if
(!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(),
dbName,
-
tblName, PrivPredicate.LOAD)) {
+ tblName, PrivPredicate.LOAD)) {
auth = false;
break;
}
@@ -2004,8 +1999,8 @@ public class Load {
// task info
jobInfo.add("cluster:" + loadJob.getHadoopCluster()
- + "; timeout(s):" +
loadJob.getTimeoutSecond()
- + "; max_filter_ratio:" +
loadJob.getMaxFilterRatio());
+ + "; timeout(s):" + loadJob.getTimeoutSecond()
+ + "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
// error msg
if (loadJob.getState() == JobState.CANCELLED) {
@@ -2330,7 +2325,7 @@ public class Load {
continue;
}
replica.updateVersionInfo(info.getVersion(),
info.getVersionHash(),
- info.getDataSize(),
info.getRowCount());
+ info.getDataSize(), info.getRowCount());
}
}
@@ -2349,7 +2344,7 @@ public class Load {
continue;
}
updatePartitionVersion(partition,
partitionLoadInfo.getVersion(),
-
partitionLoadInfo.getVersionHash(), jobId);
+ partitionLoadInfo.getVersionHash(), jobId);
// update table row count
for (MaterializedIndex materializedIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
@@ -2446,7 +2441,7 @@ public class Load {
continue;
}
replica.updateVersionInfo(info.getVersion(),
info.getVersionHash(),
- info.getDataSize(),
info.getRowCount());
+ info.getDataSize(), info.getRowCount());
}
}
} else {
@@ -2652,7 +2647,7 @@ public class Load {
// hdfs://host:port/outputPath/dbId/loadLabel/
DppConfig dppConfig = job.getHadoopDppConfig();
String outputPath =
DppScheduler.getEtlOutputPath(dppConfig.getFsDefaultName(),
-
dppConfig.getOutputPath(), job.getDbId(), job.getLabel(), "");
+ dppConfig.getOutputPath(), job.getDbId(),
job.getLabel(), "");
try {
dppScheduler.deleteEtlOutputPath(outputPath);
} catch (Exception e) {
@@ -2696,7 +2691,7 @@ public class Load {
}
public boolean updateLoadJobState(LoadJob job, JobState destState,
CancelType cancelType, String msg,
- List<String> failedMsg) {
+ List<String> failedMsg) {
boolean result = true;
JobState srcState = null;
@@ -2855,7 +2850,7 @@ public class Load {
}
updatePartitionVersion(partition,
partitionLoadInfo.getVersion(),
- partitionLoadInfo.getVersionHash(),
jobId);
+ partitionLoadInfo.getVersionHash(), jobId);
for (MaterializedIndex materializedIndex :
partition.getMaterializedIndices(IndexExtState.ALL)) {
long tableRowCount = 0L;
@@ -2889,7 +2884,7 @@ public class Load {
long partitionId = partition.getId();
partition.updateVisibleVersionAndVersionHash(version, versionHash);
LOG.info("update partition version success. version: {}, version hash:
{}, job id: {}, partition id: {}",
- version, versionHash, jobId, partitionId);
+ version, versionHash, jobId, partitionId);
}
private boolean processCancelled(LoadJob job, CancelType cancelType,
String msg, List<String> failedMsg) {
@@ -2961,8 +2956,8 @@ public class Load {
if (srcState == JobState.LOADING || srcState ==
JobState.QUORUM_FINISHED) {
for (PushTask pushTask : job.getPushTasks()) {
AgentTaskQueue.removePushTask(pushTask.getBackendId(),
pushTask.getSignature(),
- pushTask.getVersion(),
pushTask.getVersionHash(),
- pushTask.getPushType(),
pushTask.getTaskType());
+ pushTask.getVersion(), pushTask.getVersionHash(),
+ pushTask.getPushType(), pushTask.getTaskType());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index 2cbcd10..0b8d847 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -39,11 +39,8 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.HadoopLoadEtlTask;
import org.apache.doris.task.HadoopLoadPendingTask;
-import org.apache.doris.task.InsertLoadEtlTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.MasterTaskExecutor;
-import org.apache.doris.task.MiniLoadEtlTask;
-import org.apache.doris.task.MiniLoadPendingTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
@@ -172,9 +169,6 @@ public class LoadChecker extends MasterDaemon {
case HADOOP:
task = new HadoopLoadPendingTask(job);
break;
- case MINI:
- task = new MiniLoadPendingTask(job);
- break;
default:
LOG.warn("unknown etl job type. type: {}",
etlJobType.name());
break;
@@ -200,12 +194,6 @@ public class LoadChecker extends MasterDaemon {
case HADOOP:
task = new HadoopLoadEtlTask(job);
break;
- case MINI:
- task = new MiniLoadEtlTask(job);
- break;
- case INSERT:
- task = new InsertLoadEtlTask(job);
- break;
default:
LOG.warn("unknown etl job type. type: {}",
etlJobType.name());
break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java
deleted file mode 100644
index 04aab41..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java
+++ /dev/null
@@ -1,208 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.PartitionLoadInfo;
-import org.apache.doris.load.Source;
-import org.apache.doris.load.TableLoadInfo;
-import org.apache.doris.thrift.TColumnType;
-import org.apache.doris.thrift.TCsvScanNode;
-import org.apache.doris.thrift.TMiniLoadEtlFunction;
-import org.apache.doris.thrift.TPlanNode;
-import org.apache.doris.thrift.TPlanNodeType;
-import org.apache.doris.thrift.TScanRangeLocations;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.MoreObjects.ToStringHelper;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-public class CsvScanNode extends ScanNode {
- private static final Logger LOG = LogManager.getLogger(CsvScanNode.class);
-
- private final OlapTable table;
- private final LoadJob job;
-
- private List<String> filePaths = Lists.newArrayList();
-
- private String columnSeparator;
- private String lineDelimiter;
-
- private List<String> columns = Lists.newArrayList();
- private List<String> unspecifiedColumns = Lists.newArrayList();
- private List<String> defaultValues = Lists.newArrayList();
-
- private Map<String, TColumnType> columnTypeMapping = Maps.newHashMap();
- private Map<String, TMiniLoadEtlFunction> columnToFunction =
Maps.newHashMap();
-
- private double maxFilterRatio = 0.0;
-
- public CsvScanNode(PlanNodeId id, TupleDescriptor desc, OlapTable table,
LoadJob job) {
- super(id, desc, "Scan CSV");
- this.table = table;
- this.job = job;
- }
-
- @Override
- protected void toThrift(TPlanNode msg) {
- msg.node_type = TPlanNodeType.CSV_SCAN_NODE;
- msg.csv_scan_node = new TCsvScanNode(desc.getId().asInt(), filePaths);
-
- if (!Strings.isNullOrEmpty(columnSeparator)) {
- msg.csv_scan_node.setColumnSeparator(columnSeparator);
- }
- if (!Strings.isNullOrEmpty(lineDelimiter)) {
- msg.csv_scan_node.setLineDelimiter(lineDelimiter);
- }
-
- if (!columns.isEmpty()) {
- msg.csv_scan_node.setColumns(columns);
- }
- if (!unspecifiedColumns.isEmpty()) {
- msg.csv_scan_node.setUnspecifiedColumns(unspecifiedColumns);
- }
- if (!defaultValues.isEmpty()) {
- msg.csv_scan_node.setDefaultValues(defaultValues);
- }
-
- if (!columnToFunction.isEmpty()) {
- msg.csv_scan_node.setColumnFunctionMapping(columnToFunction);
- }
- msg.csv_scan_node.setColumnTypeMapping(columnTypeMapping);
- msg.csv_scan_node.setMaxFilterRatio(maxFilterRatio);
- msg.csv_scan_node.setColumnSeparator(columnSeparator);
- }
-
- @Override
- public void finalize(Analyzer analyzer) throws UserException {
- // get file paths
- // file paths in different partitions are same in mini load
- TableLoadInfo tableLoadInfo = job.getTableLoadInfo(table.getId());
- Collection<PartitionLoadInfo> partitionLoadInfos =
tableLoadInfo.getIdToPartitionLoadInfo().values();
- Preconditions.checkState(!partitionLoadInfos.isEmpty());
- PartitionLoadInfo partitionLoadInfo = (PartitionLoadInfo)
partitionLoadInfos.toArray()[0];
- List<Source> sources = partitionLoadInfo.getSources();
- Preconditions.checkState(sources.size() == 1);
- Source source = sources.get(0);
- filePaths = source.getFileUrls();
-
- // column separator
- columnSeparator = source.getColumnSeparator();
-
- // line delimiter
- lineDelimiter = source.getLineDelimiter();
-
- // construct columns (specified & unspecified) and default-values
- List<String> columnNames = Lists.newArrayList();
- for (Column column : table.getBaseSchema()) {
- columnNames.add(column.getName());
- }
- columns = source.getColumnNames();
- if (columns.isEmpty()) {
- columns = columnNames;
- }
- for (String columnName : columns) {
- if (!columnNames.contains(columnName)) {
- LOG.info("Column [{}] is not exist in table schema, will be
ignored.", columnName);
- }
- }
- for (String columnName : columnNames) {
- Column column = table.getColumn(columnName);
- columnTypeMapping.put(columnName,
column.getOriginType().toColumnTypeThrift());
-
- if (columns.contains(columnName)) {
- continue;
- }
- unspecifiedColumns.add(columnName);
- String defaultValue = column.getDefaultValue();
- if (defaultValue == null && false == column.isAllowNull()) {
- throw new UserException(
- "Column [" + columnName + "] should be specified. "
- + "only columns have default values can be
omitted");
- }
- if (true == column.isAllowNull() && null == defaultValue) {
- defaultValues.add("\\N");
- } else {
- defaultValues.add(defaultValue);
- }
- }
-
- Map<String, Pair<String, List<String>>> functions =
source.getColumnToFunction();
- for (String key : functions.keySet()) {
- final Pair<String, List<String>> pair = functions.get(key);
- TMiniLoadEtlFunction function = new TMiniLoadEtlFunction();
- int paramColumnIndex = -1;
- for (String str : pair.second) {
- boolean find = false;
- for (int i = 0; i < columns.size(); i++) {
- if (str.equals(columns.get(i))) {
- paramColumnIndex = i;
- find = true;
- break;
- }
- }
- if (find) {
- function.setFunctionName(pair.first);
- function.setParamColumnIndex(paramColumnIndex);
- columnToFunction.put(key, function);
- break;
- }
- }
- }
- // max filter ratio
- // TODO: remove!!
- maxFilterRatio = job.getMaxFilterRatio();
- }
-
- @Override
- protected String debugString() {
- ToStringHelper helper = MoreObjects.toStringHelper(this);
- return helper.addValue(super.debugString()).toString();
- }
-
- /**
- * like Mysql, We query Meta to get request's data location
- * extra result info will pass to backend ScanNode
- */
- @Override
- public List<TScanRangeLocations> getScanRangeLocations(long
maxScanRangeLength) {
- return null;
- }
-
- @Override
- public int getNumInstances() {
- return 1;
- }
-}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 16caf11..213cce2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -205,9 +205,7 @@ public class DistributedPlanner {
childFragments.get(0));
} else if (root instanceof SelectNode) {
result = createSelectNodeFragment((SelectNode) root,
childFragments);
- } else if (root instanceof OlapRewriteNode) {
- result = createOlapRewriteNodeFragment((OlapRewriteNode) root,
childFragments);
- } else if (root instanceof SetOperationNode) {
+ } else if (root instanceof SetOperationNode) {
result = createSetOperationNodeFragment((SetOperationNode) root,
childFragments, fragments);
} else if (root instanceof MergeNode) {
result = createMergeNodeFragment((MergeNode) root, childFragments,
fragments);
@@ -838,15 +836,6 @@ public class DistributedPlanner {
return childFragment;
}
- private PlanFragment createOlapRewriteNodeFragment(
- OlapRewriteNode olapRewriteNode, ArrayList<PlanFragment>
childFragments) {
- Preconditions.checkState(olapRewriteNode.getChildren().size() ==
childFragments.size());
- PlanFragment childFragment = childFragments.get(0);
- olapRewriteNode.setChild(0, childFragment.getPlanRoot());
- childFragment.setPlanRoot(olapRewriteNode);
- return childFragment;
- }
-
/**
* Replace node's child at index childIdx with an ExchangeNode that
receives its input from childFragment.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
deleted file mode 100644
index 95dc1b2..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java
+++ /dev/null
@@ -1,126 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.planner;
-
-import com.google.common.collect.Lists;
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.ExprSubstitutionMap;
-import org.apache.doris.analysis.InsertStmt;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.Table;
-import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TOlapRewriteNode;
-import org.apache.doris.thrift.TPlanNode;
-import org.apache.doris.thrift.TPlanNodeType;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-
-// Used to convert column to valid OLAP table
-public class OlapRewriteNode extends PlanNode {
- private static final Logger LOG =
LogManager.getLogger(OlapRewriteNode.class);
-
- private InsertStmt insertStmt;
-
- private Table table;
- private TupleDescriptor tupleDescriptor;
- private List<Expr> newResultExprs;
-
- public OlapRewriteNode(PlanNodeId id, PlanNode child, InsertStmt
insertStmt) {
- super(id, insertStmt.getOlapTuple().getId().asList(), "OLAP REWRITE
NODE");
- addChild(child);
-
- this.table = insertStmt.getTargetTable();
- this.tupleDescriptor = insertStmt.getOlapTuple();
- this.insertStmt = insertStmt;
- }
-
- public OlapRewriteNode(PlanNodeId id, PlanNode child,
- Table table,
- TupleDescriptor tupleDescriptor,
- List<Expr> slotRefs) {
- super(id, child.getTupleIds(), "OLAP REWRITE NODE");
- addChild(child);
- this.table = table;
- this.tupleDescriptor = tupleDescriptor;
- this.newResultExprs = slotRefs;
- }
-
- @Override
- public void init(Analyzer analyzer) throws UserException {
- assignConjuncts(analyzer);
-
- // Set smap to the combined children's smaps and apply that to all
conjuncts_.
- createDefaultSmap(analyzer);
-
- computeStats(analyzer);
- // assignedConjuncts = analyzr.getAssignedConjuncts();
-
- if (insertStmt != null) {
- ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
- newResultExprs = Lists.newArrayList();
- for (Expr expr : insertStmt.getResultExprs()) {
- newResultExprs.add(expr.clone(combinedChildSmap));
- }
- }
- }
-
- @Override
- protected void toThrift(TPlanNode msg) {
- msg.node_type = TPlanNodeType.OLAP_REWRITE_NODE;
- TOlapRewriteNode tnode = new TOlapRewriteNode();
- for (Column column : table.getBaseSchema()) {
-
tnode.addToColumnTypes(column.getOriginType().toColumnTypeThrift());
- }
- for (Expr expr : newResultExprs) {
- tnode.addToColumns(expr.treeToThrift());
- }
- tnode.setOutputTupleId(tupleDescriptor.getId().asInt());
- msg.setOlapRewriteNode(tnode);
- }
-
- @Override
- public void computeStats(Analyzer analyzer) {
- super.computeStats(analyzer);
- long cardinality = getChild(0).cardinality;
- double selectivity = computeSelectivity();
- if (cardinality < 0 || selectivity < 0) {
- this.cardinality = -1;
- } else {
- this.cardinality = Math.round(cardinality * selectivity);
- }
- }
-
- @Override
- protected String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
- StringBuilder output = new StringBuilder();
- if (!conjuncts.isEmpty()) {
- output.append(prefix + "predicates: " +
getExplainString(conjuncts) + "\n");
- }
- return output.toString();
- }
-
- @Override
- public int getNumInstances() {
- return children.get(0).getNumInstances();
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java
deleted file mode 100644
index 8a85a9c..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java
+++ /dev/null
@@ -1,38 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.common.LoadException;
-import org.apache.doris.load.LoadJob;
-
-public class InsertLoadEtlTask extends MiniLoadEtlTask {
-
- public InsertLoadEtlTask(LoadJob job) {
- super(job);
- }
-
- @Override
- protected boolean updateJobEtlStatus() {
- return true;
- }
-
- @Override
- protected void processEtlRunning() throws LoadException {
- throw new LoadException("not implement");
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java
deleted file mode 100644
index 9d17432..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java
+++ /dev/null
@@ -1,198 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.LoadException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.load.MiniEtlTaskInfo;
-import org.apache.doris.load.EtlStatus;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
-import org.apache.doris.thrift.TEtlState;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
-
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
-
-import java.util.Map;
-
-public class MiniLoadEtlTask extends LoadEtlTask {
- private static final Logger LOG =
LogManager.getLogger(MiniLoadEtlTask.class);
-
- public MiniLoadEtlTask(LoadJob job) {
- super(job);
- }
-
- @Override
- protected boolean updateJobEtlStatus() {
- // update etl tasks status
- if (job.miniNeedGetTaskStatus()) {
- LOG.debug("get mini etl task status actively. job: {}", job);
- for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
- TEtlState etlState = taskInfo.getTaskStatus().getState();
- if (etlState == TEtlState.RUNNING) {
- updateEtlTaskStatus(taskInfo);
- }
- }
- }
-
- // update etl job status
- updateEtlJobStatus();
- return true;
- }
-
- private boolean updateEtlTaskStatus(MiniEtlTaskInfo taskInfo) {
- // get etl status
- TMiniLoadEtlStatusResult result =
getMiniLoadEtlStatus(taskInfo.getBackendId(), taskInfo.getId());
- LOG.info("mini load etl status: {}, job: {}", result, job);
- if (result == null) {
- return false;
- }
- TStatus tStatus = result.getStatus();
- if (tStatus.getStatusCode() != TStatusCode.OK) {
- LOG.warn("get buck load etl status fail. msg: {}, job: {}",
tStatus.getErrorMsgs(), job);
- return false;
- }
-
- // update etl task status
- EtlStatus taskStatus = taskInfo.getTaskStatus();
- if (taskStatus.setState(result.getEtlState())) {
- if (result.isSetCounters()) {
- taskStatus.setCounters(result.getCounters());
- }
- if (result.isSetTrackingUrl()) {
- taskStatus.setTrackingUrl(result.getTrackingUrl());
- }
- if (result.isSetFileMap()) {
- taskStatus.setFileMap(result.getFileMap());
- }
- }
-
- return true;
- }
-
- private TMiniLoadEtlStatusResult getMiniLoadEtlStatus(long backendId, long
taskId) {
- TMiniLoadEtlStatusResult result = null;
- Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
- if (backend == null || !backend.isAlive()) {
- String failMsg = "backend is null or is not alive";
- LOG.error(failMsg);
- return result;
- }
-
- AgentClient client = new AgentClient(backend.getHost(),
backend.getBePort());
- return client.getEtlStatus(job.getId(), taskId);
- }
-
- private boolean updateEtlJobStatus() {
- boolean hasCancelledTask = false;
- boolean hasRunningTask = false;
- long normalNum = 0;
- long abnormalNum = 0;
- Map<String, Long> fileMap = Maps.newHashMap();
- String trackingUrl = EtlStatus.DEFAULT_TRACKING_URL;
-
- EtlStatus etlJobStatus = job.getEtlJobStatus();
- for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
- EtlStatus taskStatus = taskInfo.getTaskStatus();
- switch (taskStatus.getState()) {
- case RUNNING:
- hasRunningTask = true;
- break;
- case CANCELLED:
- hasCancelledTask = true;
- break;
- case FINISHED:
- // counters and file list
- Map<String, String> counters = taskStatus.getCounters();
- if (counters.containsKey(DPP_NORMAL_ALL)) {
- normalNum +=
Long.parseLong(counters.get(DPP_NORMAL_ALL));
- }
- if (counters.containsKey(DPP_ABNORMAL_ALL)) {
- abnormalNum +=
Long.parseLong(counters.get(DPP_ABNORMAL_ALL));
- }
- fileMap.putAll(taskStatus.getFileMap());
- if
(!taskStatus.getTrackingUrl().equals(EtlStatus.DEFAULT_TRACKING_URL)) {
- trackingUrl = taskStatus.getTrackingUrl();
- }
- break;
- default:
- break;
- }
- }
-
- if (hasCancelledTask) {
- etlJobStatus.setState(TEtlState.CANCELLED);
- } else if (hasRunningTask) {
- etlJobStatus.setState(TEtlState.RUNNING);
- } else {
- etlJobStatus.setState(TEtlState.FINISHED);
- Map<String, String> counters = Maps.newHashMap();
- counters.put(DPP_NORMAL_ALL, String.valueOf(normalNum));
- counters.put(DPP_ABNORMAL_ALL, String.valueOf(abnormalNum));
- etlJobStatus.setCounters(counters);
- etlJobStatus.setFileMap(fileMap);
- etlJobStatus.setTrackingUrl(trackingUrl);
- }
-
- return true;
- }
-
- @Override
- protected void processEtlRunning() throws LoadException {
- // update mini etl job progress
- int finishedTaskNum = 0;
- Map<Long, MiniEtlTaskInfo> idToEtlTask = job.getMiniEtlTasks();
- for (MiniEtlTaskInfo taskInfo : idToEtlTask.values()) {
- EtlStatus taskStatus = taskInfo.getTaskStatus();
- if (taskStatus.getState() == TEtlState.FINISHED) {
- ++finishedTaskNum;
- }
- }
-
- int progress = (int) (finishedTaskNum * 100 / idToEtlTask.size());
- if (progress >= 100) {
- // set progress to 100 when status is FINISHED
- progress = 99;
- }
-
- job.setProgress(progress);
- }
-
- @Override
- protected Map<String, Pair<String, Long>> getFilePathMap() throws
LoadException {
- Map<String, Long> fileMap = job.getEtlJobStatus().getFileMap();
- if (fileMap == null) {
- throw new LoadException("get etl files error");
- }
-
- Map<String, Pair<String, Long>> filePathMap = Maps.newHashMap();
- for (Map.Entry<String, Long> entry : fileMap.entrySet()) {
- String partitionIndexBucket =
getPartitionIndexBucketString(entry.getKey());
- // http://host:8000/data/dir/file
- filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(),
entry.getValue()));
- }
-
- return filePathMap;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java
deleted file mode 100644
index f35226b..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java
+++ /dev/null
@@ -1,216 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.LoadException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
-import org.apache.doris.load.EtlSubmitResult;
-import org.apache.doris.load.LoadErrorHub;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.load.MiniEtlTaskInfo;
-import org.apache.doris.load.TableLoadInfo;
-import org.apache.doris.planner.CsvScanNode;
-import org.apache.doris.planner.DataPartition;
-import org.apache.doris.planner.DataSplitSink;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.ScanNode;
-import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.PaloInternalServiceVersion;
-import org.apache.doris.thrift.TAgentResult;
-import org.apache.doris.thrift.TAgentServiceVersion;
-import org.apache.doris.thrift.TExecPlanFragmentParams;
-import org.apache.doris.thrift.TLoadErrorHubInfo;
-import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
-import org.apache.doris.thrift.TPlanFragmentExecParams;
-import org.apache.doris.thrift.TQueryOptions;
-import org.apache.doris.thrift.TQueryType;
-import org.apache.doris.thrift.TStatus;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.thrift.TUniqueId;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-
-public class MiniLoadPendingTask extends LoadPendingTask {
- private static final Logger LOG =
LogManager.getLogger(MiniLoadPendingTask.class);
-
- // descriptor used to register all column and table need
- private final DescriptorTable desc;
-
- // destination Db and table get from request
- // Data will load to this table
- private OlapTable destTable;
-
- // dest desc
- private TupleDescriptor destTupleDesc;
-
- private DataSplitSink tableSink;
-
- private List<Pair<Long, TMiniLoadEtlTaskRequest>> requests;
-
- public MiniLoadPendingTask(LoadJob job) {
- super(job);
- this.desc = new DescriptorTable();
- }
-
- @Override
- protected void createEtlRequest() throws Exception {
- requests = Lists.newArrayList();
- for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
- long taskId = taskInfo.getId();
- long backendId = taskInfo.getBackendId();
- long tableId = taskInfo.getTableId();
-
- // All the following operation will process when destTable's read
lock are hold.
- destTable = (OlapTable) db.getTable(tableId);
- if (destTable == null) {
- throw new LoadException("table does not exist. id: " +
tableId);
- }
-
- destTable.readLock();
- try {
- registerToDesc();
- tableSink = new DataSplitSink(destTable, destTupleDesc);
-
- // add schema hash to table load info
- TableLoadInfo tableLoadInfo =
job.getTableLoadInfo(destTable.getId());
- for (Map.Entry<Long, Integer> entry :
destTable.getIndexIdToSchemaHash().entrySet()) {
- tableLoadInfo.addIndexSchemaHash(entry.getKey(),
entry.getValue());
- }
- requests.add(new Pair<Long,
TMiniLoadEtlTaskRequest>(backendId, createRequest(taskId)));
- } finally {
- destTable.readUnlock();
- }
- }
- }
-
- @Override
- protected EtlSubmitResult submitEtlJob(int retry) {
- LOG.info("begin submit mini load etl job: {}", job);
-
- for (Pair<Long, TMiniLoadEtlTaskRequest> pair : requests) {
- long backendId = pair.first;
- TMiniLoadEtlTaskRequest request = pair.second;
-
- Backend backend =
Catalog.getCurrentSystemInfo().getBackend(backendId);
- if
(!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
- String failMsg = "backend is null or is not alive";
- LOG.error(failMsg);
- TStatus tStatus = new TStatus(TStatusCode.CANCELLED);
- tStatus.setErrorMsgs(Lists.newArrayList(failMsg));
- return new EtlSubmitResult(tStatus, null);
- }
-
- AgentClient client = new AgentClient(backend.getHost(),
backend.getBePort());
- TAgentResult submitResult = client.submitEtlTask(request);
- if (submitResult.getStatus().getStatusCode() != TStatusCode.OK) {
- return new EtlSubmitResult(submitResult.getStatus(), null);
- }
- }
-
- return new EtlSubmitResult(new TStatus(TStatusCode.OK), null);
- }
-
- private void registerToDesc() {
- destTupleDesc = desc.createTupleDescriptor();
- destTupleDesc.setTable(destTable);
- // Lock database and get its schema hash??
- // Make sure that import job has its corresponding schema
- for (Column col : destTable.getBaseSchema()) {
- SlotDescriptor slot = desc.addSlotDescriptor(destTupleDesc);
- // All this slot is needed
- slot.setIsMaterialized(true);
- slot.setColumn(col);
- if (true == col.isAllowNull()) {
- slot.setIsNullable(true);
- } else {
- slot.setIsNullable(false);
- }
- }
- }
-
- private TMiniLoadEtlTaskRequest createRequest(long taskId) throws
LoadException {
- ScanNode csvScanNode = new CsvScanNode(new PlanNodeId(0),
destTupleDesc, destTable, job);
- desc.computeMemLayout();
- try {
- csvScanNode.finalize(null);
- } catch (UserException e) {
- LOG.warn("csvScanNode finalize failed[err={}]", e);
- throw new LoadException("CSV scan finalize failed.", e);
- }
- PlanFragment fragment = new PlanFragment(new PlanFragmentId(0),
csvScanNode, DataPartition.UNPARTITIONED);
- fragment.setSink(tableSink);
-
- try {
- fragment.finalize(null, false);
- } catch (Exception e) {
- LOG.info("fragment finalize failed.e = {}", e);
- throw new LoadException("Fragment finalize failed.", e);
- }
-
- TMiniLoadEtlTaskRequest request = new TMiniLoadEtlTaskRequest();
- request.setProtocolVersion(TAgentServiceVersion.V1);
- TExecPlanFragmentParams params = new TExecPlanFragmentParams();
- params.setProtocolVersion(PaloInternalServiceVersion.V1);
- params.setFragment(fragment.toThrift());
- params.setDescTbl(desc.toThrift());
- params.setImportLabel(job.getLabel());
- params.setDbName(db.getFullName());
- params.setLoadJobId(job.getId());
-
- LoadErrorHub.Param param = load.getLoadErrorHubInfo();
- if (param != null) {
- TLoadErrorHubInfo info = param.toThrift();
- if (info != null) {
- params.setLoadErrorHubInfo(info);
- }
- }
-
- TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
- // Only use fragment id
- TUniqueId uniqueId = new TUniqueId(job.getId(), taskId);
- execParams.setQueryId(new TUniqueId(uniqueId));
- execParams.setFragmentInstanceId(uniqueId);
- execParams.per_node_scan_ranges = Maps.newHashMap();
- execParams.per_exch_num_senders = Maps.newHashMap();
- execParams.destinations = Lists.newArrayList();
- params.setParams(execParams);
- TQueryOptions queryOptions = new TQueryOptions();
- queryOptions.setQueryType(TQueryType.LOAD);
- params.setQueryOptions(queryOptions);
- request.setParams(params);
- return request;
- }
-
-}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
deleted file mode 100644
index 154134c..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java
+++ /dev/null
@@ -1,139 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.LoadException;
-import org.apache.doris.common.Pair;
-import org.apache.doris.load.EtlStatus;
-import org.apache.doris.load.LoadJob;
-import org.apache.doris.thrift.TEtlState;
-
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Map;
-
-// Used to process pull load etl task
-@Deprecated
-public class PullLoadEtlTask extends LoadEtlTask {
- private static final Logger LOG =
LogManager.getLogger(PullLoadEtlTask.class);
- private PullLoadJobMgr mgr;
-
- public PullLoadEtlTask(LoadJob job) {
- super(job);
- mgr = Catalog.getCurrentCatalog().getPullLoadJobMgr();
- }
-
- @Override
- protected String getErrorMsg() {
- String errMsg = null;
- PullLoadJob pullLoadJob = mgr.getJob(job.getId());
- if (pullLoadJob != null) {
- PullLoadTask failureTask = pullLoadJob.getFailureTask();
- if (failureTask != null) {
- if (failureTask.getExecuteStatus() != null) {
- errMsg = "Broker etl failed: " +
failureTask.getExecuteStatus().getErrorMsg();
- }
- }
- }
- return errMsg != null ? errMsg : super.getErrorMsg();
- }
-
- @Override
- protected boolean updateJobEtlStatus() {
- PullLoadJob pullLoadJob = mgr.getJob(job.getId());
- EtlStatus etlStatus = job.getEtlJobStatus();
- if (pullLoadJob == null) {
- LOG.warn("pullLoadJob is null. JobId is {}", job.getId());
- return false;
- }
- switch (pullLoadJob.getState()) {
- case CANCELED:
- case FAILED:
- etlStatus.setState(TEtlState.CANCELLED);
- break;
- case FINISHED:
- updateFinishInfo(pullLoadJob);
- etlStatus.setState(TEtlState.FINISHED);
- break;
- case RUNNING:
- etlStatus.setState(TEtlState.RUNNING);
- break;
- default:
- etlStatus.setState(TEtlState.UNKNOWN);
- break;
- }
- return true;
- }
-
- private void updateFinishInfo(PullLoadJob pullLoadJob) {
- Map<String, Long> fileMap = Maps.newHashMap();
- long numRowsNormal = 0;
- long numRowsAbnormal = 0;
- String trackingUrl = null;
- for (PullLoadTask task : pullLoadJob.tasks) {
- fileMap.putAll(task.getFileMap());
-
- String value = task.getCounters().get(DPP_NORMAL_ALL);
- if (value != null) {
- numRowsNormal += Long.valueOf(value);
- }
- value = task.getCounters().get(DPP_ABNORMAL_ALL);
- if (value != null) {
- numRowsAbnormal += Long.valueOf(value);
- }
- if (trackingUrl == null && task.getTrackingUrl() != null) {
- trackingUrl = task.getTrackingUrl();
- }
- }
- Map<String, String> counters = Maps.newHashMap();
- counters.put(DPP_NORMAL_ALL, "" + numRowsNormal);
- counters.put(DPP_ABNORMAL_ALL, "" + numRowsAbnormal);
-
- EtlStatus etlJobStatus = job.getEtlJobStatus();
- etlJobStatus.setFileMap(fileMap);
- etlJobStatus.setCounters(counters);
- if (trackingUrl != null) {
- etlJobStatus.setTrackingUrl(trackingUrl);
- }
- }
-
- @Override
- protected void processEtlRunning() throws LoadException {
- }
-
- @Override
- protected Map<String, Pair<String, Long>> getFilePathMap() throws
LoadException {
- Map<String, Long> fileMap = job.getEtlJobStatus().getFileMap();
- if (fileMap == null) {
- throw new LoadException("get etl files error");
- }
-
- Map<String, Pair<String, Long>> filePathMap = Maps.newHashMap();
- for (Map.Entry<String, Long> entry : fileMap.entrySet()) {
- String partitionIndexBucket =
getPartitionIndexBucketString(entry.getKey());
- // http://host:8000/data/dir/file
- filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(),
entry.getValue()));
- }
-
- return filePathMap;
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java
deleted file mode 100644
index fa2e520..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java
+++ /dev/null
@@ -1,110 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.load.LoadJob;
-
-import com.google.common.collect.Sets;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Set;
-
-// One pull load job
-@Deprecated
-public class PullLoadJob {
- private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
-
- public enum State {
- RUNNING,
- FINISHED,
- CANCELED,
- FAILED,
- UNKNOWN;
-
- public boolean isRunning() {
- return this == RUNNING;
- }
- }
-
- // Input params
- public final LoadJob job;
- public final List<PullLoadTask> tasks;
-
- public Set<Integer> finishedTask;
-
- // Used to
- public volatile State state;
- // Only used when this job has failed.
- public PullLoadTask failureTask;
-
- public PullLoadJob(LoadJob job, List<PullLoadTask> tasks) {
- this.job = job;
- this.tasks = tasks;
- finishedTask = Sets.newHashSet();
- state = State.RUNNING;
- }
-
- public long getId() {
- return job.getId();
- }
-
- public synchronized State getState() {
- return state;
- }
-
- public synchronized boolean isRunning() {
- return state.isRunning();
- }
-
- public synchronized void cancel() {
- state = State.CANCELED;
- for (PullLoadTask task : tasks) {
- task.cancel();
- }
- }
-
- public PullLoadTask getFailureTask() {
- return failureTask;
- }
-
- public synchronized void onTaskFinished(PullLoadTask task) {
- int taskId = task.taskId;
- if (!state.isRunning()) {
- LOG.info("Ignore task info after this job has been stable.
taskId={}:{}", task.jobId, taskId);
- return;
- }
- if (!finishedTask.add(taskId)) {
- LOG.info("Receive duplicate task information. taskId={}:{}",
task.jobId, taskId);
- }
- if (finishedTask.size() == tasks.size()) {
- state = State.FINISHED;
- }
- }
-
- public synchronized void onTaskFailed(PullLoadTask task) {
- if (!state.isRunning()) {
- LOG.info("Ignore task info after this job has been stable.
taskId={}:{}", task.jobId, task.taskId);
- return;
- }
- state = State.FAILED;
- failureTask = task;
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
deleted file mode 100644
index 50a8068..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java
+++ /dev/null
@@ -1,200 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.common.Status;
-import org.apache.doris.common.ThreadPoolManager;
-import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TStatusCode;
-
-import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.ReentrantLock;
-
-@Deprecated
-public class PullLoadJobMgr {
- private static final Logger LOG =
LogManager.getLogger(PullLoadJobMgr.class);
-
- // Lock protect
- private final ReentrantLock lock = new ReentrantLock();
- private final Map<Long, PullLoadJob> idToJobs = Maps.newHashMap();
-
- private final BlockingQueue<PullLoadTask> pendingTasks =
Queues.newLinkedBlockingQueue();
- private ExecutorService executorService;
-
- private int concurrency = 10;
-
- public PullLoadJobMgr(boolean needRegisterMetric) {
- executorService =
ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr",
needRegisterMetric);
- }
-
- /**
- * Start Task manager to work.
- * First it will ask all backends to collect task status
- * After collected, this will start scheduler to work.
- */
- public void start() {
- for (int i = 0; i < concurrency; ++i) {
- executorService.submit(new TaskExecutor());
- }
- }
-
- /**
- * Submit a load job.
- * This is called when a job turn from pending to ETL.
- * Or this can be called before 'start' called.
- */
- public void submit(PullLoadJob job) {
- lock.lock();
- try {
- if (idToJobs.containsKey(job.getId())) {
- // Same job id contains
- return;
- }
- idToJobs.put(job.getId(), job);
- for (PullLoadTask task : job.tasks) {
- pendingTasks.add(task);
- }
- } finally {
- lock.unlock();
- }
- }
-
- public PullLoadJob.State getJobState(long jobId) {
- lock.lock();
- try {
- PullLoadJob ctx = idToJobs.get(jobId);
- if (ctx == null) {
- return PullLoadJob.State.UNKNOWN;
- }
- return ctx.state;
- } finally {
- lock.unlock();
- }
- }
-
- // NOTE:
- // Must call this after job's state is not running
- // Otherwise, the returned object will be process concurrently,
- // this could be lead to an invalid situation
- public PullLoadJob getJob(long jobId) {
- lock.lock();
- try {
- return idToJobs.get(jobId);
- } finally {
- lock.unlock();
- }
- }
-
- // Cancel one job, remove all its tasks
- public void cancelJob(long jobId) {
- PullLoadJob job = null;
- lock.lock();
- try {
- job = idToJobs.remove(jobId);
- } finally {
- lock.unlock();
- }
- // Cancel job out of lock guard
- if (job != null) {
- job.cancel();
- }
- }
-
- // Only used when master replay job
- public void remove(long jobId) {
- lock.lock();
- try {
- idToJobs.remove(jobId);
- } finally {
- lock.unlock();
- }
- }
-
- public boolean isFailureCanRetry(Status status) {
- return true;
- }
-
- public class TaskExecutor implements Runnable {
-
- private void processOneTask(PullLoadTask task, PullLoadJob job) throws
UserException {
- int retryTime = 3;
- for (int i = 0; i < retryTime; ++i) {
- if (!job.isRunning()) {
- throw new UserException("Job has been cancelled.");
- }
- task.executeOnce();
- if (task.isFinished()) {
- return;
- } else {
- boolean needRetry =
isFailureCanRetry(task.getExecuteStatus());
- if (!needRetry) {
- break;
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
-
- }
- }
- }
- }
-
- @Override
- public void run() {
- while (true) {
- PullLoadTask task;
- PullLoadJob job;
- try {
- task = pendingTasks.take();
- if (task == null) {
- continue;
- }
- job = getJob(task.jobId);
- if (job == null || !job.isRunning()) {
- LOG.info("Job is not running now. taskId={}:{}",
task.jobId, task.taskId);
- continue;
- }
- } catch (InterruptedException e) {
- LOG.info("Interrupted when take task.");
- continue;
- }
- try {
- processOneTask(task, job);
- if (task.isFinished()) {
- job.onTaskFinished(task);
- } else {
- job.onTaskFailed(task);
- }
- } catch (Throwable e) {
- LOG.warn("Process one pull load task exception. job id:
{}, task id: {}",
- task.jobId, task.taskId, e);
- task.onFailed(null, new Status(TStatusCode.INTERNAL_ERROR,
e.getMessage()));
- job.onTaskFailed(task);
- }
- }
- }
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java
deleted file mode 100644
index 5a44f22..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java
+++ /dev/null
@@ -1,237 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.Config;
-import org.apache.doris.common.Status;
-import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.DebugUtil;
-import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.load.BrokerFileGroup;
-import org.apache.doris.qe.Coordinator;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.thrift.TBrokerFileStatus;
-import org.apache.doris.thrift.TQueryType;
-import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.thrift.TUniqueId;
-
-import com.google.common.collect.Maps;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-// A pull load task is used to process one table of this pull load job.
-@Deprecated
-public class PullLoadTask {
- private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
- // Input parameter
- public final long jobId;
- public final int taskId;
- public final Database db;
- public final OlapTable table;
- public final BrokerDesc brokerDesc;
- public final List<BrokerFileGroup> fileGroups;
- public final long jobDeadlineMs;
-
- private PullLoadTaskPlanner planner;
-
- // Useful things after executed
- private Map<String, Long> fileMap;
- private String trackingUrl;
- private Map<String, String> counters;
- private final long execMemLimit;
-
- // Runtime variables
- private enum State {
- RUNNING,
- FINISHED,
- FAILED,
- CANCELLED,
- }
-
- private TUniqueId queryId;
- private Coordinator curCoordinator;
- private State executeState = State.RUNNING;
- private Status executeStatus;
- private Thread curThread;
-
- public PullLoadTask(
- long jobId, int taskId,
- Database db, OlapTable table,
- BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
- long jobDeadlineMs, long execMemLimit) {
- this.jobId = jobId;
- this.taskId = taskId;
- this.db = db;
- this.table = table;
- this.brokerDesc = brokerDesc;
- this.fileGroups = fileGroups;
- this.jobDeadlineMs = jobDeadlineMs;
- this.execMemLimit = execMemLimit;
- }
-
- public void init(List<List<TBrokerFileStatus>> fileStatusList, int
fileNum) throws UserException {
- planner = new PullLoadTaskPlanner(this);
- planner.plan(fileStatusList, fileNum);
- }
-
- public Map<String, Long> getFileMap() {
- return fileMap;
- }
-
- public String getTrackingUrl() {
- return trackingUrl;
- }
-
- public Map<String, String> getCounters() {
- return counters;
- }
-
- private long getLeftTimeMs() {
- if (jobDeadlineMs <= 0) {
- return Config.broker_load_default_timeout_second * 1000;
- }
- return jobDeadlineMs - System.currentTimeMillis();
- }
-
- public synchronized void cancel() {
- if (curCoordinator != null) {
- curCoordinator.cancel();
- }
- }
-
- public synchronized boolean isFinished() {
- return executeState == State.FINISHED;
- }
-
- public Status getExecuteStatus() {
- return executeStatus;
- }
-
- public synchronized void onCancelled(String reason) {
- if (executeState == State.RUNNING) {
- executeState = State.CANCELLED;
- executeStatus = Status.CANCELLED;
- LOG.info("cancel one pull load task({}). task id: {}, query id:
{}, job id: {}",
- reason, taskId,
DebugUtil.printId(curCoordinator.getQueryId()), jobId);
- }
- }
-
- public synchronized void onFinished(Map<String, Long> fileMap,
- Map<String, String> counters,
- String trackingUrl) {
- if (executeState == State.RUNNING) {
- executeState = State.FINISHED;
-
- executeStatus = Status.OK;
- this.fileMap = fileMap;
- this.counters = counters;
- this.trackingUrl = trackingUrl;
- LOG.info("finished one pull load task. task id: {}, query id: {},
job id: {}",
- taskId, DebugUtil.printId(curCoordinator.getQueryId()),
jobId);
- }
- }
-
- public synchronized void onFailed(TUniqueId id, Status failStatus) {
- if (executeState == State.RUNNING) {
- if (id != null && !queryId.equals(id)) {
- return;
- }
- executeState = State.FAILED;
- executeStatus = failStatus;
- LOG.info("failed one pull load task({}). task id: {}, query id:
{}, job id: {}",
- failStatus.getErrorMsg(), taskId, id != null ?
DebugUtil.printId(id) : "NaN", jobId);
- }
- }
-
- private void actualExecute() {
- int waitSecond = (int) (getLeftTimeMs() / 1000);
- if (waitSecond <= 0) {
- onCancelled("waiting timeout");
- return;
- }
-
- // TODO(zc): to refine coordinator
- try {
- curCoordinator.exec();
- } catch (Exception e) {
- LOG.warn("pull load task exec failed", e);
- onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR,
"Coordinator execute failed: " + e.getMessage()));
- return;
- }
-
- if (curCoordinator.join(waitSecond)) {
- Status status = curCoordinator.getExecStatus();
- if (status.ok()) {
- Map<String, Long> resultFileMap = Maps.newHashMap();
- for (String file : curCoordinator.getDeltaUrls()) {
- resultFileMap.put(file, -1L);
- }
- onFinished(resultFileMap, curCoordinator.getLoadCounters(),
curCoordinator.getTrackingUrl());
- } else {
- onFailed(queryId, status);
- }
- } else {
- onCancelled("execution timeout");
- }
- }
-
- public void executeOnce() throws UserException {
- synchronized (this) {
- if (curThread != null) {
- throw new UserException("Task already executing.");
- }
- curThread = Thread.currentThread();
- executeState = State.RUNNING;
- executeStatus = Status.OK;
-
- // New one query id,
- UUID uuid = UUID.randomUUID();
- queryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
- curCoordinator = new Coordinator(jobId, queryId,
planner.getDescTable(),
- planner.getFragments(), planner.getScanNodes(),
db.getClusterName(), TimeUtils.DEFAULT_TIME_ZONE);
- curCoordinator.setQueryType(TQueryType.LOAD);
- curCoordinator.setExecMemoryLimit(execMemLimit);
- curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
- }
-
- boolean needUnregister = false;
- try {
- QeProcessorImpl.INSTANCE.registerQuery(queryId, curCoordinator);
- actualExecute();
- needUnregister = true;
- } catch (UserException e) {
- onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR,
e.getMessage()));
- } finally {
- if (needUnregister) {
- QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
- }
- synchronized (this) {
- curThread = null;
- curCoordinator = null;
- }
- }
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
deleted file mode 100644
index 251ee2d..0000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
+++ /dev/null
@@ -1,160 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.NotImplementedException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.BrokerScanNode;
-import org.apache.doris.planner.DataPartition;
-import org.apache.doris.planner.DataSplitSink;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.OlapRewriteNode;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.ScanNode;
-import org.apache.doris.thrift.TBrokerFileStatus;
-
-import com.google.common.collect.Lists;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collections;
-import java.util.List;
-
-// Planner used to generate a plan for pull load ETL work
-@Deprecated
-public class PullLoadTaskPlanner {
- private static final Logger LOG =
LogManager.getLogger(PullLoadTaskPlanner.class);
-
- // Input param
- private final PullLoadTask task;
-
- // Something useful
- private final Analyzer analyzer;
- private final DescriptorTable descTable;
-
- // Output param
- private final List<PlanFragment> fragments;
- private final List<ScanNode> scanNodes;
-
- private int nextNodeId = 0;
-
- public PullLoadTaskPlanner(PullLoadTask task) {
- this.task = task;
- this.analyzer = new Analyzer(Catalog.getCurrentCatalog(), null);
- this.descTable = analyzer.getDescTbl();
- this.fragments = Lists.newArrayList();
- this.scanNodes = Lists.newArrayList();
- }
-
- // NOTE: DB lock need hold when call this function.
- public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int
filesAdded) throws UserException {
- // Tuple descriptor used for all nodes in plan.
- OlapTable table = task.table;
-
- // Generate tuple descriptor
- List<Expr> slotRefs = Lists.newArrayList();
- TupleDescriptor tupleDesc = descTable.createTupleDescriptor();
- for (Column col : table.getFullSchema()) {
- SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
- slotDesc.setIsMaterialized(true);
- slotDesc.setColumn(col);
- if (col.isAllowNull()) {
- slotDesc.setIsNullable(true);
- } else {
- slotDesc.setIsNullable(false);
- }
- slotRefs.add(new SlotRef(slotDesc));
- }
-
- // Generate plan tree
- // 1. first Scan node
- BrokerScanNode scanNode = new BrokerScanNode(new
PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
- fileStatusesList,
filesAdded);
- scanNode.setLoadInfo(table, task.brokerDesc, task.fileGroups);
- scanNode.init(analyzer);
- scanNodes.add(scanNode);
-
- // equal node
- OlapRewriteNode rewriteNode = new OlapRewriteNode(
- new PlanNodeId(nextNodeId++), scanNode, table, tupleDesc,
slotRefs);
- rewriteNode.init(analyzer);
-
- descTable.computeMemLayout();
- rewriteNode.finalize(analyzer);
-
- PlanFragment scanFragment = new PlanFragment(new PlanFragmentId(0),
rewriteNode, DataPartition.RANDOM);
- scanNode.setFragmentId(scanFragment.getFragmentId());
- scanNode.setFragment(scanFragment);
- fragments.add(scanFragment);
-
- // exchange node
- ExchangeNode exchangeNode = new ExchangeNode(new
PlanNodeId(nextNodeId++), rewriteNode, false);
- exchangeNode.init(analyzer);
-
- // Create data sink
- DataSplitSink splitSink = null;
- try {
- splitSink = new DataSplitSink(table, tupleDesc);
- } catch (AnalysisException e) {
- LOG.info("New DataSplitSink failed.{}", e);
- throw new UserException(e.getMessage());
- }
- PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(1),
exchangeNode, splitSink.getOutputPartition());
- scanFragment.setDestination(exchangeNode);
- scanFragment.setOutputPartition(splitSink.getOutputPartition());
- sinkFragment.setSink(splitSink);
-
- fragments.add(sinkFragment);
-
- // Get partition
- for (PlanFragment fragment : fragments) {
- try {
- fragment.finalize(analyzer, false);
- } catch (NotImplementedException e) {
- LOG.info("Fragment finalize failed.{}", e);
- throw new UserException("Fragment finalize failed.");
- }
- }
- Collections.reverse(fragments);
- }
-
- public DescriptorTable getDescTable() {
- return descTable;
- }
-
- public List<PlanFragment> getFragments() {
- return fragments;
- }
-
- public List<ScanNode> getScanNodes() {
- return scanNodes;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]