This is an automated email from the ASF dual-hosted git repository.

caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new aeee738  Revert "[Refactor][agent_task] Remove etl mgr and etl job 
pool from be (#8635)" (#8666)
aeee738 is described below

commit aeee738af02ab9fdb1b84c6595287b2e919f6690
Author: yiguolei <[email protected]>
AuthorDate: Fri Mar 25 18:32:50 2022 +0800

    Revert "[Refactor][agent_task] Remove etl mgr and etl job pool from be 
(#8635)" (#8666)
    
    This reverts commit 6bc982c37436acf288f566cf10e084731b80fa44.
---
 be/src/agent/agent_server.cpp        |  42 +++++
 be/src/agent/agent_server.h          |   6 +
 be/src/runtime/CMakeLists.txt        |   1 +
 be/src/runtime/etl_job_mgr.cpp       | 302 +++++++++++++++++++++++++++++++++++
 be/src/runtime/etl_job_mgr.h         | 102 ++++++++++++
 be/src/runtime/exec_env.h            |   5 +
 be/src/runtime/exec_env_init.cpp     |  12 ++
 be/src/service/backend_service.h     |  17 ++
 be/src/util/doris_metrics.h          |   1 +
 be/test/runtime/CMakeLists.txt       |   1 +
 be/test/runtime/data_stream_test.cpp |   9 ++
 be/test/runtime/etl_job_mgr_test.cpp | 232 +++++++++++++++++++++++++++
 gensrc/thrift/BackendService.thrift  |   7 +
 13 files changed, 737 insertions(+)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index f2d0bdd..a3205f1 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -27,6 +27,7 @@
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
 #include "olap/snapshot_manager.h"
+#include "runtime/etl_job_mgr.h"
 
 using std::string;
 using std::vector;
@@ -251,4 +252,45 @@ void AgentServer::publish_cluster_state(TAgentResult& 
t_agent_result,
     status.to_thrift(&t_agent_result.status);
 }
 
+// Hands a mini-load ETL task to the EtlJobMgr, logs the outcome at RPC verbosity
+// and copies the resulting status into the thrift reply.
+void AgentServer::submit_etl_task(TAgentResult& t_agent_result,
+                                  const TMiniLoadEtlTaskRequest& request) {
+    const auto& task_id = request.params.params.fragment_instance_id;
+    Status submit_status = _exec_env->etl_job_mgr()->start_job(request);
+    if (!submit_status.ok()) {
+        VLOG_RPC << "fail to submit etl task. id=" << task_id
+                 << ", err_msg=" << submit_status.get_error_msg();
+    } else {
+        VLOG_RPC << "success to submit etl task. id=" << task_id;
+    }
+    submit_status.to_thrift(&t_agent_result.status);
+}
+
+// Looks up the state of a mini-load ETL job and fills `t_agent_result` with it.
+//
+// If the lookup fails, the failure is logged and copied into the reply status so
+// the caller can see it; the success message is only logged when the lookup
+// actually succeeded. The file map is rendered into a single log record instead
+// of one record per entry.
+void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
+                                 const TMiniLoadEtlStatusRequest& request) {
+    Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
+    if (!status.ok()) {
+        LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]";
+        status.to_thrift(&t_agent_result.status);
+        return;
+    }
+
+    // Collect "<url>:<size>;" pairs so the whole state ends up in one log line.
+    std::string files;
+    for (const auto& item : t_agent_result.file_map) {
+        files += item.first + ":" + std::to_string(item.second) + ";";
+    }
+    VLOG_RPC << "success to get job state. [id=" << request.mini_load_id
+             << ", status=" << t_agent_result.status.status_code
+             << ", etl_state=" << t_agent_result.etl_state << ", files=" << files << "]";
+}
+
+// Asks the EtlJobMgr to forget the job named in `request` and copies the
+// resulting status into the thrift reply.
+//
+// Exactly one of the two log messages is emitted: a warning on failure, or the
+// RPC-verbosity success message otherwise.
+void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
+                                   const TDeleteEtlFilesRequest& request) {
+    Status status = _exec_env->etl_job_mgr()->erase_job(request);
+    if (status.ok()) {
+        VLOG_RPC << "success to delete etl files. request=" << request;
+    } else {
+        LOG(WARNING) << "fail to delete etl files. because " << status.get_error_msg()
+                     << " with request " << request;
+    }
+    status.to_thrift(&t_agent_result.status);
+}
+
 } // namespace doris
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index f4300f2..5ca15e5 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -47,6 +47,12 @@ public:
     // TODO(lingbin): This method is deprecated, should be removed later.
     void publish_cluster_state(TAgentResult& agent_result, const 
TAgentPublishRequest& request);
 
+    // Multi-Load will still use the following 3 methods for now.
+    void submit_etl_task(TAgentResult& agent_result, const 
TMiniLoadEtlTaskRequest& request);
+    void get_etl_status(TMiniLoadEtlStatusResult& agent_result,
+                        const TMiniLoadEtlStatusRequest& request);
+    void delete_etl_files(TAgentResult& result, const TDeleteEtlFilesRequest& 
request);
+
 private:
     DISALLOW_COPY_AND_ASSIGN(AgentServer);
 
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index f9dafab..414f405 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -60,6 +60,7 @@ set(RUNTIME_FILES
     qsorter.cpp
     fragment_mgr.cpp
     dpp_sink_internal.cpp
+    etl_job_mgr.cpp
     load_path_mgr.cpp
     types.cpp
     tmp_file_mgr.cc
diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp
new file mode 100644
index 0000000..ce4029b
--- /dev/null
+++ b/be/src/runtime/etl_job_mgr.cpp
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/etl_job_mgr.h"
+
+#include <filesystem>
+#include <functional>
+#include <system_error>
+
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "gen_cpp/MasterService_types.h"
+#include "gen_cpp/Status_types.h"
+#include "gen_cpp/Types_types.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/runtime_state.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+#define VLOG_ETL VLOG_CRITICAL
+
+// Builds the URL under which `file_name` can be downloaded from this backend's
+// web server: http://<localhost>:<webserver_port>/api/_download_load, with the
+// ExecEnv token and the file name as query parameters.
+// NOTE(review): `file_name` is appended verbatim, without URL escaping.
+std::string EtlJobMgr::to_http_path(const std::string& file_name) {
+    std::stringstream url;
+    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
+        << "/api/_download_load?"
+        << "token=" << _exec_env->token() << "&file=" << file_name;
+    return url.str();
+}
+
+// Builds the URL under which the load error log `file_name` is served by this
+// backend's web server: http://<localhost>:<webserver_port>/api/_load_error_log.
+// Unlike to_http_path, no token is added to the query string.
+// NOTE(review): `file_name` is appended verbatim, without URL escaping.
+std::string EtlJobMgr::to_load_error_http_path(const std::string& file_name) {
+    std::stringstream url;
+    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
+        << "/api/_load_error_log?"
+        << "file=" << file_name;
+    return url.str();
+}
+
+// Counter keys used by get_job_state when it reports a finished job's row
+// statistics: rows processed normally and rows counted as abnormal.
+const std::string DPP_NORMAL_ALL = "dpp.norm.ALL";
+const std::string DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
+// NOTE(review): not referenced in the visible part of this file — confirm it is still needed.
+const std::string ERROR_FILE_PREFIX = "error_log";
+
+// Stores the ExecEnv pointer and constructs both finished-job caches with the
+// argument 5000 (presumably the LruCache capacity — confirm against util/lru_cache.hpp).
+EtlJobMgr::EtlJobMgr(ExecEnv* exec_env)
+        : _exec_env(exec_env), _success_jobs(5000), _failed_jobs(5000) {}
+
+// Nothing to release explicitly; members clean up after themselves.
+EtlJobMgr::~EtlJobMgr() = default;
+
+// Currently a no-op: performs no setup and always returns OK.
+Status EtlJobMgr::init() {
+    return Status::OK();
+}
+
+// Starts the ETL job described by `req`, keyed by its fragment instance id.
+//
+// A job that is already running or already finished successfully is left alone
+// and OK is returned. Otherwise the plan fragment is submitted to the
+// FragmentMgr with finalize_job as the completion callback; a submission error
+// is returned to the caller unchanged. After a successful submission any stale
+// failed entry for the id is dropped and the id is recorded as running.
+// NOTE(review): _lock is held across exec_plan_fragment — confirm the callback
+// can never be invoked synchronously on this thread.
+Status EtlJobMgr::start_job(const TMiniLoadEtlTaskRequest& req) {
+    const TUniqueId& id = req.params.params.fragment_instance_id;
+    std::lock_guard<std::mutex> l(_lock);
+
+    if (_running_jobs.count(id) > 0) {
+        // Already have this job, return what???
+        LOG(INFO) << "Duplicated etl job(" << id << ")";
+        return Status::OK();
+    }
+
+    // A job that already succeeded is not rerun; the master is expected to
+    // ask for its result later.
+    if (_success_jobs.exists(id)) {
+        LOG(INFO) << "Already successful etl job(" << id << ")";
+        return Status::OK();
+    }
+
+    auto on_finish = std::bind<void>(&EtlJobMgr::finalize_job, this, std::placeholders::_1);
+    RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(req.params, on_finish));
+
+    // The job is being redone, so forget the earlier failure if there was one.
+    if (_failed_jobs.exists(id)) {
+        _failed_jobs.erase(id);
+    }
+
+    VLOG_ETL << "Job id(" << id << ") insert to EtlJobMgr.";
+    _running_jobs.insert(id);
+
+    return Status::OK();
+}
+
+// Pushes the current state of the executor's job to the master frontend.
+//
+// The state is read through get_job_state and sent with
+// updateMiniEtlTaskStatus. A transport failure triggers one reconnect and one
+// retry. Every failure path logs a warning and returns without reporting, so
+// the final "Successfully report" message is only emitted when the RPC went
+// through without throwing.
+void EtlJobMgr::report_to_master(PlanFragmentExecutor* executor) {
+    TUpdateMiniEtlTaskStatusRequest request;
+    RuntimeState* state = executor->runtime_state();
+    request.protocolVersion = FrontendServiceVersion::V1;
+    request.etlTaskId = state->fragment_instance_id();
+    Status status = get_job_state(state->fragment_instance_id(), &request.etlTaskStatus);
+    if (!status.ok()) {
+        return;
+    }
+    const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
+    FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
+                                     config::thrift_rpc_timeout_ms, &status);
+    if (!status.ok()) {
+        LOG(WARNING) << "Connect master failed, with address(" << master_address.hostname << ":"
+                     << master_address.port << ")";
+        return;
+    }
+    TFeResult res;
+    try {
+        try {
+            client->updateMiniEtlTaskStatus(res, request);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG(WARNING) << "Retrying report etl jobs status to master(" << master_address.hostname
+                         << ":" << master_address.port << ") because: " << e.what();
+            status = client.reopen(config::thrift_rpc_timeout_ms);
+            if (!status.ok()) {
+                LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname
+                             << ":" << master_address.port << ")";
+                return;
+            }
+            client->updateMiniEtlTaskStatus(res, request);
+        }
+    } catch (const apache::thrift::TException& e) {
+        // failed when retry.
+        // reopen to disable this connection; its result is intentionally not
+        // checked because the report has already failed.
+        client.reopen(config::thrift_rpc_timeout_ms);
+        LOG(WARNING) << "Report etl task to master(" << master_address.hostname << ":"
+                     << master_address.port << ") failed because: " << e.what();
+        // Do not fall through to the success message below.
+        return;
+    }
+    // TODO(lingbin): check status of 'res' here.
+    // because there are some checks in updateMiniEtlTaskStatus, for example max_filter_ratio.
+    LOG(INFO) << "Successfully report elt job status to master.id=" << print_id(request.etlTaskId);
+}
+
+// Completion callback for a plan fragment started by start_job.
+//
+// Collects the job's row statistics and error-log path, and for a successful
+// execution the size of every output file, then records the outcome through
+// finish_job and reports it to the master.
+//
+// File sizes are read with the non-throwing std::filesystem::file_size
+// overload, so a missing or unreadable output file cannot throw out of this
+// callback. Such a job is recorded as failed instead, because its advertised
+// output would be incomplete.
+void EtlJobMgr::finalize_job(PlanFragmentExecutor* executor) {
+    EtlJobResult result;
+
+    RuntimeState* state = executor->runtime_state();
+    Status finish_status = executor->status();
+
+    // Statistics and the error-log path are reported whatever the outcome.
+    result.process_normal_rows = state->num_rows_load_success();
+    result.process_abnormal_rows = state->num_rows_load_filtered();
+    result.debug_path = state->get_error_log_file_path();
+
+    if (finish_status.ok()) {
+        for (const auto& file : state->output_files()) {
+            std::error_code ec;
+            const auto file_size = std::filesystem::file_size(file, ec);
+            if (ec) {
+                std::stringstream ss;
+                ss << "Fail to get size of etl output file(" << file << "): " << ec.message();
+                LOG(WARNING) << ss.str();
+                finish_status = Status::InternalError(ss.str());
+                // Do not advertise a partial set of output files.
+                result.file_map.clear();
+                break;
+            }
+            result.file_map[to_http_path(file)] = static_cast<int64_t>(file_size);
+        }
+    }
+
+    Status record_status = finish_job(state->fragment_instance_id(), finish_status, result);
+    if (!record_status.ok()) {
+        LOG(WARNING) << "Fail to record finished etl job. " << record_status.get_error_msg();
+    }
+
+    // Try to report this finished task to master
+    report_to_master(executor);
+}
+
+// Moves a running job to the failed set with a Cancelled finish status.
+// An id that is not running is only logged. OK is returned in both cases.
+Status EtlJobMgr::cancel_job(const TUniqueId& id) {
+    std::lock_guard<std::mutex> l(_lock);
+    auto running_it = _running_jobs.find(id);
+    if (running_it != _running_jobs.end()) {
+        _running_jobs.erase(running_it);
+        VLOG_ETL << "id(" << id << ") have been removed from EtlJobMgr.";
+        EtlJobCtx cancelled_ctx;
+        cancelled_ctx.finish_status = Status::Cancelled("Cancelled");
+        _failed_jobs.put(id, cancelled_ctx);
+    } else {
+        // Nothing to do
+        LOG(INFO) << "No such job id, just print to info " << id;
+    }
+    return Status::OK();
+}
+
+// Moves a running job into the success or failed cache, depending on
+// `finish_status`, and stores `result` alongside it.
+// Returns InternalError when `id` is not currently running.
+Status EtlJobMgr::finish_job(const TUniqueId& id, const Status& finish_status,
+                             const EtlJobResult& result) {
+    std::lock_guard<std::mutex> l(_lock);
+
+    auto running_it = _running_jobs.find(id);
+    if (running_it == _running_jobs.end()) {
+        std::stringstream msg;
+        msg << "Unknown job id(" << id << ").";
+        return Status::InternalError(msg.str());
+    }
+    _running_jobs.erase(running_it);
+
+    EtlJobCtx finished_ctx;
+    finished_ctx.finish_status = finish_status;
+    finished_ctx.result = result;
+
+    const bool succeeded = finish_status.ok();
+    auto& target_jobs = succeeded ? _success_jobs : _failed_jobs;
+    target_jobs.put(id, finished_ctx);
+
+    VLOG_ETL << "Move job(" << id << ") from running to "
+             << (succeeded ? "success jobs" : "failed jobs");
+
+    return Status::OK();
+}
+
+// Fills `result` with the state of job `id` and always returns OK.
+//
+//   running    -> RUNNING
+//   successful -> FINISHED, with the file map, the row counters and, if an error
+//                 log path was recorded, a tracking url for it
+//   failed     -> CANCELLED, with a tracking url if an error log path was recorded
+//   unknown    -> CANCELLED
+//
+// The thrift status code is set to OK in every case.
+Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result) {
+    std::lock_guard<std::mutex> l(_lock);
+
+    // Every branch reports an OK thrift status; only the etl state differs.
+    result->status.__set_status_code(TStatusCode::OK);
+
+    if (_running_jobs.find(id) != _running_jobs.end()) {
+        result->__set_etl_state(TEtlState::RUNNING);
+        return Status::OK();
+    }
+
+    // Successful
+    if (_success_jobs.exists(id)) {
+        EtlJobCtx job_ctx;
+        _success_jobs.get(id, &job_ctx);
+        const EtlJobResult& job_result = job_ctx.result;
+
+        result->__set_etl_state(TEtlState::FINISHED);
+        result->__set_file_map(job_result.file_map);
+
+        // set counter
+        std::map<std::string, std::string> row_counters;
+        row_counters[DPP_NORMAL_ALL] = std::to_string(job_result.process_normal_rows);
+        row_counters[DPP_ABNORMAL_ALL] = std::to_string(job_result.process_abnormal_rows);
+        result->__set_counters(row_counters);
+
+        if (!job_result.debug_path.empty()) {
+            result->__set_tracking_url(to_load_error_http_path(job_result.debug_path));
+        }
+        return Status::OK();
+    }
+
+    // Failed jobs and unknown jobs are both reported as CANCELLED; only a
+    // failed job can additionally carry a tracking url.
+    result->__set_etl_state(TEtlState::CANCELLED);
+    if (_failed_jobs.exists(id)) {
+        EtlJobCtx job_ctx;
+        _failed_jobs.get(id, &job_ctx);
+        if (!job_ctx.result.debug_path.empty()) {
+            result->__set_tracking_url(to_http_path(job_ctx.result.debug_path));
+        }
+    }
+    return Status::OK();
+}
+
+// Drops the finished (successful or failed) record of the job named in `req`.
+// A job that is still running is refused with InternalError.
+Status EtlJobMgr::erase_job(const TDeleteEtlFilesRequest& req) {
+    std::lock_guard<std::mutex> l(_lock);
+    const TUniqueId& id = req.mini_load_id;
+
+    if (_running_jobs.count(id) != 0) {
+        std::stringstream msg;
+        msg << "Job(" << id << ") is running, can not be deleted.";
+        return Status::InternalError(msg.str());
+    }
+
+    _success_jobs.erase(id);
+    _failed_jobs.erase(id);
+    return Status::OK();
+}
+
+// Writes a human-readable dump of the manager into `ss`: three summary lines
+// with the size of each job collection, then one line per running, successful
+// and failed job id. The whole dump is produced under _lock.
+void EtlJobMgr::debug(std::stringstream& ss) {
+    std::lock_guard<std::mutex> l(_lock);
+
+    // Summary
+    ss << "we have " << _running_jobs.size() << " jobs Running\n"
+       << "we have " << _failed_jobs.size() << " jobs Failed\n"
+       << "we have " << _success_jobs.size() << " jobs Successful\n";
+
+    // Per-job details
+    for (const auto& running_id : _running_jobs) {
+        ss << "running jobs: " << running_id << "\n";
+    }
+    for (const auto& success_entry : _success_jobs) {
+        ss << "successful jobs: " << success_entry.first << "\n";
+    }
+    for (const auto& failed_entry : _failed_jobs) {
+        ss << "failed jobs: " << failed_entry.first << "\n";
+    }
+}
+
+} // namespace doris
diff --git a/be/src/runtime/etl_job_mgr.h b/be/src/runtime/etl_job_mgr.h
new file mode 100644
index 0000000..d930c73
--- /dev/null
+++ b/be/src/runtime/etl_job_mgr.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_RUNTIME_ETL_JOB_MGR_H
+#define DORIS_BE_RUNTIME_ETL_JOB_MGR_H
+
+#include <pthread.h>
+
+#include <mutex>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/Types_types.h"
+#include "http/rest_monitor_iface.h"
+#include "util/hash_util.hpp"
+#include "util/lru_cache.hpp"
+
+namespace doris {
+
+// Outcome of one ETL job, kept by EtlJobMgr so it can be reported to the master.
+struct EtlJobResult {
+    EtlJobResult() = default;
+
+    // Path of the job's error log; empty when none was recorded.
+    std::string debug_path;
+    // Output file url -> file size.
+    std::map<std::string, int64_t> file_map;
+    // Row counters, zero until the job reports them.
+    int64_t process_normal_rows = 0;
+    int64_t process_abnormal_rows = 0;
+};
+
+// Record kept for a finished job: the status it finished with plus its result.
+// Stored in EtlJobMgr's success/failed caches and used when reporting to the master.
+struct EtlJobCtx {
+    Status finish_status;
+    EtlJobResult result;
+};
+
+class TMiniLoadEtlStatusResult;
+class TMiniLoadEtlTaskRequest;
+class ExecEnv;
+class PlanFragmentExecutor;
+class TDeleteEtlFilesRequest;
+
+// Manager of all ETL jobs on this backend.
+// It remembers running, successful and failed jobs because the master may poll
+// the backend to check whether a load job has finished.
+// Every method that touches the job collections takes _lock first.
+class EtlJobMgr : public RestMonitorIface {
+public:
+    EtlJobMgr(ExecEnv* exec_env);
+
+    virtual ~EtlJobMgr();
+
+    // Initialization hook. The current implementation does nothing and returns OK.
+    Status init();
+
+    // Make a job to running state
+    // If this job is already running or already successful, return OK without rerunning it
+    // If this job is failed, move this job from _failed_jobs to _running_jobs
+    // Otherwise, put it to _running_jobs
+    Status start_job(const TMiniLoadEtlTaskRequest& req);
+
+    // Make a running job to failed job (finish status Cancelled).
+    // An id that is not running is ignored; OK is returned either way.
+    Status cancel_job(const TUniqueId& id);
+
+    // Move a running job to the success or failed set according to finish_status.
+    // Returns InternalError when the id is not running.
+    Status finish_job(const TUniqueId& id, const Status& finish_status, const 
EtlJobResult& result);
+
+    // Fill `result` with the job's state: RUNNING, FINISHED, or CANCELLED
+    // (CANCELLED is used for both failed and unknown jobs). Always returns OK.
+    Status get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* 
result);
+
+    // Forget a finished job. Returns InternalError when the job is still running.
+    Status erase_job(const TDeleteEtlFilesRequest& req);
+
+    // Completion callback handed to the FragmentMgr by start_job: records the
+    // job's outcome and reports it to the master.
+    void finalize_job(PlanFragmentExecutor* executor);
+
+    // Dump the size and the ids of every job collection into `ss`.
+    virtual void debug(std::stringstream& ss);
+
+private:
+    // Build the download url / the error-log url for a file on this backend.
+    std::string to_http_path(const std::string& file_path);
+    std::string to_load_error_http_path(const std::string& file_path);
+
+    // Send the job's current state to the master frontend.
+    void report_to_master(PlanFragmentExecutor* executor);
+
+    ExecEnv* _exec_env;
+    // Guards the three job collections below.
+    std::mutex _lock;
+    std::unordered_set<TUniqueId> _running_jobs;
+    LruCache<TUniqueId, EtlJobCtx> _success_jobs;
+    LruCache<TUniqueId, EtlJobCtx> _failed_jobs;
+};
+
+} // namespace doris
+
+#endif
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 5c79423..36c0528 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -38,6 +38,7 @@ class BufferPool;
 class CgroupsMgr;
 class DataStreamMgr;
 class DiskIoMgr;
+class EtlJobMgr;
 class EvHttpServer;
 class ExternalScanContextMgr;
 class FragmentMgr;
@@ -126,11 +127,13 @@ public:
     ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
     PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
     ThreadPool* limited_scan_thread_pool() { return 
_limited_scan_thread_pool.get(); }
+    PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
     ThreadPool* send_batch_thread_pool() { return 
_send_batch_thread_pool.get(); }
     CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
     ResultCache* result_cache() { return _result_cache; }
     TMasterInfo* master_info() { return _master_info; }
+    EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; }
     LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
     DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; }
     TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; }
@@ -206,10 +209,12 @@ private:
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
     std::unique_ptr<ThreadPool> _send_batch_thread_pool;
+    PriorityThreadPool* _etl_thread_pool = nullptr;
     CgroupsMgr* _cgroups_mgr = nullptr;
     FragmentMgr* _fragment_mgr = nullptr;
     ResultCache* _result_cache = nullptr;
     TMasterInfo* _master_info = nullptr;
+    EtlJobMgr* _etl_job_mgr = nullptr;
     LoadPathMgr* _load_path_mgr = nullptr;
     DiskIoMgr* _disk_io_mgr = nullptr;
     TmpFileMgr* _tmp_file_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index cdd3417..0d483ea 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -35,6 +35,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/disk_io_mgr.h"
+#include "runtime/etl_job_mgr.h"
 #include "runtime/exec_env.h"
 #include "runtime/external_scan_context_mgr.h"
 #include "runtime/fold_constant_executor.h"
@@ -68,6 +69,7 @@
 namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, 
MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(etl_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, 
MetricUnit::NOUNIT);
 DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, 
"", mem_consumption,
@@ -127,11 +129,14 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
             .set_max_queue_size(config::send_batch_thread_pool_queue_size)
             .build(&_send_batch_thread_pool);
 
+    _etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size,
+                                              
config::etl_thread_pool_queue_size);
     _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
                                     config::query_cache_elasticity_size_mb);
     _master_info = new TMasterInfo();
+    _etl_job_mgr = new EtlJobMgr(this);
     _load_path_mgr = new LoadPathMgr(this);
     _disk_io_mgr = new DiskIoMgr();
     _tmp_file_mgr = new TmpFileMgr(this);
@@ -151,6 +156,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
     _extdatasource_client_cache->init_metrics("extdatasource");
     _result_mgr->init();
     _cgroups_mgr->init_cgroups();
+    _etl_job_mgr->init();
     Status status = _load_path_mgr->init();
     if (!status.ok()) {
         LOG(ERROR) << "load path mgr init failed." << status.get_error_msg();
@@ -281,6 +287,9 @@ void ExecEnv::_register_metrics() {
     REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size,
                          [this]() { return 
_scan_thread_pool->get_queue_size(); });
 
+    REGISTER_HOOK_METRIC(etl_thread_pool_queue_size,
+                         [this]() { return _etl_thread_pool->get_queue_size(); 
});
+
     REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
                          [this]() { return 
_send_batch_thread_pool->num_threads(); });
 
@@ -290,6 +299,7 @@ void ExecEnv::_register_metrics() {
 
 void ExecEnv::_deregister_metrics() {
     DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
+    DEREGISTER_HOOK_METRIC(etl_thread_pool_queue_size);
     DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
     DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
 }
@@ -309,9 +319,11 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_tmp_file_mgr);
     SAFE_DELETE(_disk_io_mgr);
     SAFE_DELETE(_load_path_mgr);
+    SAFE_DELETE(_etl_job_mgr);
     SAFE_DELETE(_master_info);
     SAFE_DELETE(_fragment_mgr);
     SAFE_DELETE(_cgroups_mgr);
+    SAFE_DELETE(_etl_thread_pool);
     SAFE_DELETE(_scan_thread_pool);
     SAFE_DELETE(_thread_mgr);
     SAFE_DELETE(_broker_client_cache);
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index ca481b5..3c9b3bd 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -98,6 +98,23 @@ public:
         _agent_server->publish_cluster_state(result, request);
     }
 
+    // Logs the full request at RPC verbosity, then forwards it to the agent server.
+    virtual void submit_etl_task(TAgentResult& result,
+                                 const TMiniLoadEtlTaskRequest& request) 
override {
+        VLOG_RPC << "submit_etl_task. request is "
+                 << apache::thrift::ThriftDebugString(request).c_str();
+        _agent_server->submit_etl_task(result, request);
+    }
+
+    // Thin forwarder: the agent server fills `result` with the job's state.
+    virtual void get_etl_status(TMiniLoadEtlStatusResult& result,
+                                const TMiniLoadEtlStatusRequest& request) 
override {
+        _agent_server->get_etl_status(result, request);
+    }
+
+    // Thin forwarder: the agent server handles the delete request and fills `result`.
+    virtual void delete_etl_files(TAgentResult& result,
+                                  const TDeleteEtlFilesRequest& request) 
override {
+        _agent_server->delete_etl_files(result, request);
+    }
+
     // DorisServer service
     virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
                                     const TExecPlanFragmentParams& params) 
override;
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 8f96046..ca5d05c 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -198,6 +198,7 @@ public:
     UIntGauge* query_cache_partition_total_count;
 
     UIntGauge* scanner_thread_pool_queue_size;
+    UIntGauge* etl_thread_pool_queue_size;
     UIntGauge* add_batch_task_queue_size;
     UIntGauge* send_batch_thread_pool_thread_num;
     UIntGauge* send_batch_thread_pool_queue_size;
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index d2a0320..06a1214 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -38,6 +38,7 @@ ADD_BE_TEST(fragment_mgr_test)
 #ADD_BE_TEST(dpp_sink_internal_test)
 #ADD_BE_TEST(dpp_sink_test)
 #ADD_BE_TEST(data_spliter_test)
+#ADD_BE_TEST(etl_job_mgr_test)
 
 #ADD_BE_TEST(tmp_file_mgr_test)
 #ADD_BE_TEST(disk_io_mgr_test)
diff --git a/be/test/runtime/data_stream_test.cpp 
b/be/test/runtime/data_stream_test.cpp
index e236666..e6d4a7a 100644
--- a/be/test/runtime/data_stream_test.cpp
+++ b/be/test/runtime/data_stream_test.cpp
@@ -94,6 +94,15 @@ public:
     virtual void publish_cluster_state(TAgentResult& return_val,
                                        const TAgentPublishRequest& request) {}
 
+    // Test stub: intentionally empty, leaves return_val untouched.
+    virtual void submit_etl_task(TAgentResult& return_val, const 
TMiniLoadEtlTaskRequest& request) {
+    }
+
+    // Test stub: intentionally empty, leaves return_val untouched.
+    virtual void get_etl_status(TMiniLoadEtlStatusResult& return_val,
+                                const TMiniLoadEtlStatusRequest& request) {}
+
+    // Test stub: intentionally empty, leaves return_val untouched.
+    virtual void delete_etl_files(TAgentResult& return_val, const 
TDeleteEtlFilesRequest& request) {
+    }
+
     virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id,
                                          const int32_t num_senders) {}
 
diff --git a/be/test/runtime/etl_job_mgr_test.cpp 
b/be/test/runtime/etl_job_mgr_test.cpp
new file mode 100644
index 0000000..e8bcec1
--- /dev/null
+++ b/be/test/runtime/etl_job_mgr_test.cpp
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/etl_job_mgr.h"
+
+#include <gtest/gtest.h>
+
+#include "gen_cpp/Types_types.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "util/cpu_info.h"
+
+namespace doris {
+// Mock fragment mgr
+// This replacement accepts every fragment and never invokes `cb`, so a job
+// started in these tests stays "running" until the test finishes it by hand.
+Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, 
FinishCallback cb) {
+    return Status::OK();
+}
+
+// Mock constructor: ignores exec_env and only constructs the thread pool member.
+FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _thread_pool(10, 128) {}
+
+// Mock destructor: nothing to clean up explicitly.
+FragmentMgr::~FragmentMgr() {}
+
+// Mock debug hook: writes nothing to `ss`.
+void FragmentMgr::debug(std::stringstream& ss) {}
+
+// Fixture providing the ExecEnv that every EtlJobMgr under test is built on.
+class EtlJobMgrTest : public testing::Test {
+public:
+    EtlJobMgrTest() {}
+
+protected:
+    // Must be protected rather than private: each TEST_F body is compiled as a
+    // class derived from this fixture and accesses _exec_env directly.
+    ExecEnv _exec_env;
+};
+
+// Happy path: start -> RUNNING, finish OK -> FINISHED with its file map,
+// erase -> the job is unknown again and reported as CANCELLED.
+TEST_F(EtlJobMgrTest, NormalCase) {
+    EtlJobMgr mgr(&_exec_env);
+    TUniqueId id;
+    id.lo = 1;
+    id.hi = 1;
+
+    TMiniLoadEtlStatusResult res;
+    TMiniLoadEtlTaskRequest req;
+    TDeleteEtlFilesRequest del_req;
+    del_req.mini_load_id = id;
+    req.params.params.fragment_instance_id = id;
+
+    // Start the job; it must be reported as running.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+    // Finish it successfully; the state and the file map must be reported.
+    EtlJobResult job_result;
+    job_result.file_map["abc"] = 100L;
+    ASSERT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::FINISHED, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+    ASSERT_EQ(1, res.file_map.size());
+    ASSERT_EQ(100, res.file_map["abc"]);
+
+    // Erase it; an unknown job is reported as CANCELLED.
+    ASSERT_TRUE(mgr.erase_job(del_req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+// Starting a job that is already running is accepted and leaves it running.
+TEST_F(EtlJobMgrTest, DuplicateCase) {
+    EtlJobMgr mgr(&_exec_env);
+    TUniqueId id;
+    id.lo = 1;
+    id.hi = 1;
+
+    TMiniLoadEtlStatusResult res;
+    TMiniLoadEtlTaskRequest req;
+    req.params.params.fragment_instance_id = id;
+
+    // Start the job; it must be reported as running.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+    // Submit the same job a second time; still OK and still running.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+// Re-submitting a job that already succeeded does not rerun it: the state
+// stays FINISHED and the stored file map is still returned.
+TEST_F(EtlJobMgrTest, RunAfterSuccess) {
+    EtlJobMgr mgr(&_exec_env);
+    TUniqueId id;
+    id.lo = 1;
+    id.hi = 1;
+
+    TMiniLoadEtlStatusResult res;
+    TMiniLoadEtlTaskRequest req;
+    TDeleteEtlFilesRequest del_req;
+    del_req.mini_load_id = id;
+    req.params.params.fragment_instance_id = id;
+
+    // Start the job; it must be reported as running.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+    // Finish it successfully.
+    EtlJobResult job_result;
+    job_result.file_map["abc"] = 100L;
+    ASSERT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::FINISHED, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+    ASSERT_EQ(1, res.file_map.size());
+    ASSERT_EQ(100, res.file_map["abc"]);
+
+    // Submit it again; the finished result must be kept as is.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::FINISHED, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+    ASSERT_EQ(1, res.file_map.size());
+    ASSERT_EQ(100, res.file_map["abc"]);
+}
+
+// A job that finished with an error is reported as CANCELLED and can be
+// started again, after which it is running.
+TEST_F(EtlJobMgrTest, RunAfterFail) {
+    EtlJobMgr mgr(&_exec_env);
+    TUniqueId id;
+    id.lo = 1;
+    id.hi = 1;
+
+    TMiniLoadEtlStatusResult res;
+    TMiniLoadEtlTaskRequest req;
+    req.params.params.fragment_instance_id = id;
+
+    // Start the job; it must be reported as running.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+    // Finish it with an error status; it must be reported as CANCELLED.
+    EtlJobResult job_result;
+    job_result.debug_path = "abc";
+    ASSERT_TRUE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), 
job_result).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+    // Submit it again; a failed job is redone and is running again.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+// A cancelled job is reported as CANCELLED and can be started again, after
+// which it is running.
+TEST_F(EtlJobMgrTest, CancelJob) {
+    EtlJobMgr mgr(&_exec_env);
+    TUniqueId id;
+    id.lo = 1;
+    id.hi = 1;
+
+    TMiniLoadEtlStatusResult res;
+    TMiniLoadEtlTaskRequest req;
+    req.params.params.fragment_instance_id = id;
+
+    // Start the job; it must be reported as running.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+    // Cancel it; it must be reported as CANCELLED.
+    ASSERT_TRUE(mgr.cancel_job(id).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+    // Submit it again; a cancelled job is redone and is running again.
+    ASSERT_TRUE(mgr.start_job(req).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+// Finishing a job that was never started is rejected, and the job is still
+// reported as CANCELLED afterwards.
+TEST_F(EtlJobMgrTest, FinishUnknownJob) {
+    EtlJobMgr mgr(&_exec_env);
+    TUniqueId id;
+    id.lo = 1;
+    id.hi = 1;
+
+    TMiniLoadEtlStatusResult res;
+
+    // Try to finish a job the manager has never seen.
+    EtlJobResult job_result;
+    job_result.debug_path = "abc";
+    const Status rpc_error = Status::ThriftRpcError("Thrift rpc error");
+    ASSERT_FALSE(mgr.finish_job(id, rpc_error, job_result).ok());
+    ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+    ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+    ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+} // namespace doris
+
+// Test entry point: loads $DORIS_HOME/conf/be.conf, initializes CpuInfo and
+// runs all gtest cases. Returns -1 when DORIS_HOME is not set or the config
+// file cannot be read.
+int main(int argc, char** argv) {
+    // getenv returns a null pointer for an unset variable, and building a
+    // std::string from a null pointer is undefined behavior.
+    const char* doris_home = getenv("DORIS_HOME");
+    if (doris_home == nullptr) {
+        fprintf(stderr, "DORIS_HOME is not set. \n");
+        return -1;
+    }
+    std::string conffile = std::string(doris_home) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), false)) {
+        fprintf(stderr, "error read config file. \n");
+        return -1;
+    }
+    doris::CpuInfo::init();
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/gensrc/thrift/BackendService.thrift 
b/gensrc/thrift/BackendService.thrift
index 47b5997..4517076 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -141,6 +141,13 @@ service BackendService {
 
     AgentService.TAgentResult 
publish_cluster_state(1:AgentService.TAgentPublishRequest request);
 
+    AgentService.TAgentResult 
submit_etl_task(1:AgentService.TMiniLoadEtlTaskRequest request);
+
+    AgentService.TMiniLoadEtlStatusResult get_etl_status(
+            1:AgentService.TMiniLoadEtlStatusRequest request);
+
+    AgentService.TAgentResult 
delete_etl_files(1:AgentService.TDeleteEtlFilesRequest request);
+
     Status.TStatus submit_export_task(1:TExportTaskRequest request);
 
     PaloInternalService.TExportStatusResult 
get_export_status(1:Types.TUniqueId task_id);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to