This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dc35ab534 [fix](streamload) set coord for streamLoad (#12744)
9dc35ab534 is described below

commit 9dc35ab5346d3373a6f97e6d514f535abdfd73a5
Author: Yongqiang YANG <[email protected]>
AuthorDate: Fri Sep 23 20:23:19 2022 +0800

    [fix](streamload) set coord for streamLoad (#12744)
    
    When a stream load is canceled, the BE reports its status to the
    coordinator (coord), so the stream load plan must set the coord address.
---
 be/src/common/daemon.cpp                                            | 2 ++
 be/src/runtime/fragment_mgr.cpp                                     | 6 +++++-
 .../src/main/java/org/apache/doris/planner/StreamLoadPlanner.java   | 3 +++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 64f1e509fb..0492400e21 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -77,6 +77,8 @@ void Daemon::tcmalloc_gc_thread() {
                                                         &used_size);
         
MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes", 
&free_size);
         size_t alloc_size = used_size + free_size;
+        LOG(INFO) << "tcmalloc.pageheap_free_bytes " << free_size
+                  << ", generic.current_allocated_bytes " << used_size;
 
         if (alloc_size > config::tc_use_memory_min) {
             size_t max_free_size = alloc_size * config::tc_free_memory_rate / 
100;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 1b86f57a94..214581f696 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -297,8 +297,9 @@ void FragmentExecState::coordinator_callback(const Status& 
status, RuntimeProfil
     FrontendServiceConnection coord(_exec_env->frontend_client_cache(), 
_coord_addr, &coord_status);
     if (!coord_status.ok()) {
         std::stringstream ss;
+        UniqueId uid(_query_id.hi, _query_id.lo);
         ss << "couldn't get a client for " << _coord_addr << ", reason: " << 
coord_status;
-        LOG(WARNING) << "query_id: " << _query_id << ", " << ss.str();
+        LOG(WARNING) << "query_id: " << uid << ", " << ss.str();
         update_status(Status::InternalError(ss.str()));
         return;
     }
@@ -623,6 +624,9 @@ Status FragmentMgr::exec_plan_fragment(const 
TExecPlanFragmentParams& params, Fi
         RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), 
params.desc_tbl,
                                               &(fragments_ctx->desc_tbl)));
         fragments_ctx->coord_addr = params.coord;
+        LOG(INFO) << "query_id: "
+                  << UniqueId(fragments_ctx->query_id.hi, 
fragments_ctx->query_id.lo)
+                  << " coord_addr " << fragments_ctx->coord_addr;
         fragments_ctx->query_globals = params.query_globals;
 
         if (params.__isset.resource_info) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index aa1e43ac5c..61214af84e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -43,10 +43,12 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.RoutineLoadJob;
+import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.LoadTaskInfo;
 import org.apache.doris.thrift.PaloInternalServiceVersion;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TLoadErrorHubInfo;
+import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanFragmentExecParams;
 import org.apache.doris.thrift.TQueryGlobals;
 import org.apache.doris.thrift.TQueryOptions;
@@ -200,6 +202,7 @@ public class StreamLoadPlanner {
         params.setFragment(fragment.toThrift());
 
         params.setDescTbl(analyzer.getDescTbl().toThrift());
+        params.setCoord(new 
TNetworkAddress(FrontendOptions.getLocalHostAddress(), Config.rpc_port));
 
         TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
         // user load id (streamLoadTask.id) as query id


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to