This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 4cc2081ba4 [fix](exec) run exec_plan_fragment in pthread to avoid BE crash (#21343)
4cc2081ba4 is described below

commit 4cc2081ba40020045fe47604c481a8aee7fef2ed
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Jul 1 12:29:22 2023 +0800

    [fix](exec) run exec_plan_fragment in pthread to avoid BE crash (#21343)
    
    If a query plan has only one fragment, FE calls the `exec_plan_fragment`
    RPC on BE. On the BE side, `exec_plan_fragment()` is then executed directly
    in a bthread, but it may call JNI methods such as `AttachCurrentThread()`,
    which return an error when invoked from a bthread.
    
    So this commit modifies `exec_plan_fragment` to make sure it is always
    executed in the pthread pool.
---
 be/src/service/internal_service.cpp                | 23 +++++++++--
 be/src/service/internal_service.h                  |  9 +++-
 regression-test/data/export_p2/test_outfile_p2.out |  4 ++
 .../suites/export_p2/test_outfile_p2.groovy        | 48 ++++++++++++++++++++++
 4 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 6db5a55668..075a47847e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -228,6 +228,20 @@ void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
                                               const PExecPlanFragmentRequest* 
request,
                                               PExecPlanFragmentResult* 
response,
                                               google::protobuf::Closure* done) 
{
+    bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
+        _exec_plan_fragment_in_pthread(controller, request, response, done);
+    });
+    if (!ret) {
+        LOG(WARNING) << "fail to offer request to the work pool";
+        brpc::ClosureGuard closure_guard(done);
+        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    }
+}
+
+void PInternalServiceImpl::_exec_plan_fragment_in_pthread(
+        google::protobuf::RpcController* controller, const 
PExecPlanFragmentRequest* request,
+        PExecPlanFragmentResult* response, google::protobuf::Closure* done) {
     auto span = telemetry::start_rpc_server_span("exec_plan_fragment", 
controller);
     auto scope = OpentelemetryScope {span};
     brpc::ClosureGuard closure_guard(done);
@@ -235,7 +249,7 @@ void 
PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
     bool compact = request->has_compact() ? request->compact() : false;
     PFragmentRequestVersion version =
             request->has_version() ? request->version() : 
PFragmentRequestVersion::VERSION_1;
-    st = _exec_plan_fragment(request->request(), version, compact);
+    st = _exec_plan_fragment_impl(request->request(), version, compact);
     if (!st.ok()) {
         LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
     }
@@ -247,7 +261,7 @@ void 
PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcContr
                                                       PExecPlanFragmentResult* 
response,
                                                       
google::protobuf::Closure* done) {
     bool ret = _light_work_pool.try_offer([this, controller, request, 
response, done]() {
-        exec_plan_fragment(controller, request, response, done);
+        _exec_plan_fragment_in_pthread(controller, request, response, done);
     });
     if (!ret) {
         LOG(WARNING) << "fail to offer request to the work pool";
@@ -438,8 +452,9 @@ void 
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
     }
 }
 
-Status PInternalServiceImpl::_exec_plan_fragment(const std::string& 
ser_request,
-                                                 PFragmentRequestVersion 
version, bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment_impl(const std::string& 
ser_request,
+                                                      PFragmentRequestVersion 
version,
+                                                      bool compact) {
     if (version == PFragmentRequestVersion::VERSION_1) {
         // VERSION_1 should be removed in v1.2
         TExecPlanFragmentParams t_request;
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index e5855d98f3..360b6ad7b0 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -163,8 +163,13 @@ public:
                                            google::protobuf::Closure* done) 
override;
 
 private:
-    Status _exec_plan_fragment(const std::string& s_request, 
PFragmentRequestVersion version,
-                               bool compact);
+    void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* 
controller,
+                                        const PExecPlanFragmentRequest* 
request,
+                                        PExecPlanFragmentResult* result,
+                                        google::protobuf::Closure* done);
+
+    Status _exec_plan_fragment_impl(const std::string& s_request, 
PFragmentRequestVersion version,
+                                    bool compact);
 
     Status _fold_constant_expr(const std::string& ser_request, 
PConstantExprResult* response);
 
diff --git a/regression-test/data/export_p2/test_outfile_p2.out 
b/regression-test/data/export_p2/test_outfile_p2.out
new file mode 100644
index 0000000000..ccb2d43e0e
--- /dev/null
+++ b/regression-test/data/export_p2/test_outfile_p2.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !sql_1 --
+1      abc
+
diff --git a/regression-test/suites/export_p2/test_outfile_p2.groovy 
b/regression-test/suites/export_p2/test_outfile_p2.groovy
new file mode 100644
index 0000000000..429b7a88f7
--- /dev/null
+++ b/regression-test/suites/export_p2/test_outfile_p2.groovy
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_outfile_hdfs", "p2") {
+    String nameNodeHost = context.config.otherConfigs.get("extHiveHmsHost")
+    String hdfsPort = context.config.otherConfigs.get("extHdfsPort")
+    String fs = "hdfs://${nameNodeHost}:${hdfsPort}"
+    String user_name = "hadoop"
+
+    def table_outfile_name = "test_outfile_hdfs"
+    // create table and insert
+    sql """ DROP TABLE IF EXISTS ${table_outfile_name} """
+    sql """
+    CREATE TABLE IF NOT EXISTS ${table_outfile_name} (
+        `id` int(11) NULL,
+        `name` string NULL
+        )
+        DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
+    """
+
+    sql """insert into ${table_outfile_name} values(1, 'abc');"""
+
+    qt_sql_1 """select * from ${table_outfile_name} order by id"""
+
+    // use a simple sql to make sure there is only one fragment
+    // #21343
+    sql """select * from ${table_outfile_name} INTO OUTFILE 
'${fs}/user/outfile_test/'
+        FORMAT AS PARQUET PROPERTIES
+        (
+            'hadoop.username' = '${user_name}',
+            'fs.defaultFS'='${fs}'
+        );
+    """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to