This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 70fc3eb0801cacaefc837ba6983f451149b45cec
Author: Calvin Kirs <[email protected]>
AuthorDate: Thu Jul 13 10:26:46 2023 +0800

    [enhancement](RoutineLoad) Multi-table support for pipeline load (#21678)
---
 .../apache/doris/planner/StreamLoadPlanner.java    | 12 ++++--
 .../apache/doris/service/FrontendServiceImpl.java  | 44 +++++++++++++++++-----
 gensrc/thrift/FrontendService.thrift               |  1 +
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 4 files changed, 46 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 2ae793e389..b9014206dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -307,12 +307,18 @@ public class StreamLoadPlanner {
         queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());
 
         params.setQueryGlobals(queryGlobals);
+        params.setTableName(destTable.getName());
 
         // LOG.debug("stream load txn id: {}, plan: {}", 
streamLoadTask.getTxnId(), params);
         return params;
     }
 
+    // single table plan fragmentInstanceIndex is 1(default value)
     public TPipelineFragmentParams planForPipeline(TUniqueId loadId) throws 
UserException {
+        return this.planForPipeline(loadId, 1);
+    }
+
+    public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int 
fragmentInstanceIdIndex) throws UserException {
         if (destTable.getKeysType() != KeysType.UNIQUE_KEYS
                 && taskInfo.getMergeType() != LoadTask.MergeType.APPEND) {
             throw new AnalysisException("load by MERGE or DELETE is only 
supported in unique tables.");
@@ -461,6 +467,7 @@ public class StreamLoadPlanner {
         TPipelineFragmentParams pipParams = new TPipelineFragmentParams();
         pipParams.setProtocolVersion(PaloInternalServiceVersion.V1);
         pipParams.setFragment(fragment.toThrift());
+        pipParams.setFragmentId(fragmentInstanceIdIndex);
 
         pipParams.setDescTbl(analyzer.getDescTbl().toThrift());
         pipParams.setCoord(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
@@ -470,7 +477,7 @@ public class StreamLoadPlanner {
         pipParams.setNumSenders(1);
 
         TPipelineInstanceParams localParams = new TPipelineInstanceParams();
-        localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 
1));
+        localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + 
fragmentInstanceIdIndex));
 
         Map<Integer, List<TScanRangeParams>> perNodeScanRange = 
Maps.newHashMap();
         List<TScanRangeParams> scanRangeParams = Lists.newArrayList();
@@ -504,8 +511,7 @@ public class StreamLoadPlanner {
         queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());
 
         pipParams.setQueryGlobals(queryGlobals);
-
-        // LOG.debug("stream load txn id: {}, plan: {}", 
streamLoadTask.getTxnId(), params);
+        pipParams.setTableName(destTable.getName());
         return pipParams;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 41d4a5e56e..206bd56d10 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1741,16 +1741,23 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             return result;
         }
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() : 5000;
-        List<TExecPlanFragmentParams> planFragmentParamsList = new 
ArrayList<>(tableNames.size());
+        List planFragmentParamsList = new ArrayList<>(tableNames.size());
         List<Long> tableIds = 
olapTables.stream().map(OlapTable::getId).collect(Collectors.toList());
         // todo: if is multi table, we need consider the lock time and the 
timeout
+        boolean enablePipelineLoad = Config.enable_pipeline_load;
         try {
             
multiTableFragmentInstanceIdIndexMap.putIfAbsent(request.getTxnId(), 1);
             for (OlapTable table : olapTables) {
                 int index = 
multiTableFragmentInstanceIdIndexMap.get(request.getTxnId());
-                TExecPlanFragmentParams planFragmentParams = 
generatePlanFragmentParams(request, db, fullDbName,
-                        table, timeoutMs, index);
-                planFragmentParamsList.add(planFragmentParams);
+                if (enablePipelineLoad) {
+                    
planFragmentParamsList.add(generatePipelineStreamLoadPut(request, db, 
fullDbName, table, timeoutMs,
+                            index));
+                } else {
+                    TExecPlanFragmentParams planFragmentParams = 
generatePlanFragmentParams(request, db, fullDbName,
+                            table, timeoutMs, index);
+
+                    planFragmentParamsList.add(planFragmentParams);
+                }
                 multiTableFragmentInstanceIdIndexMap.put(request.getTxnId(), 
++index);
             }
             
Env.getCurrentGlobalTransactionMgr().getDatabaseTransactionMgr(db.getId())
@@ -1763,6 +1770,10 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
             status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + 
Strings.nullToEmpty(e.getMessage()));
             return result;
         }
+        if (enablePipelineLoad) {
+            result.setPipelineParams(planFragmentParamsList);
+            return result;
+        }
         result.setParams(planFragmentParamsList);
         return result;
     }
@@ -1791,7 +1802,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
     private TExecPlanFragmentParams 
generatePlanFragmentParams(TStreamLoadPutRequest request, Database db,
                                                                String 
fullDbName, OlapTable table,
                                                                long timeoutMs) 
throws UserException {
-        return generatePlanFragmentParams(request, db, fullDbName, table, 
timeoutMs, 0);
+        return generatePlanFragmentParams(request, db, fullDbName, table, 
timeoutMs, 1);
     }
 
     private TExecPlanFragmentParams 
generatePlanFragmentParams(TStreamLoadPutRequest request, Database db,
@@ -1825,7 +1836,6 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         if (Strings.isNullOrEmpty(cluster)) {
             cluster = SystemInfoService.DEFAULT_CLUSTER;
         }
-
         Env env = Env.getCurrentEnv();
         String fullDbName = ClusterNamespace.getFullName(cluster, 
request.getDb());
         Database db = env.getInternalCatalog().getDbNullable(fullDbName);
@@ -1838,21 +1848,37 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         }
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? 
request.getThriftRpcTimeoutMs() : 5000;
         Table table = db.getTableOrMetaException(request.getTbl(), 
TableType.OLAP);
+        return this.generatePipelineStreamLoadPut(request, db, fullDbName, 
(OlapTable) table, timeoutMs, 1);
+    }
+
+    private TPipelineFragmentParams 
generatePipelineStreamLoadPut(TStreamLoadPutRequest request, Database db,
+                                                                  String 
fullDbName, OlapTable table,
+                                                                  long 
timeoutMs,
+                                                               int 
multiTableFragmentInstanceIdIndex)
+            throws UserException {
+        if (db == null) {
+            String dbName = fullDbName;
+            if (Strings.isNullOrEmpty(request.getCluster())) {
+                dbName = request.getDb();
+            }
+            throw new UserException("unknown database, database=" + dbName);
+        }
         if (!table.tryReadLock(timeoutMs, TimeUnit.MILLISECONDS)) {
             throw new UserException(
                     "get table read lock timeout, database=" + fullDbName + 
",table=" + table.getName());
         }
         try {
             StreamLoadTask streamLoadTask = 
StreamLoadTask.fromTStreamLoadPutRequest(request);
-            StreamLoadPlanner planner = new StreamLoadPlanner(db, (OlapTable) 
table, streamLoadTask);
-            TPipelineFragmentParams plan = 
planner.planForPipeline(streamLoadTask.getId());
+            StreamLoadPlanner planner = new StreamLoadPlanner(db, table, 
streamLoadTask);
+            TPipelineFragmentParams plan = 
planner.planForPipeline(streamLoadTask.getId(),
+                    multiTableFragmentInstanceIdIndex);
             // add table indexes to transaction state
             TransactionState txnState = Env.getCurrentGlobalTransactionMgr()
                     .getTransactionState(db.getId(), request.getTxnId());
             if (txnState == null) {
                 throw new UserException("txn does not exist: " + 
request.getTxnId());
             }
-            txnState.addTableIndexes((OlapTable) table);
+            txnState.addTableIndexes(table);
             return plan;
         } finally {
             table.readUnlock();
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 0b74c10fba..4630f8bdd0 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -618,6 +618,7 @@ struct TStreamLoadMultiTablePutResult {
     1: required Status.TStatus status
     // valid when status is OK
     2: optional list<PaloInternalService.TExecPlanFragmentParams> params
+    3: optional list<PaloInternalService.TPipelineFragmentParams> 
pipeline_params
 }
 
 struct TKafkaRLTaskProgress {
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index e556ee46f1..40457481b7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -631,6 +631,7 @@ struct TPipelineFragmentParams {
   24: list<TPipelineInstanceParams> local_params
   26: optional list<TPipelineWorkloadGroup> workload_groups
   27: optional TTxnParams txn_conf
+  28: optional string table_name
 }
 
 struct TPipelineFragmentParamsList {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to