This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 478b36edb75 branch-3.0: [opt](http) enable auth token with BE http
request (#43659)
478b36edb75 is described below
commit 478b36edb75e669da433c2bcd9d3516ae9f3d41a
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Nov 12 16:10:16 2024 +0800
branch-3.0: [opt](http) enable auth token with BE http request (#43659)
Cherry-picked from #41994
Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
Co-authored-by: morningman <[email protected]>
---
be/src/agent/agent_server.cpp | 34 ++++++------
be/src/agent/agent_server.h | 8 +--
be/src/agent/heartbeat_server.cpp | 63 +++++++++++++---------
be/src/agent/heartbeat_server.h | 13 ++---
be/src/agent/task_worker_pool.cpp | 40 +++++++-------
be/src/agent/task_worker_pool.h | 16 +++---
be/src/agent/utils.cpp | 49 ++++++++---------
be/src/agent/utils.h | 10 ++--
be/src/cloud/cloud_meta_mgr.cpp | 2 +-
be/src/common/utils.h | 3 +-
.../schema_active_queries_scanner.cpp | 2 +-
.../schema_scanner/schema_partitions_scanner.cpp | 2 +-
.../exec/schema_scanner/schema_routine_scanner.cpp | 4 +-
.../schema_table_options_scanner.cpp | 2 +-
.../schema_table_properties_scanner.cpp | 2 +-
.../schema_workload_group_privileges.cpp | 4 +-
.../schema_workload_groups_scanner.cpp | 4 +-
.../schema_workload_sched_policy_scanner.cpp | 4 +-
be/src/http/action/compaction_score_action.cpp | 2 +-
be/src/http/action/http_stream.cpp | 8 +--
be/src/http/action/stream_load.cpp | 2 +-
be/src/http/http_client.cpp | 4 ++
be/src/http/http_client.h | 8 +++
be/src/http/http_common.h | 2 +-
be/src/http/http_handler_with_auth.cpp | 19 ++++++-
be/src/http/http_headers.cpp | 3 +-
be/src/http/http_headers.h | 3 +-
be/src/http/utils.cpp | 11 +++-
be/src/io/fs/multi_table_pipe.cpp | 2 +-
be/src/olap/olap_server.cpp | 6 +--
be/src/olap/task/engine_clone_task.cpp | 6 +--
be/src/olap/task/engine_clone_task.h | 6 +--
be/src/olap/wal/wal_manager.cpp | 6 +--
be/src/olap/wal/wal_table.cpp | 14 +++--
be/src/runtime/cluster_info.h | 48 +++++++++++++++++
be/src/runtime/exec_env.cpp | 15 ++++--
be/src/runtime/exec_env.h | 12 +++--
be/src/runtime/exec_env_init.cpp | 6 +--
be/src/runtime/fragment_mgr.cpp | 4 +-
be/src/runtime/group_commit_mgr.cpp | 26 +++++----
.../routine_load/routine_load_task_executor.cpp | 2 +
be/src/runtime/runtime_query_statistics_mgr.cpp | 6 +--
be/src/runtime/small_file_mgr.cpp | 6 +--
be/src/runtime/snapshot_loader.cpp | 2 +-
.../runtime/stream_load/stream_load_executor.cpp | 12 ++---
.../workload_group/workload_group_manager.cpp | 2 +-
be/src/service/backend_service.cpp | 2 +-
be/src/service/doris_main.cpp | 8 ++-
be/src/service/internal_service.cpp | 2 +-
be/src/vec/exec/scan/vmeta_scanner.cpp | 2 +-
be/src/vec/sink/autoinc_buffer.cpp | 4 +-
be/src/vec/sink/vrow_distribution.cpp | 4 +-
be/test/agent/heartbeat_server_test.cpp | 12 ++---
be/test/agent/mock_utils.h | 2 +-
be/test/agent/task_worker_pool_test.cpp | 7 +--
be/test/http/http_client_test.cpp | 34 +++++++++++-
be/test/olap/wal/wal_manager_test.cpp | 12 ++---
.../runtime/routine_load_task_executor_test.cpp | 6 +--
be/test/vec/exec/vfile_scanner_exception_test.cpp | 13 ++---
be/test/vec/exec/vwal_scanner_test.cpp | 13 ++---
.../main/java/org/apache/doris/catalog/Env.java | 8 +++
.../{load/loadv2 => catalog}/TokenManager.java | 10 +++-
.../org/apache/doris/httpv2/rest/LoadAction.java | 2 +-
.../org/apache/doris/load/loadv2/LoadManager.java | 9 +---
.../apache/doris/load/loadv2/MysqlLoadManager.java | 6 +--
.../doris/load/sync/canal/CanalSyncChannel.java | 2 +-
.../trees/plans/commands/insert/InsertUtils.java | 2 +-
.../org/apache/doris/plugin/audit/AuditLoader.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 4 +-
.../apache/doris/service/FrontendServiceImpl.java | 18 +++----
.../java/org/apache/doris/system/HeartbeatMgr.java | 1 +
.../apache/doris/transaction/TransactionEntry.java | 2 +-
.../apache/doris/load/loadv2/TokenManagerTest.java | 1 +
gensrc/thrift/FrontendService.thrift | 24 ++++-----
gensrc/thrift/HeartbeatService.thrift | 1 +
75 files changed, 442 insertions(+), 276 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 361a8ab93a9..0b17f3782e7 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -49,9 +49,9 @@ using std::vector;
namespace doris {
-AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
- : _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
- MasterServerClient::create(master_info);
+AgentServer::AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info)
+ : _cluster_info(cluster_info), _topic_subscriber(new
TopicSubscriber()) {
+ MasterServerClient::create(cluster_info);
#if !defined(BE_TEST) && !defined(__APPLE__)
// Add subscriber here and register listeners
@@ -170,7 +170,7 @@ void AgentServer::start_workers(StorageEngine& engine,
ExecEnv* exec_env) {
"ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&&
task) { return alter_tablet_callback(engine, task); });
_workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
- "CLONE", config::clone_worker_count, [&engine, &master_info =
_master_info](auto&& task) { return clone_callback(engine, master_info, task);
});
+ "CLONE", config::clone_worker_count, [&engine, &cluster_info =
_cluster_info](auto&& task) { return clone_callback(engine, cluster_info,
task); });
_workers[TTaskType::STORAGE_MEDIUM_MIGRATE] =
std::make_unique<TaskWorkerPool>(
"STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count,
[&engine](auto&& task) { return storage_medium_migrate_callback(engine, task);
});
@@ -188,13 +188,13 @@ void AgentServer::start_workers(StorageEngine& engine,
ExecEnv* exec_env) {
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return
visible_version_callback(engine, task); });
_report_workers.push_back(std::make_unique<ReportWorker>(
- "REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));
+ "REPORT_TASK", _cluster_info,
config::report_task_interval_seconds, [&cluster_info = _cluster_info] {
report_task_callback(cluster_info); }));
_report_workers.push_back(std::make_unique<ReportWorker>(
- "REPORT_DISK_STATE", _master_info,
config::report_disk_state_interval_seconds, [&engine, &master_info =
_master_info] { report_disk_callback(engine, master_info); }));
+ "REPORT_DISK_STATE", _cluster_info,
config::report_disk_state_interval_seconds, [&engine, &cluster_info =
_cluster_info] { report_disk_callback(engine, cluster_info); }));
_report_workers.push_back(std::make_unique<ReportWorker>(
- "REPORT_OLAP_TABLET", _master_info,
config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info); }));
+ "REPORT_OLAP_TABLET", _cluster_info,
config::report_tablet_interval_seconds,[&engine, &cluster_info = _cluster_info]
{ report_tablet_callback(engine, cluster_info); }));
// clang-format on
}
@@ -217,18 +217,20 @@ void AgentServer::cloud_start_workers(CloudStorageEngine&
engine, ExecEnv* exec_
"DROP_TABLE", 1, [&engine](auto&& task) { return
drop_tablet_callback(engine, task); });
_report_workers.push_back(std::make_unique<ReportWorker>(
- "REPORT_TASK", _master_info, config::report_task_interval_seconds,
- [&master_info = _master_info] { report_task_callback(master_info);
}));
+ "REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
+ [&cluster_info = _cluster_info] {
report_task_callback(cluster_info); }));
_report_workers.push_back(std::make_unique<ReportWorker>(
- "REPORT_DISK_STATE", _master_info,
config::report_disk_state_interval_seconds,
- [&engine, &master_info = _master_info] {
report_disk_callback(engine, master_info); }));
+ "REPORT_DISK_STATE", _cluster_info,
config::report_disk_state_interval_seconds,
+ [&engine, &cluster_info = _cluster_info] {
+ report_disk_callback(engine, cluster_info);
+ }));
if (config::enable_cloud_tablet_report) {
_report_workers.push_back(std::make_unique<ReportWorker>(
- "REPORT_OLAP_TABLET", _master_info,
config::report_tablet_interval_seconds,
- [&engine, &master_info = _master_info] {
- report_tablet_callback(engine, master_info);
+ "REPORT_OLAP_TABLET", _cluster_info,
config::report_tablet_interval_seconds,
+ [&engine, &cluster_info = _cluster_info] {
+ report_tablet_callback(engine, cluster_info);
}));
}
}
@@ -239,8 +241,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
const std::vector<TAgentTaskRequest>& tasks) {
Status ret_st;
- // TODO check master_info here if it is the same with that of heartbeat rpc
- if (_master_info.network_address.hostname.empty() ||
_master_info.network_address.port == 0) {
+ // TODO check whether cluster_info here is the same as that of the heartbeat
rpc
+ if (_cluster_info->master_fe_addr.hostname.empty() ||
_cluster_info->master_fe_addr.port == 0) {
Status ret_st = Status::Cancelled("Have not get FE Master heartbeat
yet");
ret_st.to_thrift(&agent_result.status);
return;
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 5a7fbafb72e..e5b5b522ba0 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -35,7 +35,7 @@ class ExecEnv;
class TAgentPublishRequest;
class TAgentResult;
class TAgentTaskRequest;
-class TMasterInfo;
+class ClusterInfo;
class TSnapshotRequest;
class StorageEngine;
class CloudStorageEngine;
@@ -43,7 +43,7 @@ class CloudStorageEngine;
// Each method corresponds to one RPC from FE Master, see BackendService.
class AgentServer {
public:
- explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
+ explicit AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info);
~AgentServer();
void start_workers(StorageEngine& engine, ExecEnv* exec_env);
@@ -63,8 +63,8 @@ public:
void stop_report_workers();
private:
- // Reference to the ExecEnv::_master_info
- const TMasterInfo& _master_info;
+ // Not owned. Points to ExecEnv::_cluster_info
+ const ClusterInfo* _cluster_info;
std::unordered_map<int64_t /* TTaskType */,
std::unique_ptr<TaskWorkerPoolIf>> _workers;
diff --git a/be/src/agent/heartbeat_server.cpp
b/be/src/agent/heartbeat_server.cpp
index 78002ed08fe..11345ea06f0 100644
--- a/be/src/agent/heartbeat_server.cpp
+++ b/be/src/agent/heartbeat_server.cpp
@@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/status.h"
#include "olap/storage_engine.h"
+#include "runtime/cluster_info.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/heartbeat_flags.h"
@@ -49,15 +50,15 @@ class TProcessor;
namespace doris {
-HeartbeatServer::HeartbeatServer(TMasterInfo* master_info)
+HeartbeatServer::HeartbeatServer(ClusterInfo* cluster_info)
: _engine(ExecEnv::GetInstance()->storage_engine()),
- _master_info(master_info),
+ _cluster_info(cluster_info),
_fe_epoch(0) {
_be_epoch = GetCurrentTimeMicros() / 1000;
}
void HeartbeatServer::init_cluster_id() {
- _master_info->cluster_id = _engine.effective_cluster_id();
+ _cluster_info->cluster_id = _engine.effective_cluster_id();
}
void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
@@ -65,7 +66,7 @@ void HeartbeatServer::heartbeat(THeartbeatResult&
heartbeat_result,
//print heartbeat in every minute
LOG_EVERY_N(INFO, 12) << "get heartbeat from FE."
<< "host:" << master_info.network_address.hostname
- << ", port:" << master_info.network_address.port
+ << ", rpc port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" <<
PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start
time: " << _be_epoch;
@@ -108,22 +109,23 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
std::lock_guard<std::mutex> lk(_hb_mtx);
// Check cluster id
- if (_master_info->cluster_id == -1) {
+ if (_cluster_info->cluster_id == -1) {
LOG(INFO) << "get first heartbeat. update cluster id";
// write and update cluster id
RETURN_IF_ERROR(_engine.set_cluster_id(master_info.cluster_id));
- _master_info->cluster_id = master_info.cluster_id;
+ _cluster_info->cluster_id = master_info.cluster_id;
LOG(INFO) << "record cluster id. host: " <<
master_info.network_address.hostname
<< ". port: " << master_info.network_address.port
<< ". cluster id: " << master_info.cluster_id
<< ". frontend_infos: " <<
PrintFrontendInfos(master_info.frontend_infos);
} else {
- if (_master_info->cluster_id != master_info.cluster_id) {
+ if (_cluster_info->cluster_id != master_info.cluster_id) {
return Status::InternalError(
"invalid cluster id. ignore. Record cluster id ={}, record
frontend info {}. "
"Invalid cluster_id={}, invalid frontend info {}",
- _master_info->cluster_id,
PrintFrontendInfos(_master_info->frontend_infos),
+ _cluster_info->cluster_id,
+
PrintFrontendInfos(ExecEnv::GetInstance()->get_frontends()),
master_info.cluster_id,
PrintFrontendInfos(master_info.frontend_infos));
}
}
@@ -183,22 +185,22 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
}
bool need_report = false;
- if (_master_info->network_address.hostname !=
master_info.network_address.hostname ||
- _master_info->network_address.port !=
master_info.network_address.port) {
+ if (_cluster_info->master_fe_addr.hostname !=
master_info.network_address.hostname ||
+ _cluster_info->master_fe_addr.port !=
master_info.network_address.port) {
if (master_info.epoch > _fe_epoch) {
- _master_info->network_address.hostname =
master_info.network_address.hostname;
- _master_info->network_address.port =
master_info.network_address.port;
+ _cluster_info->master_fe_addr.hostname =
master_info.network_address.hostname;
+ _cluster_info->master_fe_addr.port =
master_info.network_address.port;
_fe_epoch = master_info.epoch;
need_report = true;
LOG(INFO) << "master change. new master host: "
- << _master_info->network_address.hostname
- << ". port: " << _master_info->network_address.port
+ << _cluster_info->master_fe_addr.hostname
+ << ". port: " << _cluster_info->master_fe_addr.port
<< ". epoch: " << _fe_epoch;
} else {
return Status::InternalError(
"epoch is not greater than local. ignore heartbeat. host:
{}, port: {}, local "
"epoch: {}, received epoch: {}",
- _master_info->network_address.hostname,
_master_info->network_address.port,
+ _cluster_info->master_fe_addr.hostname,
_cluster_info->master_fe_addr.port,
_fe_epoch, master_info.epoch);
}
} else {
@@ -211,16 +213,17 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
}
if (master_info.__isset.token) {
- if (!_master_info->__isset.token) {
- _master_info->__set_token(master_info.token);
- LOG(INFO) << "get token. token: " << _master_info->token;
- } else if (_master_info->token != master_info.token) {
- return Status::InternalError("invalid token");
+ if (_cluster_info->token == "") {
+ _cluster_info->token = master_info.token;
+ LOG(INFO) << "get token. token: " << _cluster_info->token;
+ } else if (_cluster_info->token != master_info.token) {
+ return Status::InternalError("invalid token. local: {}, master:
{}",
+ _cluster_info->token,
master_info.token);
}
}
if (master_info.__isset.http_port) {
- _master_info->__set_http_port(master_info.http_port);
+ _cluster_info->master_fe_http_port = master_info.http_port;
}
if (master_info.__isset.heartbeat_flags) {
@@ -229,7 +232,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
}
if (master_info.__isset.backend_id) {
- _master_info->__set_backend_id(master_info.backend_id);
+ _cluster_info->backend_id = master_info.backend_id;
BackendOptions::set_backend_id(master_info.backend_id);
}
if (master_info.__isset.frontend_infos) {
@@ -281,6 +284,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
master_info.tablet_report_inactive_duration_ms;
}
+ if (master_info.__isset.auth_token) {
+ if (_cluster_info->curr_auth_token == "") {
+ _cluster_info->curr_auth_token = master_info.auth_token;
+ LOG(INFO) << "set new auth token: " << master_info.auth_token;
+ } else if (_cluster_info->curr_auth_token != master_info.auth_token) {
+ LOG(INFO) << "last auth token: " << _cluster_info->last_auth_token
+ << "set new auth token: " << master_info.auth_token;
+ _cluster_info->last_auth_token = _cluster_info->curr_auth_token;
+ _cluster_info->curr_auth_token = master_info.auth_token;
+ }
+ }
+
if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and
disk info immediately";
_engine.notify_listeners();
@@ -291,8 +306,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo&
master_info) {
Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port,
std::unique_ptr<ThriftServer>* thrift_server,
- uint32_t worker_thread_num, TMasterInfo*
local_master_info) {
- HeartbeatServer* heartbeat_server = new HeartbeatServer(local_master_info);
+ uint32_t worker_thread_num, ClusterInfo*
cluster_info) {
+ HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info);
if (heartbeat_server == nullptr) {
return Status::InternalError("Get heartbeat server failed");
}
diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h
index 29fecd07881..67d67fd486d 100644
--- a/be/src/agent/heartbeat_server.h
+++ b/be/src/agent/heartbeat_server.h
@@ -26,6 +26,7 @@
#include "common/status.h"
namespace doris {
+class ClusterInfo;
class ExecEnv;
class THeartbeatResult;
class TMasterInfo;
@@ -36,7 +37,7 @@ class ThriftServer;
class HeartbeatServer : public HeartbeatServiceIf {
public:
- explicit HeartbeatServer(TMasterInfo* master_info);
+ explicit HeartbeatServer(ClusterInfo* cluster_info);
~HeartbeatServer() override = default;
void init_cluster_id();
@@ -44,7 +45,7 @@ public:
// Master send heartbeat to this server
//
// Input parameters:
- // * master_info: The struct of master info, contains host ip and port
+ // * master_info: The struct of master info, contains cluster info from
Master FE
//
// Output parameters:
// * heartbeat_result: The result of heartbeat set
@@ -56,10 +57,10 @@ private:
BaseStorageEngine& _engine;
int64_t _be_epoch;
- // mutex to protect master_info and _epoch
+ // mutex to protect *_cluster_info and _fe_epoch
std::mutex _hb_mtx;
- // Not owned. Point to the ExecEnv::_master_info
- TMasterInfo* _master_info = nullptr;
+ // Not owned. Points to ExecEnv::_cluster_info
+ ClusterInfo* _cluster_info = nullptr;
int64_t _fe_epoch;
DISALLOW_COPY_AND_ASSIGN(HeartbeatServer);
@@ -67,5 +68,5 @@ private:
Status create_heartbeat_server(ExecEnv* exec_env, uint32_t
heartbeat_server_port,
std::unique_ptr<ThriftServer>*
heart_beat_server,
- uint32_t worker_thread_num, TMasterInfo*
local_master_info);
+ uint32_t worker_thread_num, ClusterInfo*
cluster_info);
} // namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 4614304e439..01b107b3ea7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -378,22 +378,22 @@ Status check_migrate_request(StorageEngine& engine, const
TStorageMediumMigrateR
}
// Return `true` if report success
-bool handle_report(const TReportRequest& request, const TMasterInfo&
master_info,
+bool handle_report(const TReportRequest& request, const ClusterInfo*
cluster_info,
std::string_view name) {
TMasterResult result;
Status status = MasterServerClient::instance()->report(request, &result);
if (!status.ok()) [[unlikely]] {
LOG_WARNING("failed to report {}", name)
- .tag("host", master_info.network_address.hostname)
- .tag("port", master_info.network_address.port)
+ .tag("host", cluster_info->master_fe_addr.hostname)
+ .tag("port", cluster_info->master_fe_addr.port)
.error(status);
return false;
}
else if (result.status.status_code != TStatusCode::OK) [[unlikely]] {
LOG_WARNING("failed to report {}", name)
- .tag("host", master_info.network_address.hostname)
- .tag("port", master_info.network_address.port)
+ .tag("host", cluster_info->master_fe_addr.hostname)
+ .tag("port", cluster_info->master_fe_addr.port)
.error(result.status);
return false;
}
@@ -663,10 +663,10 @@ void PriorTaskWorkerPool::high_prior_loop() {
}
}
-ReportWorker::ReportWorker(std::string name, const TMasterInfo& master_info,
int report_interval_s,
+ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info,
int report_interval_s,
std::function<void()> callback)
: _name(std::move(name)) {
- auto report_loop = [this, &master_info, report_interval_s, callback =
std::move(callback)] {
+ auto report_loop = [this, cluster_info, report_interval_s, callback =
std::move(callback)] {
auto& engine = ExecEnv::GetInstance()->storage_engine();
engine.register_report_listener(this);
while (true) {
@@ -685,7 +685,7 @@ ReportWorker::ReportWorker(std::string name, const
TMasterInfo& master_info, int
}
}
- if (master_info.network_address.port == 0) {
+ if (cluster_info->master_fe_addr.port == 0) {
// port == 0 means not received heartbeat yet
LOG(INFO) << "waiting to receive first heartbeat from frontend
before doing report";
continue;
@@ -990,7 +990,7 @@ void check_consistency_callback(StorageEngine& engine,
const TAgentTaskRequest&
remove_task_info(req.task_type, req.signature);
}
-void report_task_callback(const TMasterInfo& master_info) {
+void report_task_callback(const ClusterInfo* cluster_info) {
TReportRequest request;
if (config::report_random_wait) {
random_sleep(5);
@@ -1007,14 +1007,14 @@ void report_task_callback(const TMasterInfo&
master_info) {
}
}
request.__set_backend(BackendOptions::get_local_backend());
- bool succ = handle_report(request, master_info, "task");
+ bool succ = handle_report(request, cluster_info, "task");
report_task_total << 1;
if (!succ) [[unlikely]] {
report_task_failed << 1;
}
}
-void report_disk_callback(StorageEngine& engine, const TMasterInfo&
master_info) {
+void report_disk_callback(StorageEngine& engine, const ClusterInfo*
cluster_info) {
TReportRequest request;
request.__set_backend(BackendOptions::get_local_backend());
request.__isset.disks = true;
@@ -1039,14 +1039,14 @@ void report_disk_callback(StorageEngine& engine, const
TMasterInfo& master_info)
request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
?
config::pipeline_executor_size
: CpuInfo::num_cores());
- bool succ = handle_report(request, master_info, "disk");
+ bool succ = handle_report(request, cluster_info, "disk");
report_disk_total << 1;
if (!succ) [[unlikely]] {
report_disk_failed << 1;
}
}
-void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo&
master_info) {
+void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo*
cluster_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests
at the same time
// and can not be processed.
@@ -1066,12 +1066,12 @@ void report_disk_callback(CloudStorageEngine& engine,
const TMasterInfo& master_
request.__set_pipeline_executor_size(config::pipeline_executor_size > 0
?
config::pipeline_executor_size
: CpuInfo::num_cores());
- bool succ = handle_report(request, master_info, "disk");
+ bool succ = handle_report(request, cluster_info, "disk");
report_disk_total << 1;
report_disk_failed << !succ;
}
-void report_tablet_callback(StorageEngine& engine, const TMasterInfo&
master_info) {
+void report_tablet_callback(StorageEngine& engine, const ClusterInfo*
cluster_info) {
if (config::report_random_wait) {
random_sleep(5);
}
@@ -1133,14 +1133,14 @@ void report_tablet_callback(StorageEngine& engine,
const TMasterInfo& master_inf
}
request.__isset.resource = true;
- bool succ = handle_report(request, master_info, "tablet");
+ bool succ = handle_report(request, cluster_info, "tablet");
report_tablet_total << 1;
if (!succ) [[unlikely]] {
report_tablet_failed << 1;
}
}
-void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo&
master_info) {
+void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo*
cluster_info) {
// Random sleep 1~5 seconds before doing report.
// In order to avoid the problem that the FE receives many report requests
at the same time
// and can not be processed.
@@ -1173,7 +1173,7 @@ void report_tablet_callback(CloudStorageEngine& engine,
const TMasterInfo& maste
request.__set_report_version(report_version);
request.__set_num_tablets(total_num_tablets);
- bool succ = handle_report(request, master_info, "tablet");
+ bool succ = handle_report(request, cluster_info, "tablet");
report_tablet_total << 1;
if (!succ) [[unlikely]] {
report_tablet_failed << 1;
@@ -2013,7 +2013,7 @@ void visible_version_callback(StorageEngine& engine,
const TAgentTaskRequest& re
visible_version_req.partition_version);
}
-void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
+void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
const TAgentTaskRequest& req) {
const auto& clone_req = req.clone_req;
@@ -2021,7 +2021,7 @@ void clone_callback(StorageEngine& engine, const
TMasterInfo& master_info,
LOG(INFO) << "get clone task. signature=" << req.signature;
std::vector<TTabletInfo> tablet_infos;
- EngineCloneTask engine_task(engine, clone_req, master_info, req.signature,
&tablet_infos);
+ EngineCloneTask engine_task(engine, clone_req, cluster_info,
req.signature, &tablet_infos);
SCOPED_ATTACH_TASK(engine_task.mem_tracker());
auto status = engine_task.execute();
// Return result to fe
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index c50ac57ffe9..f6223affd07 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -36,10 +36,10 @@ class StorageEngine;
class CloudStorageEngine;
class Thread;
class ThreadPool;
-class TMasterInfo;
class TReportRequest;
class TTabletInfo;
class TAgentTaskRequest;
+class ClusterInfo;
class TaskWorkerPoolIf {
public:
@@ -109,7 +109,7 @@ private:
class ReportWorker {
public:
- ReportWorker(std::string name, const TMasterInfo& master_info, int
report_interval_s,
+ ReportWorker(std::string name, const ClusterInfo* cluster_info, int
report_interval_s,
std::function<void()> callback);
~ReportWorker();
@@ -169,7 +169,7 @@ void alter_tablet_callback(StorageEngine& engine, const
TAgentTaskRequest& req);
void alter_cloud_tablet_callback(CloudStorageEngine& engine, const
TAgentTaskRequest& req);
-void clone_callback(StorageEngine& engine, const TMasterInfo& master_info,
+void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info,
const TAgentTaskRequest& req);
void storage_medium_migrate_callback(StorageEngine& engine, const
TAgentTaskRequest& req);
@@ -182,15 +182,15 @@ void clean_udf_cache_callback(const TAgentTaskRequest&
req);
void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest&
req);
-void report_task_callback(const TMasterInfo& master_info);
+void report_task_callback(const ClusterInfo* cluster_info);
-void report_disk_callback(StorageEngine& engine, const TMasterInfo&
master_info);
+void report_disk_callback(StorageEngine& engine, const ClusterInfo*
cluster_info);
-void report_disk_callback(CloudStorageEngine& engine, const TMasterInfo&
master_info);
+void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo*
cluster_info);
-void report_tablet_callback(StorageEngine& engine, const TMasterInfo&
master_info);
+void report_tablet_callback(StorageEngine& engine, const ClusterInfo*
cluster_info);
-void report_tablet_callback(CloudStorageEngine& engine, const TMasterInfo&
master_info);
+void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo*
cluster_info);
void calc_delete_bitmap_callback(CloudStorageEngine& engine, const
TAgentTaskRequest& req);
diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp
index 71e7aedbc35..2a1f9994b01 100644
--- a/be/src/agent/utils.cpp
+++ b/be/src/agent/utils.cpp
@@ -43,6 +43,7 @@
#include "common/config.h"
#include "common/status.h"
#include "runtime/client_cache.h"
+#include "runtime/cluster_info.h"
namespace doris {
class TConfirmUnusedRemoteFilesRequest;
@@ -61,8 +62,8 @@ namespace doris {
static FrontendServiceClientCache s_client_cache;
static std::unique_ptr<MasterServerClient> s_client;
-MasterServerClient* MasterServerClient::create(const TMasterInfo& master_info)
{
- s_client.reset(new MasterServerClient(master_info));
+MasterServerClient* MasterServerClient::create(const ClusterInfo*
cluster_info) {
+ s_client.reset(new MasterServerClient(cluster_info));
return s_client.get();
}
@@ -70,18 +71,18 @@ MasterServerClient* MasterServerClient::instance() {
return s_client.get();
}
-MasterServerClient::MasterServerClient(const TMasterInfo& master_info)
- : _master_info(master_info) {}
+MasterServerClient::MasterServerClient(const ClusterInfo* cluster_info)
+ : _cluster_info(cluster_info) {}
Status MasterServerClient::finish_task(const TFinishTaskRequest& request,
TMasterResult* result) {
Status client_status;
- FrontendServiceConnection client(&s_client_cache,
_master_info.network_address,
+ FrontendServiceConnection client(&s_client_cache,
_cluster_info->master_fe_addr,
config::thrift_rpc_timeout_ms,
&client_status);
if (!client_status.ok()) {
LOG(WARNING) << "fail to get master client from cache. "
- << "host=" << _master_info.network_address.hostname
- << ", port=" << _master_info.network_address.port
+ << "host=" << _cluster_info->master_fe_addr.hostname
+ << ", port=" << _cluster_info->master_fe_addr.port
<< ", code=" << client_status.code();
return Status::InternalError("Failed to get master client");
}
@@ -97,8 +98,8 @@ Status MasterServerClient::finish_task(const
TFinishTaskRequest& request, TMaste
if (!client_status.ok()) {
#ifdef ADDRESS_SANITIZER
LOG(WARNING) << "fail to get master client from cache. "
- << "host=" <<
_master_info.network_address.hostname
- << ", port=" << _master_info.network_address.port
+ << "host=" <<
_cluster_info->master_fe_addr.hostname
+ << ", port=" << _cluster_info->master_fe_addr.port
<< ", code=" << client_status.code();
#endif
return Status::RpcError("Master client finish task failed");
@@ -108,8 +109,8 @@ Status MasterServerClient::finish_task(const
TFinishTaskRequest& request, TMaste
} catch (std::exception& e) {
RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
LOG(WARNING) << "fail to finish_task. "
- << "host=" << _master_info.network_address.hostname
- << ", port=" << _master_info.network_address.port << ",
error=" << e.what();
+ << "host=" << _cluster_info->master_fe_addr.hostname
+ << ", port=" << _cluster_info->master_fe_addr.port << ",
error=" << e.what();
return Status::InternalError("Fail to finish task");
}
@@ -118,13 +119,13 @@ Status MasterServerClient::finish_task(const
TFinishTaskRequest& request, TMaste
Status MasterServerClient::report(const TReportRequest& request,
TMasterResult* result) {
Status client_status;
- FrontendServiceConnection client(&s_client_cache,
_master_info.network_address,
+ FrontendServiceConnection client(&s_client_cache,
_cluster_info->master_fe_addr,
config::thrift_rpc_timeout_ms,
&client_status);
if (!client_status.ok()) {
LOG(WARNING) << "fail to get master client from cache. "
- << "host=" << _master_info.network_address.hostname
- << ", port=" << _master_info.network_address.port
+ << "host=" << _cluster_info->master_fe_addr.hostname
+ << ", port=" << _cluster_info->master_fe_addr.port
<< ", code=" << client_status;
return Status::InternalError("Fail to get master client from cache");
}
@@ -144,8 +145,8 @@ Status MasterServerClient::report(const TReportRequest&
request, TMasterResult*
if (!client_status.ok()) {
#ifdef ADDRESS_SANITIZER
LOG(WARNING) << "fail to get master client from cache. "
- << "host=" <<
_master_info.network_address.hostname
- << ", port=" <<
_master_info.network_address.port
+ << "host=" <<
_cluster_info->master_fe_addr.hostname
+ << ", port=" <<
_cluster_info->master_fe_addr.port
<< ", code=" << client_status.code();
#endif
return Status::InternalError("Fail to get master client
from cache");
@@ -164,8 +165,8 @@ Status MasterServerClient::report(const TReportRequest&
request, TMasterResult*
} catch (std::exception& e) {
RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
LOG(WARNING) << "fail to report to master. "
- << "host=" << _master_info.network_address.hostname
- << ", port=" << _master_info.network_address.port
+ << "host=" << _cluster_info->master_fe_addr.hostname
+ << ", port=" << _cluster_info->master_fe_addr.port
<< ", code=" << client_status.code() << ", reason=" <<
e.what();
return Status::InternalError("Fail to report to master");
}
@@ -176,13 +177,13 @@ Status MasterServerClient::report(const TReportRequest&
request, TMasterResult*
Status MasterServerClient::confirm_unused_remote_files(
const TConfirmUnusedRemoteFilesRequest& request,
TConfirmUnusedRemoteFilesResult* result) {
Status client_status;
- FrontendServiceConnection client(&s_client_cache,
_master_info.network_address,
+ FrontendServiceConnection client(&s_client_cache,
_cluster_info->master_fe_addr,
config::thrift_rpc_timeout_ms,
&client_status);
if (!client_status.ok()) {
return Status::InternalError(
"fail to get master client from cache. host={}, port={},
code={}",
- _master_info.network_address.hostname,
_master_info.network_address.port,
+ _cluster_info->master_fe_addr.hostname,
_cluster_info->master_fe_addr.port,
client_status.code());
}
try {
@@ -198,8 +199,8 @@ Status MasterServerClient::confirm_unused_remote_files(
if (!client_status.ok()) {
return Status::InternalError(
"fail to get master client from cache. host={},
port={}, code={}",
- _master_info.network_address.hostname,
- _master_info.network_address.port,
client_status.code());
+ _cluster_info->master_fe_addr.hostname,
+ _cluster_info->master_fe_addr.port,
client_status.code());
}
client->confirmUnusedRemoteFiles(*result, request);
@@ -208,7 +209,7 @@ Status MasterServerClient::confirm_unused_remote_files(
// actually we don't care what FE returns.
return Status::InternalError(
"fail to confirm unused remote files. host={},
port={}, code={}, reason={}",
- _master_info.network_address.hostname,
_master_info.network_address.port,
+ _cluster_info->master_fe_addr.hostname,
_cluster_info->master_fe_addr.port,
client_status.code(), e.what());
}
}
@@ -216,7 +217,7 @@ Status MasterServerClient::confirm_unused_remote_files(
RETURN_IF_ERROR(client.reopen(config::thrift_rpc_timeout_ms));
return Status::InternalError(
"fail to confirm unused remote files. host={}, port={},
code={}, reason={}",
- _master_info.network_address.hostname,
_master_info.network_address.port,
+ _cluster_info->master_fe_addr.hostname,
_cluster_info->master_fe_addr.port,
client_status.code(), e.what());
}
diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h
index eea8f9f8cff..d554e92ff67 100644
--- a/be/src/agent/utils.h
+++ b/be/src/agent/utils.h
@@ -28,13 +28,13 @@ namespace doris {
class TConfirmUnusedRemoteFilesRequest;
class TConfirmUnusedRemoteFilesResult;
class TFinishTaskRequest;
-class TMasterInfo;
class TMasterResult;
class TReportRequest;
+class ClusterInfo;
class MasterServerClient {
public:
- static MasterServerClient* create(const TMasterInfo& master_info);
+ static MasterServerClient* create(const ClusterInfo* cluster_info);
static MasterServerClient* instance();
~MasterServerClient() = default;
@@ -61,12 +61,12 @@ public:
TConfirmUnusedRemoteFilesResult*
result);
private:
- MasterServerClient(const TMasterInfo& master_info);
+ MasterServerClient(const ClusterInfo* cluster_info);
DISALLOW_COPY_AND_ASSIGN(MasterServerClient);
- // Not owner. Reference to the ExecEnv::_master_info
- const TMasterInfo& _master_info;
+ // Not owned. Points to ExecEnv::_cluster_info.
+ const ClusterInfo* _cluster_info;
};
class AgentUtils {
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 026610d5b0c..fefdb65e44b 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -837,7 +837,7 @@ static void send_stats_to_fe_async(const int64_t db_id,
const int64_t txn_id,
Status status;
int64_t duration_ns = 0;
TNetworkAddress master_addr =
- ExecEnv::GetInstance()->master_info()->network_address;
+ ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
if (master_addr.hostname.empty() || master_addr.port == 0) {
status = Status::Error<SERVICE_UNAVAILABLE>(
"Have not get FE Master heartbeat yet");
diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index 46df44a40f2..92c5974e4a7 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -31,13 +31,14 @@ struct AuthInfo {
std::string cluster;
std::string user_ip;
// -1 as unset
- int64_t auth_code = -1;
+ int64_t auth_code = -1; // deprecated
std::string token;
};
template <class T>
void set_request_auth(T* req, const AuthInfo& auth) {
req->user = auth.user; // always set user, because it may be used by FE
+ // auth code is deprecated and should be removed in 3.1
if (auth.auth_code != -1) {
// if auth_code is set, no need to set other info
req->__set_auth_code(auth.auth_code);
diff --git a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
index 6f66d9a8fd5..98051638026 100644
--- a/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_active_queries_scanner.cpp
@@ -51,7 +51,7 @@ Status SchemaActiveQueriesScanner::start(RuntimeState* state)
{
}
Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
index ebe2bd3b70e..459715fd628 100644
--- a/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_partitions_scanner.cpp
@@ -98,7 +98,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
}
Status SchemaPartitionsScanner::get_onedb_info_from_fe(int64_t dbId) {
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.cpp
b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
index e8d95f0abd6..8660d75e8a1 100644
--- a/be/src/exec/schema_scanner/schema_routine_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.cpp
@@ -62,7 +62,7 @@ Status SchemaRoutinesScanner::start(RuntimeState* state) {
}
Status SchemaRoutinesScanner::get_block_from_fe() {
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
schema_table_request_params.__isset.columns_name = true;
@@ -138,4 +138,4 @@ Status
SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block,
return Status::OK();
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_table_options_scanner.cpp
b/be/src/exec/schema_scanner/schema_table_options_scanner.cpp
index f4b636be68f..bb778996a83 100644
--- a/be/src/exec/schema_scanner/schema_table_options_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_table_options_scanner.cpp
@@ -70,7 +70,7 @@ Status SchemaTableOptionsScanner::start(RuntimeState* state) {
}
Status SchemaTableOptionsScanner::get_onedb_info_from_fe(int64_t dbId) {
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp
b/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp
index 749113da1b5..8d6a26a552f 100644
--- a/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_table_properties_scanner.cpp
@@ -67,7 +67,7 @@ Status SchemaTablePropertiesScanner::start(RuntimeState*
state) {
}
Status SchemaTablePropertiesScanner::get_onedb_info_from_fe(int64_t dbId) {
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
diff --git a/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp
b/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp
index a1d4568d905..a91a28322ec 100644
--- a/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_group_privileges.cpp
@@ -45,7 +45,7 @@ Status
SchemaWorkloadGroupPrivilegesScanner::start(RuntimeState* state) {
}
Status
SchemaWorkloadGroupPrivilegesScanner::_get_workload_group_privs_block_from_fe()
{
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
@@ -134,4 +134,4 @@ Status
SchemaWorkloadGroupPrivilegesScanner::get_next_block_internal(vectorized:
return Status::OK();
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
index dd81a3ecb26..43562a8f52c 100644
--- a/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_groups_scanner.cpp
@@ -58,7 +58,7 @@ Status SchemaWorkloadGroupsScanner::start(RuntimeState*
state) {
}
Status SchemaWorkloadGroupsScanner::_get_workload_groups_block_from_fe() {
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
@@ -144,4 +144,4 @@ Status
SchemaWorkloadGroupsScanner::get_next_block_internal(vectorized::Block* b
return Status::OK();
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git
a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
index 2d91f151f5f..5c6a6f70a88 100644
--- a/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_workload_sched_policy_scanner.cpp
@@ -49,7 +49,7 @@ Status
SchemaWorkloadSchedulePolicyScanner::start(RuntimeState* state) {
}
Status
SchemaWorkloadSchedulePolicyScanner::_get_workload_schedule_policy_block_from_fe()
{
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TSchemaTableRequestParams schema_table_request_params;
for (int i = 0; i < _s_tbls_columns.size(); i++) {
@@ -135,4 +135,4 @@ Status
SchemaWorkloadSchedulePolicyScanner::get_next_block_internal(vectorized::
return Status::OK();
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/http/action/compaction_score_action.cpp
b/be/src/http/action/compaction_score_action.cpp
index 10b8cc6bdba..eeb0d0a642c 100644
--- a/be/src/http/action/compaction_score_action.cpp
+++ b/be/src/http/action/compaction_score_action.cpp
@@ -166,7 +166,7 @@ CompactionScoreAction::CompactionScoreAction(ExecEnv*
exec_env, TPrivilegeHier::
_accessor(std::make_unique<CloudCompactionScoresAccessor>(tablet_mgr)) {}
void CompactionScoreAction::handle(HttpRequest* req) {
- req->add_output_header(HttpHeaders::CONTENT_TYPE,
HttpHeaders::JsonType.data());
+ req->add_output_header(HttpHeaders::CONTENT_TYPE,
HttpHeaders::JSON_TYPE.data());
auto top_n_param = req->param(TOP_N);
size_t top_n = DEFAULT_TOP_N;
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index 4a34605aa33..99205368616 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -325,14 +325,14 @@ Status HttpStreamAction::process_put(HttpRequest*
http_req,
request.__set_group_commit_mode("sync_mode");
}
}
- if (_exec_env->master_info()->__isset.backend_id) {
- request.__set_backend_id(_exec_env->master_info()->backend_id);
+ if (_exec_env->cluster_info()->backend_id != 0) {
+ request.__set_backend_id(_exec_env->cluster_info()->backend_id);
} else {
- LOG(WARNING) << "_exec_env->master_info not set backend_id";
+ LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
}
// plan this load
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index fa23e5e56c1..e3ad7f44866 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -666,7 +666,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
#ifndef BE_TEST
// plan this load
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index bf1cd751ae3..c842a4fe2dd 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -26,6 +26,7 @@
#include "common/config.h"
#include "http/http_headers.h"
#include "http/http_status.h"
+#include "runtime/exec_env.h"
#include "util/stack_util.h"
namespace doris {
@@ -141,6 +142,9 @@ Status HttpClient::init(const std::string& url, bool
set_fail_on_error) {
return Status::InternalError("fail to set CURLOPT_URL");
}
+#ifndef BE_TEST
+ set_auth_token(ExecEnv::GetInstance()->cluster_info()->curr_auth_token);
+#endif
return Status::OK();
}
diff --git a/be/src/http/http_client.h b/be/src/http/http_client.h
index f6a1a17ec29..fb692c50268 100644
--- a/be/src/http/http_client.h
+++ b/be/src/http/http_client.h
@@ -26,6 +26,7 @@
#include <string>
#include "common/status.h"
+#include "http/http_headers.h"
#include "http/http_method.h"
namespace doris {
@@ -55,6 +56,13 @@ public:
curl_easy_setopt(_curl, CURLOPT_PASSWORD, passwd.c_str());
}
+ // Auth-Token: xxxx
+ void set_auth_token(const std::string& token) {
+ std::string scratch_str = HttpHeaders::AUTH_TOKEN + ": " + token;
+ _header_list = curl_slist_append(_header_list, scratch_str.c_str());
+ curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _header_list);
+ }
+
// content_type such as "application/json"
void set_content_type(const std::string content_type) {
std::string scratch_str = "Content-Type: " + content_type;
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index ec2dfc896e4..b9b5f0d85ae 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -66,7 +66,7 @@ static const std::string HTTP_TXN_OPERATION_KEY =
"txn_operation";
static const std::string HTTP_MEMTABLE_ON_SINKNODE = "memtable_on_sink_node";
static const std::string HTTP_LOAD_STREAM_PER_NODE = "load_stream_per_node";
static const std::string HTTP_WAL_ID_KY = "wal_id";
-static const std::string HTTP_AUTH_CODE = "auth_code";
+static const std::string HTTP_AUTH_CODE = "auth_code"; // deprecated
static const std::string HTTP_GROUP_COMMIT = "group_commit";
static const std::string HTTP_CLOUD_CLUSTER = "cloud_cluster";
diff --git a/be/src/http/http_handler_with_auth.cpp
b/be/src/http/http_handler_with_auth.cpp
index 166638ab318..518b9868de1 100644
--- a/be/src/http/http_handler_with_auth.cpp
+++ b/be/src/http/http_handler_with_auth.cpp
@@ -52,6 +52,23 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {
return -1;
}
+ // check auth by token
+ if (auth_info.token != "") {
+#ifdef BE_TEST
+ if (auth_info.token == "valid_token") {
+ return 0;
+#else
+ if (_exec_env->check_auth_token(auth_info.token)) {
+ return 0;
+#endif
+ } else {
+ LOG(WARNING) << "invalid auth token, request: " <<
req->debug_string();
+ HttpChannel::send_error(req, HttpStatus::BAD_REQUEST);
+ return -1;
+ }
+ }
+
+ // check auth by user/password
auth_request.user = auth_info.user;
auth_request.passwd = auth_info.passwd;
auth_request.__set_cluster(auth_info.cluster);
@@ -65,7 +82,7 @@ int HttpHandlerWithAuth::on_header(HttpRequest* req) {
}
#ifndef BE_TEST
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
{
auto status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
diff --git a/be/src/http/http_headers.cpp b/be/src/http/http_headers.cpp
index 79d9240c163..4fc5338145c 100644
--- a/be/src/http/http_headers.cpp
+++ b/be/src/http/http_headers.cpp
@@ -93,6 +93,7 @@ const char* HttpHeaders::WEBSOCKET_ORIGIN =
"WebSocket-Origin";
const char* HttpHeaders::WEBSOCKET_PROTOCOL = "WebSocket-Protocol";
const char* HttpHeaders::WWW_AUTHENTICATE = "WWW-Authenticate";
-const std::string HttpHeaders::JsonType = "application/json";
+const std::string HttpHeaders::JSON_TYPE = "application/json";
+const std::string HttpHeaders::AUTH_TOKEN = "Auth-Token";
} // namespace doris
diff --git a/be/src/http/http_headers.h b/be/src/http/http_headers.h
index e2f9547ac9d..0fdfc1be22c 100644
--- a/be/src/http/http_headers.h
+++ b/be/src/http/http_headers.h
@@ -97,7 +97,8 @@ public:
static const char* WEBSOCKET_PROTOCOL;
static const char* WWW_AUTHENTICATE;
- static const std::string JsonType;
+ static const std::string JSON_TYPE;
+ static const std::string AUTH_TOKEN;
};
} // namespace doris
diff --git a/be/src/http/utils.cpp b/be/src/http/utils.cpp
index fbbc1cd93bf..f91610476b4 100644
--- a/be/src/http/utils.cpp
+++ b/be/src/http/utils.cpp
@@ -54,6 +54,8 @@ std::string encode_basic_auth(const std::string& user, const
std::string& passwd
}
bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string*
passwd) {
+ // const auto& token = req.header(HttpHeaders::AUTH_TOKEN);
+
const char k_basic[] = "Basic ";
const auto& auth = req.header(HttpHeaders::AUTHORIZATION);
if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) !=
0) {
@@ -75,8 +77,11 @@ bool parse_basic_auth(const HttpRequest& req, std::string*
user, std::string* pa
}
bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth) {
+ // deprecated, removed in 3.1, use AUTH_TOKEN
const auto& token = req.header("token");
+ // deprecated, removed in 3.1, use AUTH_TOKEN
const auto& auth_code = req.header(HTTP_AUTH_CODE);
+ const auto& auth_token = req.header(HttpHeaders::AUTH_TOKEN);
std::tuple<std::string, std::string, std::string> tmp;
auto& [user, pass, cluster] = tmp;
@@ -93,9 +98,11 @@ bool parse_basic_auth(const HttpRequest& req, AuthInfo*
auth) {
}
if (!token.empty()) {
- auth->token = token;
+ auth->token = token; // deprecated
+ } else if (!auth_token.empty()) {
+ auth->token = auth_token;
} else if (!auth_code.empty()) {
- auth->auth_code = std::stoll(auth_code);
+ auth->auth_code = std::stoll(auth_code); // deprecated
} else if (!valid_basic_auth) {
return false;
}
diff --git a/be/src/io/fs/multi_table_pipe.cpp
b/be/src/io/fs/multi_table_pipe.cpp
index 70ad64a81ce..b3af2531f15 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -197,7 +197,7 @@ Status MultiTablePipe::request_and_exec_plans() {
// plan this load
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
- TNetworkAddress master_addr = exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = exec_env->cluster_info()->master_fe_addr;
int64_t stream_load_put_start_time = MonotonicNanos();
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 020d151d16b..96dc9295834 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -723,13 +723,13 @@ void StorageEngine::_update_replica_infos_callback() {
!t->tablet_meta()->tablet_schema()->disable_auto_compaction() &&
t->tablet_meta()->tablet_schema()->enable_single_replica_compaction();
});
- TMasterInfo* master_info = ExecEnv::GetInstance()->master_info();
- if (master_info == nullptr) {
+ ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
+ if (cluster_info == nullptr) {
LOG(WARNING) << "Have not get FE Master heartbeat yet";
std::this_thread::sleep_for(std::chrono::seconds(2));
continue;
}
- TNetworkAddress master_addr = master_info->network_address;
+ TNetworkAddress master_addr = cluster_info->master_fe_addr;
if (master_addr.hostname == "" || master_addr.port == 0) {
LOG(WARNING) << "Have not get FE Master heartbeat yet";
std::this_thread::sleep_for(std::chrono::seconds(2));
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 1fc5b7278c6..fc3a69fd5cd 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -143,13 +143,13 @@ Result<std::string> check_dest_binlog_valid(const
std::string& tablet_dir,
} while (false)
EngineCloneTask::EngineCloneTask(StorageEngine& engine, const TCloneReq&
clone_req,
- const TMasterInfo& master_info, int64_t
signature,
+ const ClusterInfo* cluster_info, int64_t
signature,
std::vector<TTabletInfo>* tablet_infos)
: _engine(engine),
_clone_req(clone_req),
_tablet_infos(tablet_infos),
_signature(signature),
- _master_info(master_info) {
+ _cluster_info(cluster_info) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
"EngineCloneTask#tabletId=" +
std::to_string(_clone_req.tablet_id));
@@ -356,7 +356,7 @@ Status
EngineCloneTask::_make_and_download_snapshots(DataDir& data_dir,
bool*
allow_incremental_clone) {
Status status;
- const auto& token = _master_info.token;
+ const auto& token = _cluster_info->token;
int timeout_s = 0;
if (_clone_req.__isset.timeout_s) {
diff --git a/be/src/olap/task/engine_clone_task.h
b/be/src/olap/task/engine_clone_task.h
index 3161b803c82..9290ed9552e 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -31,11 +31,11 @@
namespace doris {
class DataDir;
class TCloneReq;
-class TMasterInfo;
class TTabletInfo;
class Tablet;
struct Version;
class StorageEngine;
+class ClusterInfo;
const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?";
const std::string HTTP_REQUEST_TOKEN_PARAM = "token=";
@@ -51,7 +51,7 @@ public:
Status execute() override;
EngineCloneTask(StorageEngine& engine, const TCloneReq& clone_req,
- const TMasterInfo& master_info, int64_t signature,
+ const ClusterInfo* cluster_info, int64_t signature,
std::vector<TTabletInfo>* tablet_infos);
~EngineCloneTask() override = default;
@@ -93,7 +93,7 @@ private:
const TCloneReq& _clone_req;
std::vector<TTabletInfo>* _tablet_infos = nullptr;
int64_t _signature;
- const TMasterInfo& _master_info;
+ const ClusterInfo* _cluster_info;
int64_t _copy_size;
int64_t _copy_time_ms;
std::vector<PendingRowsetGuard> _pending_rs_guards;
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index a7e33e7383f..03343603ae8 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -213,7 +213,7 @@ Status WalManager::create_wal_path(int64_t db_id, int64_t
table_id, int64_t wal_
base_path = _wal_dirs_info->get_available_random_wal_dir();
std::stringstream ss;
ss << base_path << "/" << std::to_string(db_id) << "/" <<
std::to_string(table_id) << "/"
- << std::to_string(wal_version) << "_" <<
_exec_env->master_info()->backend_id << "_"
+ << std::to_string(wal_version) << "_" <<
_exec_env->cluster_info()->backend_id << "_"
<< std::to_string(wal_id) << "_" << label;
{
std::lock_guard<std::shared_mutex> wrlock(_wal_path_lock);
@@ -377,8 +377,8 @@ Status WalManager::_replay_background() {
break;
}
// port == 0 means not received heartbeat yet
- if (_exec_env->master_info() != nullptr &&
- _exec_env->master_info()->network_address.port == 0) {
+ if (_exec_env->cluster_info() != nullptr &&
+ _exec_env->cluster_info()->master_fe_addr.port == 0) {
continue;
}
// replay residual wal,only replay once
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index 69453054d18..84cf7afd4d3 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -161,12 +161,15 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo>
wal_info) {
Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
TLoadTxnRollbackRequest request;
- request.__set_auth_code(0); // this is a fake, fe not check it now
+ // This auth code is a placeholder; FE does not check it for now.
+ // It should be removed in 3.1; use the token instead.
+ request.__set_auth_code(0);
+ request.__set_token(_exec_env->cluster_info()->curr_auth_token);
request.__set_db_id(db_id);
request.__set_label(label);
request.__set_reason("relay wal with label " + label);
TLoadTxnRollbackResult result;
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
@@ -235,7 +238,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const
std::string& wal,
ctx->wal_id = wal_id;
ctx->label = label;
ctx->need_commit_self = false;
- ctx->auth.token = "relay_wal"; // this is a fake, fe not check it now
+ ctx->auth.token = _exec_env->cluster_info()->curr_auth_token;
ctx->auth.user = "admin";
ctx->group_commit = false;
ctx->load_type = TLoadType::MANUL_LOAD;
@@ -245,6 +248,7 @@ Status WalTable::_handle_stream_load(int64_t wal_id, const
std::string& wal,
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
if (ctx->status.ok()) {
+ // deprecated and should be removed in 3.1, use token instead.
ctx->auth.auth_code = wal_id;
st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
} else {
@@ -293,7 +297,7 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t
tb_id,
request.__set_table_id(tb_id);
TGetColumnInfoResult result;
Status status;
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
if (master_addr.hostname.empty() || master_addr.port == 0) {
status = Status::InternalError<false>("Have not get FE Master
heartbeat yet");
} else {
@@ -327,4 +331,4 @@ Status WalTable::_read_wal_header(const std::string&
wal_path, std::string& colu
return Status::OK();
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/cluster_info.h b/be/src/runtime/cluster_info.h
new file mode 100644
index 00000000000..da66b4f0acf
--- /dev/null
+++ b/be/src/runtime/cluster_info.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/Types_types.h>
+
+#include <string>
+
+namespace doris {
+
+// This class holds cluster-wide information such as the cluster id,
+// the Master FE address, the cluster token, and the internal auth tokens.
+// This information is usually carried in the heartbeat from the Master FE.
+class ClusterInfo {
+public:
+ // Unique cluster id
+ int32_t cluster_id = 0;
+ // Master FE addr: ip:rpc_port
+ TNetworkAddress master_fe_addr;
+ // Master FE http_port
+ int32_t master_fe_http_port = 0;
+ // Unique cluster token
+ std::string token = "";
+ // Backend ID
+ int64_t backend_id = 0;
+
+ // Auth tokens for internal authentication.
+ // Keep both the current and the previous token so the previous one stays valid during a token update.
+ std::string curr_auth_token = "";
+ std::string last_auth_token = "";
+};
+
+} // namespace doris
diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp
index 872069ee70a..ab24d7ca192 100644
--- a/be/src/runtime/exec_env.cpp
+++ b/be/src/runtime/exec_env.cpp
@@ -65,12 +65,16 @@ Result<BaseTabletSPtr> ExecEnv::get_tablet(int64_t
tablet_id) {
}
const std::string& ExecEnv::token() const {
- return _master_info->token;
+ return _cluster_info->token;
}
-std::map<TNetworkAddress, FrontendInfo> ExecEnv::get_frontends() {
+std::vector<TFrontendInfo> ExecEnv::get_frontends() {
std::lock_guard<std::mutex> lg(_frontends_lock);
- return _frontends;
+ std::vector<TFrontendInfo> infos;
+ for (const auto& cur_fe : _frontends) {
+ infos.push_back(cur_fe.second.info);
+ }
+ return infos;
}
void ExecEnv::update_frontends(const std::vector<TFrontendInfo>& new_fe_infos)
{
@@ -173,4 +177,9 @@ void ExecEnv::wait_for_all_tasks_done() {
}
}
+bool ExecEnv::check_auth_token(const std::string& auth_token) {
+ return _cluster_info->curr_auth_token == auth_token ||
+ _cluster_info->last_auth_token == auth_token;
+}
+
} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 307c2586fc6..4a0000fa19f 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -33,6 +33,7 @@
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/tablet_fwd.h"
#include "pipeline/pipeline_tracing.h"
+#include "runtime/cluster_info.h"
#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove
this include header
#include "util/threadpool.h"
@@ -83,7 +84,6 @@ class BaseStorageEngine;
class ResultBufferMgr;
class ResultQueueMgr;
class RuntimeQueryStatisticsMgr;
-class TMasterInfo;
class LoadChannelMgr;
class LoadStreamMgr;
class LoadStreamMapPool;
@@ -219,7 +219,7 @@ public:
UserFunctionCache* user_function_cache() { return _user_function_cache; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
- TMasterInfo* master_info() { return _master_info; }
+ ClusterInfo* cluster_info() { return _cluster_info; }
LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
BfdParser* bfd_parser() const { return _bfd_parser; }
BrokerMgr* broker_mgr() const { return _broker_mgr; }
@@ -262,7 +262,7 @@ public:
void set_memtable_memory_limiter(MemTableMemoryLimiter* limiter) {
_memtable_memory_limiter.reset(limiter);
}
- void set_master_info(TMasterInfo* master_info) { this->_master_info =
master_info; }
+ void set_cluster_info(ClusterInfo* cluster_info) { this->_cluster_info =
cluster_info; }
void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr>
new_load_stream_mgr) {
this->_new_load_stream_mgr = new_load_stream_mgr;
}
@@ -299,7 +299,7 @@ public:
void wait_for_all_tasks_done();
void update_frontends(const std::vector<TFrontendInfo>& new_infos);
- std::map<TNetworkAddress, FrontendInfo> get_frontends();
+ std::vector<TFrontendInfo> get_frontends();
std::map<TNetworkAddress, FrontendInfo> get_running_frontends();
TabletSchemaCache* get_tablet_schema_cache() { return
_tablet_schema_cache; }
@@ -333,6 +333,8 @@ public:
orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; }
arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; }
+ bool check_auth_token(const std::string& auth_token);
+
private:
ExecEnv();
@@ -403,7 +405,7 @@ private:
WorkloadGroupMgr* _workload_group_manager = nullptr;
ResultCache* _result_cache = nullptr;
- TMasterInfo* _master_info = nullptr;
+ ClusterInfo* _cluster_info = nullptr;
LoadPathMgr* _load_path_mgr = nullptr;
BfdParser* _bfd_parser = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 062069044dc..9d761786611 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -278,7 +278,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
config::query_cache_elasticity_size_mb);
- _master_info = new TMasterInfo();
+ _cluster_info = new ClusterInfo();
_load_path_mgr = new LoadPathMgr(this);
_bfd_parser = BfdParser::create();
_broker_mgr = new BrokerMgr(this);
@@ -759,9 +759,9 @@ void ExecEnv::destroy() {
// Master Info is a thrift object, it could be the last one to deconstruct.
// Master info should be deconstruct later than fragment manager, because
fragment will
- // access master_info.backend id to access some info. If there is a
running query and master
+ // access cluster_info.backend_id to access some info. If there is a
running query and master
// info is deconstructed then BE process will core at coordinator back
method in fragment mgr.
- SAFE_DELETE(_master_info);
+ SAFE_DELETE(_cluster_info);
// NOTE: runtime query statistics mgr could be visited by query and daemon
thread
// so it should be created before all query begin and deleted after all
query and daemon thread stoppped
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e683b84e2b4..86a3a8e773d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -526,8 +526,8 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
req.runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (!params.error_log.empty());
- if (_exec_env->master_info()->__isset.backend_id) {
- params.__set_backend_id(_exec_env->master_info()->backend_id);
+ if (_exec_env->cluster_info()->backend_id != 0) {
+ params.__set_backend_id(_exec_env->cluster_info()->backend_id);
}
TReportExecStatusResult res;
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index cd54718bc5f..f06f26b6418 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -351,12 +351,12 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
request.__set_strictMode(false);
// this is an internal interface, use admin to pass the auth check
request.__set_user("admin");
- if (_exec_env->master_info()->__isset.backend_id) {
- request.__set_backend_id(_exec_env->master_info()->backend_id);
+ if (_exec_env->cluster_info()->backend_id != 0) {
+ request.__set_backend_id(_exec_env->cluster_info()->backend_id);
} else {
- LOG(WARNING) << "_exec_env->master_info not set backend_id";
+ LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
}
- TNetworkAddress master_addr =
_exec_env->master_info()->network_address;
+ TNetworkAddress master_addr =
_exec_env->cluster_info()->master_fe_addr;
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&result, &request](FrontendServiceConnection& client) {
@@ -440,23 +440,25 @@ Status
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
{ status = Status::InternalError(""); });
// commit txn
TLoadTxnCommitRequest request;
- request.__set_auth_code(0); // this is a fake, fe not check it now
+ // auth_code is deprecated and should be removed in 3.1; use token instead
+ request.__set_auth_code(0);
+ request.__set_token(_exec_env->cluster_info()->curr_auth_token);
request.__set_db_id(db_id);
request.__set_table_id(table_id);
request.__set_txnId(txn_id);
request.__set_thrift_rpc_timeout_ms(config::txn_commit_rpc_timeout_ms);
request.__set_groupCommit(true);
request.__set_receiveBytes(state->num_bytes_load_total());
- if (_exec_env->master_info()->__isset.backend_id) {
- request.__set_backendId(_exec_env->master_info()->backend_id);
+ if (_exec_env->cluster_info()->backend_id != 0) {
+ request.__set_backendId(_exec_env->cluster_info()->backend_id);
} else {
- LOG(WARNING) << "_exec_env->master_info not set backend_id";
+ LOG(WARNING) << "_exec_env->cluster_info not set backend_id";
}
if (state) {
request.__set_commitInfos(state->tablet_commit_infos());
}
TLoadTxnCommitResult result;
- TNetworkAddress master_addr =
_exec_env->master_info()->network_address;
+ TNetworkAddress master_addr =
_exec_env->cluster_info()->master_fe_addr;
int retry_times = 0;
while (retry_times < config::mow_stream_load_commit_retry_times) {
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -482,12 +484,14 @@ Status
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
} else {
// abort txn
TLoadTxnRollbackRequest request;
- request.__set_auth_code(0); // this is a fake, fe not check it now
+ // auth_code is deprecated and should be removed in 3.1; use token instead
+ request.__set_auth_code(0);
+ request.__set_token(_exec_env->cluster_info()->curr_auth_token);
request.__set_db_id(db_id);
request.__set_txnId(txn_id);
request.__set_reason(status.to_string());
TLoadTxnRollbackResult result;
- TNetworkAddress master_addr =
_exec_env->master_info()->network_address;
+ TNetworkAddress master_addr =
_exec_env->cluster_info()->master_fe_addr;
st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 2c69b8a5870..06150ae3d20 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -230,7 +230,9 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
ctx->db = task.db;
ctx->table = task.tbl;
ctx->label = task.label;
+ // auth_code is deprecated and will be removed in 3.1; use the auth token instead.
ctx->auth.auth_code = task.auth_code;
+ ctx->auth.token = _exec_env->cluster_info()->curr_auth_token;
if (task.__isset.max_interval_s) {
ctx->max_interval_s = task.max_interval_s;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 77fd80cd528..f07b308850e 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -162,7 +162,7 @@ TReportExecStatusParams
RuntimeQueryStatisticsMgr::create_report_exec_status_par
TReportExecStatusParams req;
THRIFT_MOVE_VALUES(req, query_profile, profile);
- req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+ req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id);
// invalid query id to avoid API compatibility during upgrade
req.__set_query_id(TUniqueId());
req.__set_done(is_done);
@@ -341,7 +341,7 @@ void
RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id,
}
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
- int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+ int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
// 1 get query statistics map
std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>>
fe_qs_map;
std::map<std::string, std::pair<bool, bool>> qs_status; // <finished,
timeout>
@@ -515,7 +515,7 @@ void
RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, int6
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block*
block) {
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
- int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+ int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
// block's schema come from
SchemaBackendActiveTasksScanner::_s_tbls_columns
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
diff --git a/be/src/runtime/small_file_mgr.cpp
b/be/src/runtime/small_file_mgr.cpp
index 19a05f9c9dc..15d2c937be2 100644
--- a/be/src/runtime/small_file_mgr.cpp
+++ b/be/src/runtime/small_file_mgr.cpp
@@ -169,10 +169,10 @@ Status SmallFileMgr::_download_file(int64_t file_id,
const std::string& md5,
HttpClient client;
std::stringstream url_ss;
- TMasterInfo* master_info = _exec_env->master_info();
- url_ss << master_info->network_address.hostname << ":" <<
master_info->http_port
+ ClusterInfo* cluster_info = _exec_env->cluster_info();
+ url_ss << cluster_info->master_fe_addr.hostname << ":" <<
cluster_info->master_fe_http_port
<< "/api/get_small_file?"
- << "file_id=" << file_id << "&token=" << master_info->token;
+ << "file_id=" << file_id << "&token=" << cluster_info->token;
std::string url = url_ss.str();
diff --git a/be/src/runtime/snapshot_loader.cpp
b/be/src/runtime/snapshot_loader.cpp
index d04a5463879..784904c78a3 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -942,7 +942,7 @@ Status SnapshotLoader::_report_every(int report_threshold,
int* counter, int32_t
LOG(INFO) << "report to frontend. job id: " << _job_id << ", task id: " <<
_task_id
<< ", finished num: " << finished_num << ", total num:" <<
total_num;
- TNetworkAddress master_addr = _env->master_info()->network_address;
+ TNetworkAddress master_addr = _env->cluster_info()->master_fe_addr;
TSnapshotLoaderReportRequest request;
request.job_id = _job_id;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index ec83141893a..482fadac44e 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -173,12 +173,12 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext*
ctx) {
request.__set_timeout(ctx->timeout_second);
}
request.__set_request_id(ctx->id.to_thrift());
- request.__set_backend_id(_exec_env->master_info()->backend_id);
+ request.__set_backend_id(_exec_env->cluster_info()->backend_id);
TLoadTxnBeginResult result;
Status status;
int64_t duration_ns = 0;
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
if (master_addr.hostname.empty() || master_addr.port == 0) {
status = Status::Error<SERVICE_UNAVAILABLE>("Have not get FE Master
heartbeat yet");
} else {
@@ -215,7 +215,7 @@ Status
StreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
TLoadTxnCommitRequest request;
get_commit_request(ctx, request);
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnCommitResult result;
int64_t duration_ns = 0;
{
@@ -260,7 +260,7 @@ Status
StreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
request.__set_txnId(ctx->txn_id);
}
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxn2PCResult result;
int64_t duration_ns = 0;
{
@@ -312,7 +312,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext*
ctx) {
TLoadTxnCommitRequest request;
get_commit_request(ctx, request);
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnCommitResult result;
#ifndef BE_TEST
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -344,7 +344,7 @@ Status StreamLoadExecutor::commit_txn(StreamLoadContext*
ctx) {
void StreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
DorisMetrics::instance()->stream_load_txn_rollback_request_total->increment(1);
- TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+ TNetworkAddress master_addr = _exec_env->cluster_info()->master_fe_addr;
TLoadTxnRollbackRequest request;
set_request_auth(&request, ctx->auth);
request.__set_db(ctx->db);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 003f07f1db0..927d4d13814 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -254,7 +254,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
}
void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
- int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
+ int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
int cpu_num = CpuInfo::num_cores();
cpu_num = cpu_num <= 0 ? 1 : cpu_num;
uint64_t total_cpu_time_ns_per_second = cpu_num * 1000000000ll;
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index e6fdfaa8765..abdef513296 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -599,7 +599,7 @@ void _ingest_binlog(StorageEngine& engine, IngestBinlogArg*
arg) {
} // namespace
BaseBackendService::BaseBackendService(ExecEnv* exec_env)
- : _exec_env(exec_env), _agent_server(new AgentServer(exec_env,
*exec_env->master_info())) {}
+ : _exec_env(exec_env), _agent_server(new AgentServer(exec_env,
exec_env->cluster_info())) {}
BaseBackendService::~BaseBackendService() = default;
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index dcc76259868..00935e8fc64 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -83,9 +83,7 @@
#include "util/thrift_server.h"
#include "util/uid_util.h"
-namespace doris {
-class TMasterInfo;
-} // namespace doris
+namespace doris {} // namespace doris
static void help(const char*);
@@ -579,11 +577,11 @@ int main(int argc, char** argv) {
stop_work_if_error(status, "Doris Be http service did not start correctly,
exiting");
// 4. heart beat server
- doris::TMasterInfo* master_info = exec_env->master_info();
+ doris::ClusterInfo* cluster_info = exec_env->cluster_info();
std::unique_ptr<doris::ThriftServer> heartbeat_thrift_server;
doris::Status heartbeat_status = doris::create_heartbeat_server(
exec_env, doris::config::heartbeat_service_port,
&heartbeat_thrift_server,
- doris::config::heartbeat_service_thread_count, master_info);
+ doris::config::heartbeat_service_thread_count, cluster_info);
stop_work_if_error(heartbeat_status, "Heartbeat services did not start
correctly, exiting");
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 89b43ec5223..ae84081813f 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -500,7 +500,7 @@ Status PInternalService::_exec_plan_fragment_impl(
const std::function<void(RuntimeState*, Status*)>& cb) {
// Sometimes the BE do not receive the first heartbeat message and it
receives request from FE
// If BE execute this fragment, it will core when it wants to get some
property from master info.
- if (ExecEnv::GetInstance()->master_info() == nullptr) {
+ if (ExecEnv::GetInstance()->cluster_info() == nullptr) {
return Status::InternalError(
"Have not receive the first heartbeat message from master, not
ready to provide "
"service");
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index 180d18b0c2c..289930b16bc 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -278,7 +278,7 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange&
meta_scan_range) {
// _state->execution_timeout() is seconds, change to milliseconds
int time_out = _state->execution_timeout() * 1000;
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
TFetchSchemaTableDataResult result;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
diff --git a/be/src/vec/sink/autoinc_buffer.cpp
b/be/src/vec/sink/autoinc_buffer.cpp
index 4bc87dff489..80ce9d494d5 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -46,7 +46,7 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t
batch_size) {
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
_rpc_status = Status::OK();
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
for (uint32_t retry_times = 0; retry_times <
FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
@@ -167,4 +167,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t
length) {
return Status::OK();
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vrow_distribution.cpp
b/be/src/vec/sink/vrow_distribution.cpp
index 74a2830a191..2de21edd80b 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -102,7 +102,7 @@ Status VRowDistribution::automatic_create_partition() {
request.__set_be_endpoint(be_endpoint);
VLOG_NOTICE << "automatic partition rpc begin request " << request;
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
int time_out = _state->execution_timeout() * 1000;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
@@ -175,7 +175,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
request.__set_be_endpoint(be_endpoint);
VLOG_NOTICE << "auto detect replace partition request: " << request;
- TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
int time_out = _state->execution_timeout() * 1000;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
diff --git a/be/test/agent/heartbeat_server_test.cpp
b/be/test/agent/heartbeat_server_test.cpp
index db3a7a2eb78..f0c93a8caca 100644
--- a/be/test/agent/heartbeat_server_test.cpp
+++ b/be/test/agent/heartbeat_server_test.cpp
@@ -35,12 +35,12 @@ namespace doris {
TEST(HeartbeatTest, TestHeartbeat) {
setenv("DORIS_HOME", "./", 1);
THeartbeatResult heartbeat_result;
- TMasterInfo ori_master_info;
- ori_master_info.cluster_id = -1;
- ori_master_info.epoch = 0;
- ori_master_info.network_address.hostname = "";
- ori_master_info.network_address.port = 0;
- HeartbeatServer heartbeat_server(&ori_master_info);
+ ClusterInfo ori_cluster_info;
+ ori_cluster_info.cluster_id = -1;
+ ori_cluster_info.epoch = 0;
+ ori_cluster_info.network_address.hostname = "";
+ ori_cluster_info.network_address.port = 0;
+ HeartbeatServer heartbeat_server(&ori_cluster_info);
heartbeat_server.heartbeat(heartbeat_result, master_info);
EXPECT_EQ(TStatusCode::OK, heartbeat_result.status.status_code);
diff --git a/be/test/agent/mock_utils.h b/be/test/agent/mock_utils.h
index 3a14ca79626..b49ff3e372a 100644
--- a/be/test/agent/mock_utils.h
+++ b/be/test/agent/mock_utils.h
@@ -33,7 +33,7 @@ public:
class MockMasterServerClient : public MasterServerClient {
public:
- MockMasterServerClient(const TMasterInfo& master_info,
+ MockMasterServerClient(const ClusterInfo* cluster_info,
FrontendServiceClientCache* client_cache);
MOCK_METHOD2(finish_task, Status(const TFinishTaskRequest request,
TMasterResult* result));
MOCK_METHOD2(report, Status(const TReportRequest request, TMasterResult*
result));
diff --git a/be/test/agent/task_worker_pool_test.cpp
b/be/test/agent/task_worker_pool_test.cpp
index 0c7fd88396b..7be29e6feb4 100644
--- a/be/test/agent/task_worker_pool_test.cpp
+++ b/be/test/agent/task_worker_pool_test.cpp
@@ -27,6 +27,7 @@
#include "olap/options.h"
#include "olap/storage_engine.h"
+#include "runtime/cluster_info.h"
namespace doris {
@@ -106,14 +107,14 @@ TEST(TaskWorkerPoolTest, ReportWorkerPool) {
ExecEnv::GetInstance()->set_storage_engine(std::make_unique<StorageEngine>(EngineOptions
{}));
Defer defer {[] { ExecEnv::GetInstance()->set_storage_engine(nullptr); }};
- TMasterInfo master_info;
+ ClusterInfo cluster_info;
std::atomic_int count {0};
- ReportWorker worker("test", master_info, 1, [&] { ++count; });
+ ReportWorker worker("test", &cluster_info, 1, [&] { ++count; });
worker.notify(); // Not received heartbeat yet, ignore
std::this_thread::sleep_for(100ms);
- master_info.network_address.__set_port(9030);
+ cluster_info.master_fe_addr.__set_port(9030);
worker.notify();
std::this_thread::sleep_for(100ms);
EXPECT_EQ(count.load(), 1);
diff --git a/be/test/http/http_client_test.cpp
b/be/test/http/http_client_test.cpp
index 4e2ada03c66..d42e0a6775e 100644
--- a/be/test/http/http_client_test.cpp
+++ b/be/test/http/http_client_test.cpp
@@ -140,7 +140,7 @@ TEST_F(HttpClientTest, get_normal) {
client.set_basic_auth("test1", "");
std::string response;
st = client.execute(&response);
- EXPECT_TRUE(st.ok());
+ EXPECT_TRUE(st.ok()) << st;
EXPECT_STREQ("test1", response.c_str());
// for head
@@ -380,7 +380,7 @@ TEST_F(HttpClientTest, enable_http_auth) {
std::cout << "st = " << st << "\n";
std::cout << "response = " << response << "\n";
std::cout << "st.msg() = " << st.msg() << "\n";
- EXPECT_TRUE(!st.ok());
+ EXPECT_TRUE(!st.ok()) << st;
EXPECT_TRUE(st.msg().find("The requested URL returned error") !=
std::string::npos);
}
@@ -477,6 +477,36 @@ TEST_F(HttpClientTest, enable_http_auth) {
EXPECT_TRUE(st.msg().find("Operation timed out after") !=
std::string::npos);
}
+ // valid token
+ {
+ config::enable_all_http_auth = true;
+ std::string url = hostname + "/metrics";
+ HttpClient client;
+ auto st = client.init(url);
+ EXPECT_TRUE(st.ok());
+ client.set_method(GET);
+ client.set_auth_token("valid_token");
+ client.set_timeout_ms(200);
+ std::string response;
+ st = client.execute(&response);
+ EXPECT_TRUE(st.ok()) << st;
+ }
+
+ // invalid token
+ {
+ config::enable_all_http_auth = true;
+ std::string url = hostname + "/metrics";
+ HttpClient client;
+ auto st = client.init(url);
+ EXPECT_TRUE(st.ok());
+ client.set_method(GET);
+ client.set_auth_token("invalid_token");
+ client.set_timeout_ms(200);
+ std::string response;
+ st = client.execute(&response);
+ EXPECT_TRUE(!st.ok()) << st;
+ }
+
{
config::enable_all_http_auth = true;
std::string url = hostname + "/api/glog/adjust";
diff --git a/be/test/olap/wal/wal_manager_test.cpp
b/be/test/olap/wal/wal_manager_test.cpp
index afb4c7696ef..32162593fc0 100644
--- a/be/test/olap/wal/wal_manager_test.cpp
+++ b/be/test/olap/wal/wal_manager_test.cpp
@@ -55,10 +55,10 @@ public:
void SetUp() override {
prepare();
_env = ExecEnv::GetInstance();
- _env->_master_info = new TMasterInfo();
- _env->_master_info->network_address.hostname = "host name";
- _env->_master_info->network_address.port = 1234;
- _env->_master_info->backend_id = 1001;
+ _env->_cluster_info = new ClusterInfo();
+ _env->_cluster_info->master_fe_addr.hostname = "host name";
+ _env->_cluster_info->master_fe_addr.port = 1234;
+ _env->_cluster_info->backend_id = 1001;
_env->new_load_stream_mgr() = NewLoadStreamMgr::create_shared();
_env->_internal_client_cache = new
BrpcClientCache<PBackendService_Stub>();
_env->_function_client_cache = new
BrpcClientCache<PFunctionService_Stub>();
@@ -77,7 +77,7 @@ public:
SAFE_STOP(_env->_wal_manager);
SAFE_DELETE(_env->_function_client_cache);
SAFE_DELETE(_env->_internal_client_cache);
- SAFE_DELETE(_env->_master_info);
+ SAFE_DELETE(_env->_cluster_info);
}
void prepare() {
@@ -242,4 +242,4 @@ TEST_F(WalManagerTest, TestDynamicWalSpaceLimt) {
EXPECT_EQ(_env->wal_mgr()->_init_wal_dirs_info(),
Status::InternalError(""));
EXPECT_EQ(_env->wal_mgr()->wal_limit_test_bytes, available_bytes);
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp
b/be/test/runtime/routine_load_task_executor_test.cpp
index 338b82c6eba..5c2b39bce1f 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -55,7 +55,7 @@ public:
k_stream_load_rollback_result = TLoadTxnRollbackResult();
k_stream_load_put_result = TStreamLoadPutResult();
- _env.set_master_info(new TMasterInfo());
+ _env.set_cluster_info(new ClusterInfo());
_env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
@@ -63,7 +63,7 @@ public:
config::max_consumer_num_per_group = 3;
}
- void TearDown() override { delete _env.master_info(); }
+ void TearDown() override { delete _env.cluster_info(); }
ExecEnv _env;
};
@@ -121,4 +121,4 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
executor.stop();
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp
b/be/test/vec/exec/vfile_scanner_exception_test.cpp
index 1d565c7e0ce..4b6ce46bd88 100644
--- a/be/test/vec/exec/vfile_scanner_exception_test.cpp
+++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp
@@ -26,6 +26,7 @@
#include "io/fs/local_file_system.h"
#include "olap/wal/wal_manager.h"
#include "pipeline/exec/file_scan_operator.h"
+#include "runtime/cluster_info.h"
#include "runtime/descriptors.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
@@ -107,7 +108,7 @@ private:
TFileRangeDesc _range_desc;
TFileScanRange _scan_range;
std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
- std::unique_ptr<TMasterInfo> _master_info = nullptr;
+ std::unique_ptr<ClusterInfo> _cluster_info = nullptr;
};
void VfileScannerExceptionTest::_init_desc_table() {
@@ -266,12 +267,12 @@ void VfileScannerExceptionTest::init() {
_scan_range.params.format_type = TFileFormatType::FORMAT_JNI;
_kv_cache.reset(new ShardedKVCache(48));
- _master_info.reset(new TMasterInfo());
+ _cluster_info.reset(new ClusterInfo());
_env = ExecEnv::GetInstance();
- _env->_master_info = _master_info.get();
- _env->_master_info->network_address.hostname = "host name";
- _env->_master_info->network_address.port = _backend_id;
- _env->_master_info->backend_id = 1001;
+ _env->_cluster_info = _cluster_info.get();
+ _env->_cluster_info->master_fe_addr.hostname = "host name";
+ _env->_cluster_info->master_fe_addr.port = _backend_id;
+ _env->_cluster_info->backend_id = 1001;
_env->_wal_manager = 0;
}
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp
b/be/test/vec/exec/vwal_scanner_test.cpp
index 0944193e6d0..5c4056a8c24 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -26,6 +26,7 @@
#include "io/fs/local_file_system.h"
#include "olap/wal/wal_manager.h"
#include "pipeline/exec/file_scan_operator.h"
+#include "runtime/cluster_info.h"
#include "runtime/descriptors.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
@@ -116,7 +117,7 @@ private:
TFileRangeDesc _range_desc;
TFileScanRange _scan_range;
std::unique_ptr<ShardedKVCache> _kv_cache = nullptr;
- std::unique_ptr<TMasterInfo> _master_info = nullptr;
+ std::unique_ptr<ClusterInfo> _cluster_info = nullptr;
};
void VWalScannerTest::_init_desc_table() {
@@ -279,12 +280,12 @@ void VWalScannerTest::init() {
_scan_range.params.format_type = TFileFormatType::FORMAT_WAL;
_kv_cache.reset(new ShardedKVCache(48));
- _master_info.reset(new TMasterInfo());
+ _cluster_info.reset(new ClusterInfo());
_env = ExecEnv::GetInstance();
- _env->_master_info = _master_info.get();
- _env->_master_info->network_address.hostname = "host name";
- _env->_master_info->network_address.port = _backend_id;
- _env->_master_info->backend_id = 1001;
+ _env->_cluster_info = _cluster_info.get();
+ _env->_cluster_info->master_fe_addr.hostname = "host name";
+ _env->_cluster_info->master_fe_addr.port = _backend_id;
+ _env->_cluster_info->backend_id = 1001;
_env->_wal_manager = WalManager::create_shared(_env, _wal_dir);
std::string base_path;
auto st = _env->_wal_manager->_init_wal_dirs_info();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 26c3cc1b1b4..e48b514cdd2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -571,6 +571,8 @@ public class Env {
private final List<String> forceSkipJournalIds =
Arrays.asList(Config.force_skip_journal_ids);
+ private TokenManager tokenManager;
+
// if a config is relative to a daemon thread. record the relation here.
we will proactively change interval of it.
private final Map<String, Supplier<MasterDaemon>> configtoThreads =
ImmutableMap
.of("dynamic_partition_check_interval_seconds",
this::getDynamicPartitionScheduler);
@@ -817,6 +819,7 @@ public class Env {
this.sqlCacheManager = new NereidsSqlCacheManager();
this.splitSourceManager = new SplitSourceManager();
this.globalExternalTransactionInfoMgr = new
GlobalExternalTransactionInfoMgr();
+ this.tokenManager = new TokenManager();
}
public static void destroyCheckpoint() {
@@ -1852,6 +1855,7 @@ public class Env {
// start threads that should run on all FE
protected void startNonMasterDaemonThreads() {
// start load manager thread
+ tokenManager.start();
loadManager.start();
tabletStatMgr.start();
@@ -4368,6 +4372,10 @@ public class Env {
return loadManager;
}
+ public TokenManager getTokenManager() {
+ return tokenManager;
+ }
+
public ProgressManager getProgressManager() {
return progressManager;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java
similarity index 95%
rename from
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
rename to fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java
index 21e9f9b0434..c6c6911a044 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/TokenManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TokenManager.java
@@ -15,9 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.load.loadv2;
+package org.apache.doris.catalog;
-import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
@@ -39,6 +38,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+/**
+ * This class manages the tokens used for internal authentication.
+ * It generates a new token every 12
hours (Config.token_generate_period_hour)
+ * and keeps at most 6 tokens (Config.token_queue_size),
+ * so each token stays valid for 3 days (6 tokens x 12 hours).
+ * Only the Master FE can generate a new token.
+ */
public class TokenManager {
private static final Logger LOG = LogManager.getLogger(TokenManager.class);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 70492e5eab2..2f9efc1ed1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -575,7 +575,7 @@ public class LoadAction extends RestBaseController {
// So this function is not widely tested under general scenario
private boolean checkClusterToken(String token) {
try {
- return
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
+ return Env.getCurrentEnv().getTokenManager().checkAuthToken(token);
} catch (UserException e) {
throw new UnauthorizedException(e.getMessage());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 7887d8c602b..07c459d61cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -101,16 +101,13 @@ public class LoadManager implements Writable {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private MysqlLoadManager mysqlLoadManager;
- private TokenManager tokenManager;
public LoadManager(LoadJobScheduler loadJobScheduler) {
this.loadJobScheduler = loadJobScheduler;
- this.tokenManager = new TokenManager();
- this.mysqlLoadManager = new MysqlLoadManager(tokenManager);
+ this.mysqlLoadManager = new MysqlLoadManager();
}
public void start() {
- tokenManager.start();
mysqlLoadManager.start();
}
@@ -184,10 +181,6 @@ public class LoadManager implements Writable {
return mysqlLoadManager;
}
- public TokenManager getTokenManager() {
- return tokenManager;
- }
-
public void replayCreateLoadJob(LoadJob loadJob) {
createLoadJob(loadJob);
LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg",
"replay create load job").build());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index d09e73b4a33..68dffbfb3e3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -76,7 +76,6 @@ public class MysqlLoadManager {
private static final Logger LOG =
LogManager.getLogger(MysqlLoadManager.class);
private ThreadPoolExecutor mysqlLoadPool;
- private final TokenManager tokenManager;
private static class MySqlLoadContext {
private boolean finished;
@@ -143,8 +142,7 @@ public class MysqlLoadManager {
private EvictingQueue<MySqlLoadFailRecord> failedRecords;
private ScheduledExecutorService periodScheduler;
- public MysqlLoadManager(TokenManager tokenManager) {
- this.tokenManager = tokenManager;
+ public MysqlLoadManager() {
}
public void start() {
@@ -178,7 +176,7 @@ public class MysqlLoadManager {
VariableMgr.setVar(sessionVariable,
new SetVar(SessionVariable.QUERY_TIMEOUT, new
StringLiteral(String.valueOf(newTimeOut))));
}
- String token = tokenManager.acquireToken();
+ String token = Env.getCurrentEnv().getTokenManager().acquireToken();
boolean clientLocal = dataDesc.isClientLocal();
MySqlLoadContext loadContext = new MySqlLoadContext();
loadContextMap.put(loadId, loadContext);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index b00f89db4b5..f4a7259bf71 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -136,7 +136,7 @@ public class CanalSyncChannel extends SyncChannel {
FrontendOptions.getLocalHostAddress(),
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
- String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ String token =
Env.getCurrentEnv().getTokenManager().acquireToken();
request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
index cf9a8797161..6b301c16f73 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java
@@ -198,7 +198,7 @@ public class InsertUtils {
String label = txnEntry.getLabel();
try {
long txnId;
- String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ String token =
Env.getCurrentEnv().getTokenManager().acquireToken();
if (Config.isCloudMode() || Env.getCurrentEnv().isMaster()) {
txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
txnConf.getDbId(), Lists.newArrayList(tblObj.getId()),
label,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 06789b864b6..644bddee58b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -187,7 +187,7 @@ public class AuditLoader extends Plugin implements
AuditPlugin {
String token = "";
try {
// Acquire token from master
- token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ token =
Env.getCurrentEnv().getTokenManager().acquireToken();
} catch (Exception e) {
LOG.warn("Failed to get auth token: {}", e);
discardLogNum += auditLogNum;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 7b557418b80..9daaf525ca4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2247,10 +2247,10 @@ public class StmtExecutor {
ExecuteEnv.getInstance().getStartupTime()),
sourceType, timeoutSecond);
txnConf.setTxnId(txnId);
- String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ String token =
Env.getCurrentEnv().getTokenManager().acquireToken();
txnConf.setToken(token);
} else {
- String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ String token =
Env.getCurrentEnv().getTokenManager().acquireToken();
MasterTxnExecutor masterTxnExecutor = new
MasterTxnExecutor(context);
TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
request.setDb(txnConf.getDb()).setTbl(txnConf.getTbl()).setToken(token)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ba49d6da8e6..ac342672638 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1230,7 +1230,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request,
String clientIp) throws UserException {
if (request.isSetAuthCode()) {
- // TODO(cmy): find a way to check
+ // TODO: deprecated, removed in 3.1, use token instead.
} else if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(),
request.getPasswd(), request.getDb(),
request.getTbl(),
@@ -1464,7 +1464,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
private void loadTxnPreCommitImpl(TLoadTxnCommitRequest request) throws
UserException {
if (request.isSetAuthCode()) {
- // CHECKSTYLE IGNORE THIS LINE
+ // TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
if (!checkToken(request.getToken())) {
throw new AuthenticationException("Invalid token: " +
request.getToken());
@@ -1650,7 +1650,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// timeout
private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws
UserException {
if (request.isSetAuthCode()) {
- // TODO(cmy): find a way to check
+ // TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
@@ -1791,7 +1791,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// Step 3: check auth
if (request.isSetAuthCode()) {
- // TODO(cmy): find a way to check
+ // TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
@@ -1871,7 +1871,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws
UserException {
if (request.isSetAuthCode()) {
- // TODO(cmy): find a way to check
+ // TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
@@ -1994,7 +1994,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
// Step 3: check auth
if (request.isSetAuthCode()) {
- // TODO(cmy): find a way to check
+ // TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
@@ -2159,7 +2159,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
ConnectContext ctx = ConnectContext.get();
if (request.isSetAuthCode()) {
- // TODO(cmy): find a way to check
+ // TODO: deprecated, removed in 3.1, use token instead.
} else if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(),
request.getPasswd(), request.getDb(),
request.getTbl(),
@@ -2390,7 +2390,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
try {
- String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ String token =
Env.getCurrentEnv().getTokenManager().acquireToken();
result.setToken(token);
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
@@ -2409,7 +2409,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.debug("receive check token request from client: {}",
clientAddr);
}
try {
- return
Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(token);
+ return Env.getCurrentEnv().getTokenManager().checkAuthToken(token);
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
return false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index fb6853e83c3..d7eff484c6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -250,6 +250,7 @@ public class HeartbeatMgr extends MasterDaemon {
copiedMasterInfo.setHeartbeatFlags(flags);
copiedMasterInfo.setBackendId(backendId);
copiedMasterInfo.setFrontendInfos(feInfos);
+
copiedMasterInfo.setAuthToken(Env.getCurrentEnv().getTokenManager().acquireToken());
if (Config.isCloudMode()) {
String cloudUniqueId =
backend.getTagMap().get(Tag.CLOUD_UNIQUE_ID);
copiedMasterInfo.setCloudUniqueId(cloudUniqueId);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index fcab55866ed..25c4ff4b3b2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -225,7 +225,7 @@ public class TransactionEntry {
ExecuteEnv.getInstance().getStartupTime()),
LoadJobSourceType.INSERT_STREAMING, timeoutSecond);
} else {
- String token =
Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
+ String token =
Env.getCurrentEnv().getTokenManager().acquireToken();
MasterTxnExecutor masterTxnExecutor = new
MasterTxnExecutor(ConnectContext.get());
TLoadTxnBeginRequest request = new TLoadTxnBeginRequest();
request.setDb(database.getFullName()).setTbl(table.getName()).setToken(token)
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
index 13ae9b6e44e..2f8f1437344 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/TokenManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
+import org.apache.doris.catalog.TokenManager;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 77e49e6c1fa..7078d5eaeab 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -645,12 +645,12 @@ struct TLoadTxnBeginRequest {
6: optional string user_ip
7: required string label
8: optional i64 timestamp // deprecated, use request_id instead
- 9: optional i64 auth_code
+ 9: optional i64 auth_code // deprecated, use token instead
// The real value of timeout should be i32. i64 ensures the compatibility of interface.
10: optional i64 timeout
11: optional Types.TUniqueId request_id
12: optional string token
- 13: optional string auth_code_uuid
+ 13: optional string auth_code_uuid // deprecated, use token instead
14: optional i64 table_id
15: optional i64 backend_id
}
@@ -670,7 +670,7 @@ struct TBeginTxnRequest {
5: optional list<i64> table_ids
6: optional string user_ip
7: optional string label
- 8: optional i64 auth_code
+ 8: optional i64 auth_code // deprecated, use token instead
// The real value of timeout should be i32. i64 ensures the compatibility of interface.
9: optional i64 timeout
10: optional Types.TUniqueId request_id
@@ -718,7 +718,7 @@ struct TStreamLoadPutRequest {
14: optional string columnSeparator
15: optional string partitions
- 16: optional i64 auth_code
+ 16: optional i64 auth_code // deprecated, use token instead
17: optional bool negative
18: optional i32 timeout
19: optional bool strictMode
@@ -831,14 +831,14 @@ struct TLoadTxnCommitRequest {
7: required i64 txnId
8: required bool sync
9: optional list<Types.TTabletCommitInfo> commitInfos
- 10: optional i64 auth_code
+ 10: optional i64 auth_code // deprecated, use token instead
11: optional TTxnCommitAttachment txnCommitAttachment
12: optional i64 thrift_rpc_timeout_ms
13: optional string token
14: optional i64 db_id
15: optional list<string> tbls
16: optional i64 table_id
- 17: optional string auth_code_uuid
+ 17: optional string auth_code_uuid // deprecated, use token instead
18: optional bool groupCommit
19: optional i64 receiveBytes
20: optional i64 backendId
@@ -856,7 +856,7 @@ struct TCommitTxnRequest {
5: optional string user_ip
6: optional i64 txn_id
7: optional list<Types.TTabletCommitInfo> commit_infos
- 8: optional i64 auth_code
+ 8: optional i64 auth_code // deprecated, use token instead
9: optional TTxnCommitAttachment txn_commit_attachment
10: optional i64 thrift_rpc_timeout_ms
11: optional string token
@@ -879,13 +879,13 @@ struct TLoadTxn2PCRequest {
5: optional string user_ip
6: optional i64 txnId
7: optional string operation
- 8: optional i64 auth_code
+ 8: optional i64 auth_code // deprecated, use token instead
9: optional string token
10: optional i64 thrift_rpc_timeout_ms
11: optional string label
// For cloud
- 1000: optional string auth_code_uuid
+ 1000: optional string auth_code_uuid // deprecated, use token instead
}
struct TLoadTxn2PCResult {
@@ -900,7 +900,7 @@ struct TRollbackTxnRequest {
5: optional string user_ip
6: optional i64 txn_id
7: optional string reason
- 9: optional i64 auth_code
+ 9: optional i64 auth_code // deprecated, use token instead
10: optional TTxnCommitAttachment txn_commit_attachment
11: optional string token
12: optional i64 db_id
@@ -920,12 +920,12 @@ struct TLoadTxnRollbackRequest {
6: optional string user_ip
7: required i64 txnId
8: optional string reason
- 9: optional i64 auth_code
+ 9: optional i64 auth_code // deprecated, use token instead
10: optional TTxnCommitAttachment txnCommitAttachment
11: optional string token
12: optional i64 db_id
13: optional list<string> tbls
- 14: optional string auth_code_uuid
+ 14: optional string auth_code_uuid // deprecated, use token instead
15: optional string label
}
diff --git a/gensrc/thrift/HeartbeatService.thrift
b/gensrc/thrift/HeartbeatService.thrift
index acdc608f21b..47c41650b78 100644
--- a/gensrc/thrift/HeartbeatService.thrift
+++ b/gensrc/thrift/HeartbeatService.thrift
@@ -43,6 +43,7 @@ struct TMasterInfo {
11: optional string cloud_unique_id;
// See configuration item Config.java rehash_tablet_after_be_dead_seconds for meaning
12: optional i64 tablet_report_inactive_duration_ms;
+ 13: optional string auth_token;
}
struct TBackendInfo {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]