This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b9b028099d [enhancement](stream load pipe) using queryid or load id to identify stream load pipe instead of fragment instance id (#17362)
b9b028099d is described below

commit b9b028099d00ec9d7c12515af66acf01192748e4
Author: yiguolei <[email protected]>
AuthorDate: Sat Mar 4 16:19:36 2023 +0800

    [enhancement](stream load pipe) using queryid or load id to identify stream load pipe instead of fragment instance id (#17362)
    
    * [enhancement](stream load pipe) using queryid or load id to identify stream load pipe instead of fragment instance id
    
    NewLoadStreamMgr already holds the pipe and other load info, so there is no need to save the pipe in the fragment state; FragmentState also becomes cleaner.
    
    However, this PR changes the behaviour of the BE.
    I will pick this PR to Doris 1.2.3 and add load id support to the FE, so users can upgrade from 1.2.3 to 2.x.
    Co-authored-by: yiguolei <[email protected]>
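    
    The BE-side change boils down to the following lookup (a simplified excerpt of the
    cancel path from the BE diff below, shown here only as an illustration):
    
        // For stream load, the FE sets the fragment's query_id to the load id,
        // so the stream load context (and its pipe) can be fetched from
        // NewLoadStreamMgr by that id instead of being stored in fragment state.
        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
        if (stream_load_ctx != nullptr) {
            // Cancel the pipe, otherwise the fragment may hang waiting to read from it.
            stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
        }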
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  9 +++-
 be/src/pipeline/pipeline_fragment_context.h        |  5 --
 be/src/runtime/fragment_mgr.cpp                    | 51 +++---------------
 be/src/runtime/fragment_mgr.h                      |  5 --
 be/src/runtime/query_fragments_ctx.h               |  2 +
 be/src/service/internal_service.cpp                | 61 +++++++++++-----------
 be/test/runtime/fragment_mgr_test.cpp              |  2 +-
 .../apache/doris/planner/StreamLoadPlanner.java    |  1 +
 .../apache/doris/qe/InsertStreamTxnExecutor.java   | 16 ++++--
 .../java/org/apache/doris/qe/StmtExecutor.java     |  8 ++-
 .../org/apache/doris/rpc/BackendServiceProxy.java  | 14 +++--
 .../apache/doris/transaction/TransactionEntry.java | 10 ++++
 .../doris/load/sync/canal/CanalSyncDataTest.java   | 13 ++---
 gensrc/proto/internal_service.proto                |  3 ++
 14 files changed, 94 insertions(+), 106 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index df51da373a..727383d1a5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -64,6 +64,8 @@
 #include "runtime/client_cache.h"
 #include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
 #include "task_scheduler.h"
 #include "util/container_util.hpp"
 #include "vec/exec/join/vhash_join_node.h"
@@ -123,8 +125,11 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
             _exec_status = Status::Cancelled(msg);
         }
         _runtime_state->set_is_cancelled(true);
-        if (_pipe != nullptr) {
-            _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
+        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
+        // For stream load the fragment's query_id == load id, it is set in FE.
+        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
+        if (stream_load_ctx != nullptr) {
+            stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
         }
         _cancel_reason = reason;
         _cancel_msg = msg;
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 0a756ed37f..5bd25e256d 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -100,9 +100,6 @@ public:
 
     void send_report(bool);
 
-    void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
-    std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
-
     void report_profile();
 
     Status update_status(Status status) {
@@ -171,8 +168,6 @@ private:
     RuntimeProfile::Counter* _start_timer;
     RuntimeProfile::Counter* _prepare_timer;
 
-    std::shared_ptr<io::StreamLoadPipe> _pipe;
-
     std::function<void(RuntimeState*, Status*)> _call_back;
     std::once_flag _close_once_flag;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 0b15d0a518..7599a707c4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -134,9 +134,6 @@ public:
 
     std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }
 
-    void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
-    std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
-
     void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
 
 private:
@@ -167,8 +164,6 @@ private:
     std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
-    // The pipe for data transfering, such as insert.
-    std::shared_ptr<io::StreamLoadPipe> _pipe;
 
     // If set the true, this plan fragment will be executed only after FE send execution start rpc.
     bool _need_wait_execution_trigger = false;
@@ -251,9 +246,14 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const
             _executor.set_is_report_on_cancel(false);
         }
         _executor.cancel(reason, msg);
-        if (_pipe != nullptr) {
-            _pipe->cancel(PPlanFragmentCancelReason_Name(reason));
+#ifndef BE_TEST
+        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
+        // For stream load the fragment's query_id == load id, it is set in FE.
+        auto stream_load_ctx = _fragments_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
+        if (stream_load_ctx != nullptr) {
+            stream_load_ctx->pipe->cancel(PPlanFragmentCancelReason_Name(reason));
         }
+#endif
         _cancelled = true;
     }
     return Status::OK();
@@ -546,9 +546,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
                 _exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
 
         RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
-        set_pipe(params.params.fragment_instance_id, pipe,
-                 params.txn_conf.__isset.enable_pipeline_txn_load &&
-                         params.txn_conf.enable_pipeline_txn_load);
         return Status::OK();
     } else {
         return exec_plan_fragment(params, empty_function);
@@ -576,40 +573,6 @@ Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* r
     return Status::OK();
 }
 
-void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id,
-                           std::shared_ptr<io::StreamLoadPipe> pipe, bool enable_pipeline_engine) {
-    if (enable_pipeline_engine) {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _pipeline_map.find(fragment_instance_id);
-        if (iter != _pipeline_map.end()) {
-            _pipeline_map[fragment_instance_id]->set_pipe(std::move(pipe));
-        }
-    } else {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto iter = _fragment_map.find(fragment_instance_id);
-        if (iter != _fragment_map.end()) {
-            _fragment_map[fragment_instance_id]->set_pipe(std::move(pipe));
-        }
-    }
-}
-
-std::shared_ptr<io::StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) {
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        auto pipeline_iter = _pipeline_map.find(fragment_instance_id);
-        if (pipeline_iter != _pipeline_map.end()) {
-            return _pipeline_map[fragment_instance_id]->get_pipe();
-        } else {
-            auto fragment_iter = _fragment_map.find(fragment_instance_id);
-            if (fragment_iter != _fragment_map.end()) {
-                return _fragment_map[fragment_instance_id]->get_pipe();
-            } else {
-                return nullptr;
-            }
-        }
-    }
-}
-
 void FragmentMgr::remove_pipeline_context(
         std::shared_ptr<pipeline::PipelineFragmentContext> f_context) {
     std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 455e81c8d5..9c066c58f3 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -127,11 +127,6 @@ public:
     Status merge_filter(const PMergeFilterRequest* request,
                         butil::IOBufAsZeroCopyInputStream* attach_data);
 
-    void set_pipe(const TUniqueId& fragment_instance_id, std::shared_ptr<io::StreamLoadPipe> pipe,
-                  bool enable_pipeline_engine);
-
-    std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id);
-
     std::string to_http_path(const std::string& file_name);
 
     void coordinator_callback(const ReportStatusRequest& req);
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index a01c3924c0..389cecb860 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -67,6 +67,8 @@ public:
     // this may be a bug, bug <= 1 in theory it shouldn't cause any problems at this stage.
     bool countdown() { return fragment_num.fetch_sub(1) <= 1; }
 
+    ExecEnv* exec_env() { return _exec_env; }
+
     bool is_timeout(const DateTimeValue& now) const {
         if (timeout_second <= 0) {
             return false;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index fdb7c52b7d..db76453d76 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -717,15 +717,16 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller
                                      google::protobuf::Closure* done) {
     bool ret = _heavy_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
-        TUniqueId fragment_instance_id;
-        fragment_instance_id.hi = request->fragment_instance_id().hi();
-        fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-        if (pipe == nullptr) {
+        TUniqueId load_id;
+        load_id.hi = request->load_id().hi();
+        load_id.lo = request->load_id().lo();
+        // On 1.2.3 we add load id to send data request and using load id to get pipe
+        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
+        if (stream_load_ctx == nullptr) {
             response->mutable_status()->set_status_code(1);
-            response->mutable_status()->add_error_msgs("pipe is null");
+            response->mutable_status()->add_error_msgs("could not find stream load context");
         } else {
+            auto pipe = stream_load_ctx->pipe;
             for (int i = 0; i < request->data_size(); ++i) {
                 PDataRow* row = new PDataRow();
                 row->CopyFrom(request->data(i));
@@ -748,16 +749,16 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                   google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
-        TUniqueId fragment_instance_id;
-        fragment_instance_id.hi = request->fragment_instance_id().hi();
-        fragment_instance_id.lo = request->fragment_instance_id().lo();
+        TUniqueId load_id;
+        load_id.hi = request->load_id().hi();
+        load_id.lo = request->load_id().lo();
 
-        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-        if (pipe == nullptr) {
+        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
+        if (stream_load_ctx == nullptr) {
             response->mutable_status()->set_status_code(1);
-            response->mutable_status()->add_error_msgs("pipe is null");
+            response->mutable_status()->add_error_msgs("could not find stream load context");
         } else {
-            pipe->finish();
+            stream_load_ctx->pipe->finish();
             response->mutable_status()->set_status_code(0);
         }
     });
@@ -774,16 +775,15 @@ void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
                                     google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, request, response, done]() {
         brpc::ClosureGuard closure_guard(done);
-        TUniqueId fragment_instance_id;
-        fragment_instance_id.hi = request->fragment_instance_id().hi();
-        fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-        if (pipe == nullptr) {
+        TUniqueId load_id;
+        load_id.hi = request->load_id().hi();
+        load_id.lo = request->load_id().lo();
+        auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(load_id);
+        if (stream_load_ctx == nullptr) {
             response->mutable_status()->set_status_code(1);
-            response->mutable_status()->add_error_msgs("pipe is null");
+            response->mutable_status()->add_error_msgs("could not find stream load context");
         } else {
-            pipe->cancel("rollback");
+            stream_load_ctx->pipe->cancel("rollback");
             response->mutable_status()->set_status_code(0);
         }
     });
@@ -1083,10 +1083,11 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
                 // Check file length
                 uint64_t local_file_size = std::filesystem::file_size(local_file_path);
                if (local_file_size != file_size) {
-                    LOG(WARNING)
-                            << "failed to pull rowset for slave replica. download file length error"
-                            << ", remote_path=" << remote_file_url << ", file_size=" << file_size
-                            << ", local_file_size=" << local_file_size;
+                    LOG(WARNING) << "failed to pull rowset for slave replica. download file "
+                                    "length error"
+                                 << ", remote_path=" << remote_file_url
+                                 << ", file_size=" << file_size
+                                 << ", local_file_size=" << local_file_size;
                     return Status::InternalError("downloaded file size is not equal");
                 }
                 chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
@@ -1094,10 +1095,10 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
             };
             auto st = HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, download_cb);
            if (!st.ok()) {
-                LOG(WARNING)
-                        << "failed to pull rowset for slave replica. failed to download file. url="
-                        << remote_file_url << ", local_path=" << local_file_path
-                        << ", txn_id=" << rowset_meta->txn_id();
+                LOG(WARNING) << "failed to pull rowset for slave replica. failed to download "
+                                "file. url="
+                             << remote_file_url << ", local_path=" << local_file_path
+                             << ", txn_id=" << rowset_meta->txn_id();
                _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
                                            rowset_meta->tablet_id(), node_id, false);
                 return;
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
index 3dfc8b0a37..9a3967c031 100644
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ b/be/test/runtime/fragment_mgr_test.cpp
@@ -118,7 +118,7 @@ TEST_F(FragmentMgrTest, OfferPoolFailed) {
     config::fragment_pool_thread_num_min = 1;
     config::fragment_pool_thread_num_max = 1;
     config::fragment_pool_queue_size = 0;
-    FragmentMgr mgr(nullptr);
+    FragmentMgr mgr(doris::ExecEnv::GetInstance());
 
     TExecPlanFragmentParams params;
     params.params.fragment_instance_id = TUniqueId();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 9b5e15da75..16c6f38835 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -206,6 +206,7 @@ public class StreamLoadPlanner {
             fileStatus.setIsDir(false);
             fileStatus.setSize(-1); // must set to -1, means stream.
         }
+        // The load id will pass to csv reader to find the stream load context from new load stream manager
         fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
                 fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns());
         scanNode = fileScanNode;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
index 5f9e1e7972..324c6d523a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/InsertStreamTxnExecutor.java
@@ -61,9 +61,11 @@ public class InsertStreamTxnExecutor {
     public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException, TimeoutException,
             InterruptedException, ExecutionException {
         TTxnParams txnConf = txnEntry.getTxnConf();
+        // StreamLoadTask's id == request's load_id
         StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
         StreamLoadPlanner planner = new StreamLoadPlanner(
                 txnEntry.getDb(), (OlapTable) txnEntry.getTable(), streamLoadTask);
+        // Will using load id as query id in fragment
         TExecPlanFragmentParams tRequest = planner.plan(streamLoadTask.getId());
         BeSelectionPolicy policy = new BeSelectionPolicy.Builder().setCluster(txnEntry.getDb().getClusterName())
                 .needLoadAvailable().needQueryAvailable().build();
@@ -82,6 +84,10 @@ public class InsertStreamTxnExecutor {
             }
         }
         txnConf.setFragmentInstanceId(tRequest.params.fragment_instance_id);
+        this.loadId = request.getLoadId();
+        this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                .setHi(loadId.getHi())
+                .setLo(loadId.getLo()).build());
 
         Backend backend = Env.getCurrentSystemInfo().getIdToBackend().get(beIds.get(0));
         txnConf.setUserIp(backend.getIp());
@@ -109,11 +115,12 @@ public class InsertStreamTxnExecutor {
                 .setHi(txnConf.getFragmentInstanceId().getHi())
                 .setLo(txnConf.getFragmentInstanceId().getLo()).build();
 
+
         Backend backend = txnEntry.getBackend();
         TNetworkAddress address = new TNetworkAddress(backend.getIp(), backend.getBrpcPort());
         try {
             Future<InternalService.PCommitResult> future = BackendServiceProxy
-                    .getInstance().commit(address, fragmentInstanceId);
+                    .getInstance().commit(address, fragmentInstanceId, this.txnEntry.getpLoadId());
             InternalService.PCommitResult result = future.get(5, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
@@ -135,7 +142,7 @@ public class InsertStreamTxnExecutor {
         TNetworkAddress address = new TNetworkAddress(be.getIp(), be.getBrpcPort());
         try {
             Future<InternalService.PRollbackResult> future = BackendServiceProxy.getInstance().rollback(address,
-                    fragmentInstanceId);
+                    fragmentInstanceId, this.txnEntry.getpLoadId());
             InternalService.PRollbackResult result = future.get(5, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
@@ -161,7 +168,7 @@ public class InsertStreamTxnExecutor {
         TNetworkAddress address = new TNetworkAddress(backend.getIp(), backend.getBrpcPort());
         try {
             Future<InternalService.PSendDataResult> future = BackendServiceProxy.getInstance().sendData(
-                    address, fragmentInstanceId, txnEntry.getDataToSend());
+                    address, fragmentInstanceId, this.txnEntry.getpLoadId(), txnEntry.getDataToSend());
             InternalService.PSendDataResult result = future.get(5, TimeUnit.SECONDS);
             TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
             if (code != TStatusCode.OK) {
@@ -180,6 +187,9 @@ public class InsertStreamTxnExecutor {
 
     public void setLoadId(TUniqueId loadId) {
         this.loadId = loadId;
+        this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
+                .setHi(loadId.getHi())
+                .setLo(loadId.getLo()).build());
     }
 
     public long getTxnId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 0ba673baa9..73f7bcca55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -623,10 +623,8 @@ public class StmtExecutor implements ProfileWriter {
             context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, e.getMessage());
             throw e;
         } catch (UserException e) {
-            LOG.warn("", e);
             // analysis exception only print message, not print the stack
-            LOG.warn("execute Exception. {}, {}", context.getQueryIdentifier(),
-                                    e.getMessage());
+            LOG.warn("execute Exception. {}", context.getQueryIdentifier(), e);
             context.getState().setError(e.getMysqlErrorCode(), e.getMessage());
             context.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
         } catch (Exception e) {
@@ -1358,8 +1356,7 @@ public class StmtExecutor implements ProfileWriter {
             if (context.getTxnEntry() == null) {
                 context.setTxnEntry(new TransactionEntry());
             }
-            TransactionEntry txnEntry = context.getTxnEntry();
-            txnEntry.setTxnConf(txnParams);
+            context.getTxnEntry().setTxnConf(txnParams);
             StringBuilder sb = new StringBuilder();
             sb.append("{'label':'").append(context.getTxnEntry().getLabel()).append("', 'status':'")
                     .append(TransactionStatus.PREPARE.name());
@@ -1405,6 +1402,7 @@ public class StmtExecutor implements ProfileWriter {
                         .append(context.getTxnEntry().getTxnConf().getTxnId()).append("'").append("}");
                 context.getState().setOk(0, 0, sb.toString());
             } catch (Exception e) {
+                LOG.warn("Txn commit failed", e);
                 throw new AnalysisException(e.getMessage());
             } finally {
                 context.setTxnEntry(null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 1a09b0721c..3c04380461 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -294,11 +294,13 @@ public class BackendServiceProxy {
     }
 
     public Future<InternalService.PSendDataResult> sendData(
-            TNetworkAddress address, Types.PUniqueId fragmentInstanceId, List<InternalService.PDataRow> data)
+            TNetworkAddress address, Types.PUniqueId fragmentInstanceId,
+            Types.PUniqueId loadId, List<InternalService.PDataRow> data)
             throws RpcException {
 
         final InternalService.PSendDataRequest.Builder pRequest = InternalService.PSendDataRequest.newBuilder();
         pRequest.setFragmentInstanceId(fragmentInstanceId);
+        pRequest.setLoadId(loadId);
         pRequest.addAllData(data);
         try {
             final BackendServiceClient client = getProxy(address);
@@ -309,10 +311,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PRollbackResult> rollback(TNetworkAddress address, Types.PUniqueId fragmentInstanceId)
+    public Future<InternalService.PRollbackResult> rollback(TNetworkAddress address,
+            Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
            throws RpcException {
         final InternalService.PRollbackRequest pRequest = InternalService.PRollbackRequest.newBuilder()
-                .setFragmentInstanceId(fragmentInstanceId).build();
+                .setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
         try {
             final BackendServiceClient client = getProxy(address);
             return client.rollback(pRequest);
@@ -322,10 +325,11 @@ public class BackendServiceProxy {
         }
     }
 
-    public Future<InternalService.PCommitResult> commit(TNetworkAddress address, Types.PUniqueId fragmentInstanceId)
+    public Future<InternalService.PCommitResult> commit(TNetworkAddress address,
+            Types.PUniqueId fragmentInstanceId, Types.PUniqueId loadId)
            throws RpcException {
         final InternalService.PCommitRequest pRequest = InternalService.PCommitRequest.newBuilder()
-                .setFragmentInstanceId(fragmentInstanceId).build();
+                .setFragmentInstanceId(fragmentInstanceId).setLoadId(loadId).build();
         try {
             final BackendServiceClient client = getProxy(address);
             return client.commit(pRequest);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 4db596dc55..7136871579 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -20,6 +20,7 @@ package org.apache.doris.transaction;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.proto.InternalService;
+import org.apache.doris.proto.Types;
 import org.apache.doris.system.Backend;
 import org.apache.doris.thrift.TTxnParams;
 
@@ -35,6 +36,7 @@ public class TransactionEntry {
     private TTxnParams txnConf;
     private List<InternalService.PDataRow> dataToSend = new ArrayList<>();
     private long rowsInTransaction = 0;
+    private Types.PUniqueId pLoadId;
 
     public TransactionEntry() {
     }
@@ -116,4 +118,12 @@ public class TransactionEntry {
     public void setRowsInTransaction(long rowsInTransaction) {
         this.rowsInTransaction = rowsInTransaction;
     }
+
+    public Types.PUniqueId getpLoadId() {
+        return pLoadId;
+    }
+
+    public void setpLoadId(Types.PUniqueId pLoadId) {
+        this.pLoadId = pLoadId;
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
index 430d8d204e..a1f31b0b34 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/sync/canal/CanalSyncDataTest.java
@@ -263,11 +263,11 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                result = execFuture;
 
-                backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any);
+                backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any);
                minTimes = 0;
                result = commitFuture;
 
-                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any,
+                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any,
                         (List<InternalService.PDataRow>) any);
                 minTimes = 0;
                 result = sendDataFuture;
@@ -336,7 +336,7 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                result = execFuture;
 
-                backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any);
+                backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any);
                 minTimes = 0;
                 result = abortFuture;
 
@@ -403,15 +403,16 @@ public class CanalSyncDataTest {
                 minTimes = 0;
                result = execFuture;
 
-                backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any);
+                backendServiceProxy.commit((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any);
                 minTimes = 0;
                result = commitFuture;
 
-                backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any);
+                backendServiceProxy.rollback((TNetworkAddress) any, (Types.PUniqueId) any, (Types.PUniqueId) any);
                 minTimes = 0;
                result = abortFuture;
 
-                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
+                backendServiceProxy.sendData((TNetworkAddress) any, (Types.PUniqueId) any,
+                        (Types.PUniqueId) any, (List<InternalService.PDataRow>) any);
                 minTimes = 0;
                 result = sendDataFuture;
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 05746715c3..3cff8fd733 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -385,6 +385,7 @@ message PDataRow {
 message PSendDataRequest {
     required PUniqueId fragment_instance_id = 1;
     repeated PDataRow data = 2;
+    optional PUniqueId load_id = 3; // load_id == query_id in fragment exec
 }
 
 message PSendDataResult {
@@ -393,6 +394,7 @@ message PSendDataResult {
 
 message PCommitRequest {
     required PUniqueId fragment_instance_id = 1;
+    optional PUniqueId load_id = 2;
 }
 
 message PCommitResult {
@@ -401,6 +403,7 @@ message PCommitResult {
 
 message PRollbackRequest {
     required PUniqueId fragment_instance_id = 1;
+    optional PUniqueId load_id = 2;
 }
 
 message PRollbackResult {

