This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 70fc3eb0801cacaefc837ba6983f451149b45cec Author: Calvin Kirs <[email protected]> AuthorDate: Thu Jul 13 10:26:46 2023 +0800 [enhancement](RoutineLoad)Multi-table support pipeline load (#21678) --- .../apache/doris/planner/StreamLoadPlanner.java | 12 ++++-- .../apache/doris/service/FrontendServiceImpl.java | 44 +++++++++++++++++----- gensrc/thrift/FrontendService.thrift | 1 + gensrc/thrift/PaloInternalService.thrift | 1 + 4 files changed, 46 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 2ae793e389..b9014206dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -307,12 +307,18 @@ public class StreamLoadPlanner { queryGlobals.setNanoSeconds(LocalDateTime.now().getNano()); params.setQueryGlobals(queryGlobals); + params.setTableName(destTable.getName()); // LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params); return params; } + // single table plan fragmentInstanceIndex is 1(default value) public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws UserException { + return this.planForPipeline(loadId, 1); + } + + public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentInstanceIdIndex) throws UserException { if (destTable.getKeysType() != KeysType.UNIQUE_KEYS && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) { throw new AnalysisException("load by MERGE or DELETE is only supported in unique tables."); @@ -461,6 +467,7 @@ public class StreamLoadPlanner { TPipelineFragmentParams pipParams = new TPipelineFragmentParams(); pipParams.setProtocolVersion(PaloInternalServiceVersion.V1); pipParams.setFragment(fragment.toThrift()); + pipParams.setFragmentId(fragmentInstanceIdIndex); pipParams.setDescTbl(analyzer.getDescTbl().toThrift()); 
pipParams.setCoord(new TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port)); @@ -470,7 +477,7 @@ public class StreamLoadPlanner { pipParams.setNumSenders(1); TPipelineInstanceParams localParams = new TPipelineInstanceParams(); - localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 1)); + localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + fragmentInstanceIdIndex)); Map<Integer, List<TScanRangeParams>> perNodeScanRange = Maps.newHashMap(); List<TScanRangeParams> scanRangeParams = Lists.newArrayList(); @@ -504,8 +511,7 @@ public class StreamLoadPlanner { queryGlobals.setNanoSeconds(LocalDateTime.now().getNano()); pipParams.setQueryGlobals(queryGlobals); - - // LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params); + pipParams.setTableName(destTable.getName()); return pipParams; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 41d4a5e56e..206bd56d10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1741,16 +1741,23 @@ public class FrontendServiceImpl implements FrontendService.Iface { return result; } long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() : 5000; - List<TExecPlanFragmentParams> planFragmentParamsList = new ArrayList<>(tableNames.size()); + List planFragmentParamsList = new ArrayList<>(tableNames.size()); List<Long> tableIds = olapTables.stream().map(OlapTable::getId).collect(Collectors.toList()); // todo: if is multi table, we need consider the lock time and the timeout + boolean enablePipelineLoad = Config.enable_pipeline_load; try { multiTableFragmentInstanceIdIndexMap.putIfAbsent(request.getTxnId(), 1); for (OlapTable table : olapTables) { int index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId()); - TExecPlanFragmentParams planFragmentParams = generatePlanFragmentParams(request, db, fullDbName, - table, timeoutMs, index); - planFragmentParamsList.add(planFragmentParams); + if (enablePipelineLoad) { + planFragmentParamsList.add(generatePipelineStreamLoadPut(request, db, fullDbName, table, timeoutMs, + index)); + } else { + TExecPlanFragmentParams planFragmentParams = generatePlanFragmentParams(request, db, fullDbName, + table, timeoutMs, index); + + planFragmentParamsList.add(planFragmentParams); + } multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), ++index); } Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId()) @@ -1763,6 +1770,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage())); return result; } + if (enablePipelineLoad) { + result.setPipelineParams(planFragmentParamsList); + return result; + } result.setParams(planFragmentParamsList); return result; } @@ -1791,7 +1802,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest request, Database db, String fullDbName, OlapTable table, long timeoutMs) throws UserException { - return generatePlanFragmentParams(request, db, fullDbName, table, timeoutMs, 0); + 
return generatePlanFragmentParams(request, db, fullDbName, table, timeoutMs, 1); } private TExecPlanFragmentParams generatePlanFragmentParams(TStreamLoadPutRequest request, Database db, @@ -1825,7 +1836,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (Strings.isNullOrEmpty(cluster)) { cluster = SystemInfoService.DEFAULT_CLUSTER; } - Env env = Env.getCurrentEnv(); String fullDbName = ClusterNamespace.getFullName(cluster, request.getDb()); Database db = env.getInternalCatalog().getDbNullable(fullDbName); @@ -1838,21 +1848,37 @@ public class FrontendServiceImpl implements FrontendService.Iface { } long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000; Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); + return this.generatePipelineStreamLoadPut(request, db, fullDbName, (OlapTable) table, timeoutMs, 1); + } + + private TPipelineFragmentParams generatePipelineStreamLoadPut(TStreamLoadPutRequest request, Database db, + String fullDbName, OlapTable table, + long timeoutMs, + int multiTableFragmentInstanceIdIndex) + throws UserException { + if (db == null) { + String dbName = fullDbName; + if (Strings.isNullOrEmpty(request.getCluster())) { + dbName = request.getDb(); + } + throw new UserException("unknown database, database=" + dbName); + } if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) { throw new UserException( "get table read lock timeout, database=" + fullDbName + ",table=" + table.getName()); } try { StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request); - StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) table, streamLoadTask); - TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId()); + StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask); + TPipelineFragmentParams plan = planner.planForPipeline(streamLoadTask.getId(), + multiTableFragmentInstanceIdIndex); // add table indexes 
to transaction state TransactionState txnState = Env.getCurrentGlobalTransactionMgr() .getTransactionState(db.getId(), request.getTxnId()); if (txnState == null) { throw new UserException("txn does not exist: " + request.getTxnId()); } - txnState.addTableIndexes((OlapTable) table); + txnState.addTableIndexes(table); return plan; } finally { table.readUnlock(); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 0b74c10fba..4630f8bdd0 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -618,6 +618,7 @@ struct TStreamLoadMultiTablePutResult { 1: required Status.TStatus status // valid when status is OK 2: optional list<PaloInternalService.TExecPlanFragmentParams> params + 3: optional list<PaloInternalService.TPipelineFragmentParams> pipeline_params } struct TKafkaRLTaskProgress { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index e556ee46f1..40457481b7 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -631,6 +631,7 @@ struct TPipelineFragmentParams { 24: list<TPipelineInstanceParams> local_params 26: optional list<TPipelineWorkloadGroup> workload_groups 27: optional TTxnParams txn_conf + 28: optional string table_name } struct TPipelineFragmentParamsList { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
