This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 826cfdaf93 [feature](information_schema) add `backends`
information_schema table (#13086)
826cfdaf93 is described below
commit 826cfdaf93516d9c60daf6d042e4bfc3adaa1897
Author: Tiewei Fang <[email protected]>
AuthorDate: Tue Nov 8 22:15:10 2022 +0800
[feature](information_schema) add `backends` information_schema table
(#13086)
---
be/src/exec/CMakeLists.txt | 1 +
be/src/exec/schema_scan_node.cpp | 5 +
be/src/exec/schema_scanner.cpp | 47 +++++-
be/src/exec/schema_scanner.h | 10 ++
.../schema_scanner/schema_backends_scanner.cpp | 165 +++++++++++++++++++++
.../exec/schema_scanner/schema_backends_scanner.h | 44 ++++++
be/src/vec/exec/vschema_scan_node.cpp | 24 ++-
.../org/apache/doris/analysis/SchemaTableType.java | 3 +-
.../java/org/apache/doris/catalog/SchemaTable.java | 51 ++++++-
.../org/apache/doris/planner/SchemaScanNode.java | 2 +
.../apache/doris/service/FrontendServiceImpl.java | 126 ++++++++++++++++
gensrc/thrift/Data.thrift | 9 +-
gensrc/thrift/Descriptors.thrift | 3 +-
gensrc/thrift/FrontendService.thrift | 16 ++
gensrc/thrift/PlanNodes.thrift | 8 +
.../suites/correctness/test_backends_table.groovy | 23 +++
16 files changed, 519 insertions(+), 18 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 1c7f2be313..2e020e2f92 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -88,6 +88,7 @@ set(EXEC_FILES
schema_scanner/schema_files_scanner.cpp
schema_scanner/schema_partitions_scanner.cpp
schema_scanner/schema_rowsets_scanner.cpp
+ schema_scanner/schema_backends_scanner.cpp
partitioned_hash_table.cc
partitioned_aggregation_node.cc
diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp
index 6a1b546ec1..e52cddb98f 100644
--- a/be/src/exec/schema_scan_node.cpp
+++ b/be/src/exec/schema_scan_node.cpp
@@ -82,6 +82,11 @@ Status SchemaScanNode::init(const TPlanNode& tnode,
RuntimeState* state) {
if (tnode.schema_scan_node.__isset.thread_id) {
_scanner_param.thread_id = tnode.schema_scan_node.thread_id;
}
+
+ if (tnode.schema_scan_node.__isset.table_structure) {
+ _scanner_param.table_structure = _pool->add(
+ new
std::vector<TSchemaTableStructure>(tnode.schema_scan_node.table_structure));
+ }
return Status::OK();
}
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index fb4623114c..a452077f6c 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -17,6 +17,7 @@
#include "exec/schema_scanner.h"
+#include "exec/schema_scanner/schema_backends_scanner.h"
#include "exec/schema_scanner/schema_charsets_scanner.h"
#include "exec/schema_scanner/schema_collations_scanner.h"
#include "exec/schema_scanner/schema_columns_scanner.h"
@@ -31,6 +32,8 @@
#include "exec/schema_scanner/schema_user_privileges_scanner.h"
#include "exec/schema_scanner/schema_variables_scanner.h"
#include "exec/schema_scanner/schema_views_scanner.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/string_value.h"
namespace doris {
@@ -41,9 +44,23 @@ SchemaScanner::SchemaScanner(ColumnDesc* columns, int
column_num)
_param(nullptr),
_columns(columns),
_column_num(column_num),
- _tuple_desc(nullptr) {}
+ _tuple_desc(nullptr),
+ _schema_table_type(TSchemaTableType::SCH_INVALID) {}
-SchemaScanner::~SchemaScanner() {}
+SchemaScanner::SchemaScanner(ColumnDesc* columns, int column_num,
TSchemaTableType::type type)
+ : _is_init(false),
+ _param(nullptr),
+ _columns(columns),
+ _column_num(column_num),
+ _tuple_desc(nullptr),
+ _schema_table_type(type) {}
+
+SchemaScanner::~SchemaScanner() {
+ if (_is_create_columns == true && _columns != nullptr) {
+ delete[] _columns;
+ _columns = nullptr;
+ }
+}
Status SchemaScanner::start(RuntimeState* state) {
if (!_is_init) {
@@ -70,8 +87,15 @@ Status SchemaScanner::init(SchemaScannerParam* param,
ObjectPool* pool) {
if (_is_init) {
return Status::OK();
}
+ if (nullptr == param || nullptr == pool) {
+ return Status::InternalError("invalid parameter");
+ }
+
+ if (_schema_table_type == TSchemaTableType::SCH_BACKENDS) {
+ RETURN_IF_ERROR(create_columns(param->table_structure, pool));
+ }
- if (nullptr == param || nullptr == pool || nullptr == _columns) {
+ if (nullptr == _columns) {
return Status::InternalError("invalid parameter");
}
@@ -113,15 +137,30 @@ SchemaScanner*
SchemaScanner::create(TSchemaTableType::type type) {
return new (std::nothrow) SchemaPartitionsScanner();
case TSchemaTableType::SCH_ROWSETS:
return new (std::nothrow) SchemaRowsetsScanner();
+ case TSchemaTableType::SCH_BACKENDS:
+ return new (std::nothrow) SchemaBackendsScanner();
default:
return new (std::nothrow) SchemaDummyScanner();
break;
}
}
+Status SchemaScanner::create_columns(const std::vector<TSchemaTableStructure>*
table_structure,
+ ObjectPool* pool) {
+ _column_num = table_structure->size();
+ _columns = new ColumnDesc[_column_num];
+ _is_create_columns = true;
+ for (size_t idx = 0; idx < table_structure->size(); ++idx) {
+ _columns[idx].name = table_structure->at(idx).column_name.c_str();
+ _columns[idx].type = thrift_to_type(table_structure->at(idx).type);
+ _columns[idx].size = table_structure->at(idx).len;
+ _columns[idx].is_null = table_structure->at(idx).is_null;
+ }
+ return Status::OK();
+}
+
Status SchemaScanner::create_tuple_desc(ObjectPool* pool) {
int null_column = 0;
-
for (int i = 0; i < _column_num; ++i) {
if (_columns[i].is_null) {
null_column++;
diff --git a/be/src/exec/schema_scanner.h b/be/src/exec/schema_scanner.h
index 2450da24d4..33b73b894c 100644
--- a/be/src/exec/schema_scanner.h
+++ b/be/src/exec/schema_scanner.h
@@ -43,6 +43,7 @@ struct SchemaScannerParam {
const std::string* ip; // frontend ip
int32_t port; // frontend thrift port
int64_t thread_id;
+ const std::vector<TSchemaTableStructure>* table_structure;
SchemaScannerParam()
: db(nullptr),
@@ -68,6 +69,7 @@ public:
int scale = -1;
};
SchemaScanner(ColumnDesc* columns, int column_num);
+ SchemaScanner(ColumnDesc* columns, int column_num, TSchemaTableType::type
type);
virtual ~SchemaScanner();
// init object need information, schema etc.
@@ -84,6 +86,8 @@ public:
protected:
Status create_tuple_desc(ObjectPool* pool);
+ Status create_columns(const std::vector<TSchemaTableStructure>*
table_structure,
+ ObjectPool* pool);
bool _is_init;
// this is used for sub class
@@ -94,7 +98,13 @@ protected:
int _column_num;
TupleDescriptor* _tuple_desc;
+ // _is_create_columns means if ColumnDesc is created from FE.
+ // `_columns` should be deleted if _is_create_columns = true.
+ bool _is_create_columns = false;
+
static DorisServer* _s_doris_server;
+
+ TSchemaTableType::type _schema_table_type;
};
} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_backends_scanner.cpp
b/be/src/exec/schema_scanner/schema_backends_scanner.cpp
new file mode 100644
index 0000000000..ec29eb60a6
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backends_scanner.cpp
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/schema_scanner/schema_backends_scanner.h"
+
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/HeartbeatService_types.h>
+
+#include "exec/schema_scanner.h"
+#include "gen_cpp/FrontendService.h"
+#include "runtime/client_cache.h"
+#include "runtime/define_primitive_type.h"
+#include "runtime/exec_env.h"
+#include "runtime/primitive_type.h"
+#include "runtime/string_value.h"
+#include "util/thrift_rpc_helper.h"
+
+namespace doris {
+
+SchemaBackendsScanner::SchemaBackendsScanner()
+ : SchemaScanner(nullptr, 0, TSchemaTableType::SCH_BACKENDS),
_row_idx(0) {}
+
+Status SchemaBackendsScanner::start(RuntimeState* state) {
+ if (!_is_init) {
+ return Status::InternalError("used before initialized.");
+ }
+ RETURN_IF_ERROR(_fetch_backends_info());
+ RETURN_IF_ERROR(_set_col_name_to_type());
+ return Status::OK();
+}
+
+Status SchemaBackendsScanner::get_next_row(Tuple* tuple, MemPool* pool, bool*
eos) {
+ if (!_is_init) {
+ return Status::InternalError("Used before initialized.");
+ }
+ if (nullptr == tuple || nullptr == pool || nullptr == eos) {
+ return Status::InternalError("input pointer is nullptr.");
+ }
+ if (_row_idx >= _batch_data.size()) {
+ *eos = true;
+ return Status::OK();
+ }
+ *eos = false;
+ return _fill_one_row(tuple, pool);
+}
+
+Status SchemaBackendsScanner::_fill_one_row(Tuple* tuple, MemPool* pool) {
+ memset((void*)tuple, 0, _tuple_desc->num_null_bytes());
+ for (size_t col_idx = 0; col_idx < _column_num; ++col_idx) {
+ RETURN_IF_ERROR(_fill_one_col(tuple, pool, col_idx));
+ }
+ ++_row_idx;
+ return Status::OK();
+}
+
+Status SchemaBackendsScanner::_fill_one_col(Tuple* tuple, MemPool* pool,
size_t col_idx) {
+ auto it = _col_name_to_type.find(_columns[col_idx].name);
+
+ // if this column is not exist in BE, we fill it with `NULL`.
+ if (it == _col_name_to_type.end()) {
+ if (_columns[col_idx].is_null) {
+
tuple->set_null(_tuple_desc->slots()[col_idx]->null_indicator_offset());
+ } else {
+ return Status::InternalError("column {} is not found in BE, and {}
is not nullable.",
+ _columns[col_idx].name,
_columns[col_idx].name);
+ }
+ } else if (it->second == TYPE_BIGINT) {
+ void* slot =
tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset());
+ *(reinterpret_cast<int64_t*>(slot)) =
_batch_data[_row_idx].column_value[col_idx].longVal;
+ } else if (it->second == TYPE_INT) {
+ void* slot =
tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset());
+ *(reinterpret_cast<int32_t*>(slot)) =
_batch_data[_row_idx].column_value[col_idx].intVal;
+ } else if (it->second == TYPE_VARCHAR) {
+ void* slot =
tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset());
+ StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
+ str_slot->ptr =
+
(char*)pool->allocate(_batch_data[_row_idx].column_value[col_idx].stringVal.size());
+ str_slot->len =
_batch_data[_row_idx].column_value[col_idx].stringVal.size();
+ memcpy(str_slot->ptr,
_batch_data[_row_idx].column_value[col_idx].stringVal.c_str(),
+ str_slot->len);
+ } else if (it->second == TYPE_DOUBLE) {
+ void* slot =
tuple->get_slot(_tuple_desc->slots()[col_idx]->tuple_offset());
+ *(reinterpret_cast<double_t*>(slot)) =
+ _batch_data[_row_idx].column_value[col_idx].doubleVal;
+ } else {
+ // other type
+ }
+ return Status::OK();
+}
+
+Status SchemaBackendsScanner::_fetch_backends_info() {
+ TFetchSchemaTableDataRequest request;
+ request.cluster_name = "";
+ request.__isset.cluster_name = true;
+ request.schema_table_name = TSchemaTableName::BACKENDS;
+ request.__isset.schema_table_name = true;
+ TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
+ // TODO(ftw): if result will too large?
+ TFetchSchemaTableDataResult result;
+
+ RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
+ master_addr.hostname, master_addr.port,
+ [&request, &result](FrontendServiceConnection& client) {
+ client->fetchSchemaTableData(result, request);
+ },
+ config::txn_commit_rpc_timeout_ms));
+
+ Status status(result.status);
+ if (!status.ok()) {
+ LOG(WARNING) << "fetch schema table data from master failed, errmsg="
+ << status.get_error_msg();
+ return status;
+ }
+ _batch_data = std::move(result.data_batch);
+ return Status::OK();
+}
+
+Status SchemaBackendsScanner::_set_col_name_to_type() {
+ _col_name_to_type.emplace("BackendId", TYPE_BIGINT);
+ _col_name_to_type.emplace("TabletNum", TYPE_BIGINT);
+
+ _col_name_to_type.emplace("HeartbeatPort", TYPE_INT);
+ _col_name_to_type.emplace("BePort", TYPE_INT);
+ _col_name_to_type.emplace("HttpPort", TYPE_INT);
+ _col_name_to_type.emplace("BrpcPort", TYPE_INT);
+
+ _col_name_to_type.emplace("Cluster", TYPE_VARCHAR);
+ _col_name_to_type.emplace("IP", TYPE_VARCHAR);
+ _col_name_to_type.emplace("LastStartTime", TYPE_VARCHAR);
+ _col_name_to_type.emplace("LastHeartbeat", TYPE_VARCHAR);
+ _col_name_to_type.emplace("Alive", TYPE_VARCHAR);
+ _col_name_to_type.emplace("SystemDecommissioned", TYPE_VARCHAR);
+ _col_name_to_type.emplace("ClusterDecommissioned", TYPE_VARCHAR);
+
+ _col_name_to_type.emplace("DataUsedCapacity", TYPE_BIGINT);
+ _col_name_to_type.emplace("AvailCapacity", TYPE_BIGINT);
+ _col_name_to_type.emplace("TotalCapacity", TYPE_BIGINT);
+
+ _col_name_to_type.emplace("UsedPct", TYPE_DOUBLE);
+ _col_name_to_type.emplace("MaxDiskUsedPct", TYPE_DOUBLE);
+
+ _col_name_to_type.emplace("RemoteUsedCapacity", TYPE_BIGINT);
+
+ _col_name_to_type.emplace("Tag", TYPE_VARCHAR);
+ _col_name_to_type.emplace("ErrMsg", TYPE_VARCHAR);
+ _col_name_to_type.emplace("Version", TYPE_VARCHAR);
+ _col_name_to_type.emplace("Status", TYPE_VARCHAR);
+ return Status::OK();
+}
+} // namespace doris
diff --git a/be/src/exec/schema_scanner/schema_backends_scanner.h
b/be/src/exec/schema_scanner/schema_backends_scanner.h
new file mode 100644
index 0000000000..32753f568e
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_backends_scanner.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/schema_scanner.h"
+namespace doris {
+
+class SchemaBackendsScanner : public SchemaScanner {
+public:
+ SchemaBackendsScanner();
+ ~SchemaBackendsScanner() override = default;
+
+ Status start(RuntimeState* state) override;
+ Status get_next_row(Tuple* tuple, MemPool* pool, bool* eos) override;
+
+private:
+ Status _fill_one_row(Tuple* tuple, MemPool* pool);
+ Status _fetch_backends_info();
+ Status _fill_one_col(Tuple* tuple, MemPool* pool, size_t idx);
+ Status _set_col_name_to_type();
+
+private:
+ // column_name -> type, set by _set_col_name_to_type()
+ std::unordered_map<std::string, PrimitiveType> _col_name_to_type;
+
+ std::vector<TRow> _batch_data;
+ size_t _row_idx;
+};
+} // namespace doris
diff --git a/be/src/vec/exec/vschema_scan_node.cpp
b/be/src/vec/exec/vschema_scan_node.cpp
index dfb6811cbf..0f2710b665 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -50,10 +50,10 @@ VSchemaScanNode::~VSchemaScanNode() {
_src_tuple = nullptr;
delete[] reinterpret_cast<char*>(_src_single_tuple);
- _src_single_tuple = NULL;
+ _src_single_tuple = nullptr;
delete[] reinterpret_cast<char*>(_dest_single_tuple);
- _dest_single_tuple = NULL;
+ _dest_single_tuple = nullptr;
}
Status VSchemaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -92,6 +92,11 @@ Status VSchemaScanNode::init(const TPlanNode& tnode,
RuntimeState* state) {
if (tnode.schema_scan_node.__isset.thread_id) {
_scanner_param.thread_id = tnode.schema_scan_node.thread_id;
}
+
+ if (tnode.schema_scan_node.__isset.table_structure) {
+ _scanner_param.table_structure = _pool->add(
+ new
std::vector<TSchemaTableStructure>(tnode.schema_scan_node.table_structure));
+ }
return Status::OK();
}
@@ -138,7 +143,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
// new one mem pool
_tuple_pool.reset(new (std::nothrow) MemPool());
- if (nullptr == _tuple_pool.get()) {
+ if (nullptr == _tuple_pool) {
return Status::InternalError("Allocate MemPool failed.");
}
@@ -161,7 +166,7 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
// new one scanner
_schema_scanner.reset(SchemaScanner::create(schema_table->schema_table_type()));
- if (nullptr == _schema_scanner.get()) {
+ if (nullptr == _schema_scanner) {
return Status::InternalError("schema scanner get nullptr pointer.");
}
@@ -221,13 +226,13 @@ Status VSchemaScanNode::prepare(RuntimeState* state) {
_src_single_tuple =
reinterpret_cast<doris::Tuple*>(new (std::nothrow)
char[_src_tuple_desc->byte_size()]);
- if (NULL == _src_single_tuple) {
+ if (nullptr == _src_single_tuple) {
return Status::InternalError("new src single tuple failed.");
}
_dest_single_tuple =
reinterpret_cast<doris::Tuple*>(new (std::nothrow)
char[_dest_tuple_desc->byte_size()]);
- if (NULL == _dest_single_tuple) {
+ if (nullptr == _dest_single_tuple) {
return Status::InternalError("new desc single tuple failed.");
}
@@ -239,9 +244,12 @@ Status VSchemaScanNode::get_next(RuntimeState* state,
vectorized::Block* block,
SCOPED_TIMER(_runtime_profile->total_time_counter());
VLOG_CRITICAL << "VSchemaScanNode::GetNext";
- if (state == NULL || block == NULL || eos == NULL)
+ if (state == nullptr || block == nullptr || eos == nullptr) {
return Status::InternalError("input is NULL pointer");
- if (!_is_init) return Status::InternalError("used before initialize.");
+ }
+ if (!_is_init) {
+ return Status::InternalError("used before initialize.");
+ }
RETURN_IF_CANCELLED(state);
std::vector<vectorized::MutableColumnPtr> columns(_slot_num);
bool schema_eos = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
index 43d771953f..c8df72b7de 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SchemaTableType.java
@@ -66,7 +66,8 @@ public enum SchemaTableType {
SCH_VIEWS("VIEWS", "VIEWS", TSchemaTableType.SCH_VIEWS),
SCH_CREATE_TABLE("CREATE_TABLE", "CREATE_TABLE",
TSchemaTableType.SCH_CREATE_TABLE),
SCH_INVALID("NULL", "NULL", TSchemaTableType.SCH_INVALID),
- SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS);
+ SCH_ROWSETS("ROWSETS", "ROWSETS", TSchemaTableType.SCH_ROWSETS),
+ SCH_BACKENDS("BACKENDS", "BACKENDS", TSchemaTableType.SCH_BACKENDS);
private static final String dbName = "INFORMATION_SCHEMA";
private static SelectList fullSelectLists;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 6ebc85f02c..7e06a00f33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.SchemaTableType;
import org.apache.doris.common.SystemIdGenerator;
import org.apache.doris.thrift.TSchemaTable;
+import org.apache.doris.thrift.TSchemaTableStructure;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -399,8 +400,54 @@ public class SchemaTable extends Table {
.column("CREATION_TIME",
ScalarType.createType(PrimitiveType.BIGINT))
.column("OLDEST_WRITE_TIMESTAMP",
ScalarType.createType(PrimitiveType.BIGINT))
.column("NEWEST_WRITE_TIMESTAMP",
ScalarType.createType(PrimitiveType.BIGINT))
- .build())).build();
- private SchemaTableType schemaTableType;
+ .build()))
+ .put("backends", new SchemaTable(SystemIdGenerator.getNextId(),
"backends", TableType.SCHEMA,
+ builder().column("BackendId",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("Cluster", ScalarType.createVarchar(64))
+ .column("IP", ScalarType.createVarchar(16))
+ .column("HeartbeatPort",
ScalarType.createType(PrimitiveType.INT))
+ .column("BePort",
ScalarType.createType(PrimitiveType.INT))
+ .column("HttpPort",
ScalarType.createType(PrimitiveType.INT))
+ .column("BrpcPort",
ScalarType.createType(PrimitiveType.INT))
+ .column("LastStartTime",
ScalarType.createVarchar(32))
+ .column("LastHeartbeat",
ScalarType.createVarchar(32))
+ .column("Alive", ScalarType.createVarchar(8))
+ .column("SystemDecommissioned",
ScalarType.createVarchar(8))
+ .column("ClusterDecommissioned",
ScalarType.createVarchar(8))
+ .column("TabletNum",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("DataUsedCapacity",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("AvailCapacity",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("TotalCapacity",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("UsedPct",
ScalarType.createType(PrimitiveType.DOUBLE))
+ .column("MaxDiskUsedPct",
ScalarType.createType(PrimitiveType.DOUBLE))
+ .column("RemoteUsedCapacity",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("Tag", ScalarType.createVarchar(128))
+ .column("ErrMsg", ScalarType.createVarchar(2048))
+ .column("Version", ScalarType.createVarchar(64))
+ .column("Status", ScalarType.createVarchar(1024))
+ .build()))
+ .build();
+
+ public static List<TSchemaTableStructure> getTableStructure(String
tableName) {
+ List<TSchemaTableStructure> tSchemaTableStructureList =
Lists.newArrayList();
+ switch (tableName) {
+ case "backends": {
+ Table table = TABLE_MAP.get(tableName);
+ for (Column column : table.getFullSchema()) {
+ TSchemaTableStructure tSchemaTableStructure = new
TSchemaTableStructure();
+ tSchemaTableStructure.setColumnName(column.getName());
+
tSchemaTableStructure.setType(column.getDataType().toThrift());
+
tSchemaTableStructure.setLen(column.getDataType().getSlotSize());
+ tSchemaTableStructure.setIsNull(column.isAllowNull());
+ tSchemaTableStructureList.add(tSchemaTableStructure);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+ return tSchemaTableStructureList;
+ }
protected SchemaTable(long id, String name, TableType type, List<Column>
baseSchema) {
super(id, name, type, baseSchema);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index db70e0c623..4b5f5001b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -110,6 +110,8 @@ public class SchemaScanNode extends ScanNode {
TUserIdentity tCurrentUser =
ConnectContext.get().getCurrentUserIdentity().toThrift();
msg.schema_scan_node.setCurrentUserIdent(tCurrentUser);
+
+
msg.schema_scan_node.setTableStructure(SchemaTable.getTableStructure(tableName));
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 53ee7dd637..57a7cb58d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.doris.service;
+import org.apache.doris.alter.DecommissionType;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
@@ -30,6 +31,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.cluster.Cluster;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
@@ -43,6 +45,7 @@ import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
@@ -57,11 +60,13 @@ import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.VariableMgr;
+import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.FrontendServiceVersion;
+import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TColumnDef;
import org.apache.doris.thrift.TColumnDesc;
import org.apache.doris.thrift.TDescribeTableParams;
@@ -69,6 +74,8 @@ import org.apache.doris.thrift.TDescribeTableResult;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFeResult;
import org.apache.doris.thrift.TFetchResourceResult;
+import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
+import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
@@ -99,6 +106,7 @@ import org.apache.doris.thrift.TPrivilegeStatus;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TReportRequest;
+import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TS3StorageParam;
import org.apache.doris.thrift.TShowVariableRequest;
import org.apache.doris.thrift.TShowVariableResult;
@@ -119,9 +127,11 @@ import
org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TxnCommitAttachment;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.gson.Gson;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -977,6 +987,122 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
return result;
}
+ @Override
+ public TFetchSchemaTableDataResult
fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException {
+ switch (request.getSchemaTableName()) {
+ case BACKENDS:
+ return getBackendsSchemaTable(request);
+ default:
+ break;
+ }
+ TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+ result.setStatus(new TStatus(TStatusCode.INTERNAL_ERROR));
+ return result;
+ }
+
+ private TFetchSchemaTableDataResult
getBackendsSchemaTable(TFetchSchemaTableDataRequest request) {
+ final SystemInfoService clusterInfoService =
Env.getCurrentSystemInfo();
+ TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+ List<Long> backendIds = null;
+ if (!Strings.isNullOrEmpty(request.cluster_name)) {
+ final Cluster cluster =
Env.getCurrentEnv().getCluster(request.cluster_name);
+ // root not in any cluster
+ if (null == cluster) {
+ return result;
+ }
+ backendIds = cluster.getBackendIdList();
+ } else {
+ backendIds = clusterInfoService.getBackendIds(false);
+ if (backendIds == null) {
+ return result;
+ }
+ }
+
+ long start = System.currentTimeMillis();
+ Stopwatch watch = Stopwatch.createUnstarted();
+
+ List<TRow> dataBatch = Lists.newArrayList();
+ for (long backendId : backendIds) {
+ Backend backend = clusterInfoService.getBackend(backendId);
+ if (backend == null) {
+ continue;
+ }
+
+ watch.start();
+ Integer tabletNum =
Env.getCurrentInvertedIndex().getTabletNumByBackendId(backendId);
+ watch.stop();
+
+ TRow trow = new TRow();
+ trow.addToColumnValue(new TCell().setLongVal(backendId));
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getOwnerClusterName()));
+ trow.addToColumnValue(new TCell().setStringVal(backend.getHost()));
+ if (Strings.isNullOrEmpty(request.cluster_name)) {
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getHeartbeatPort()));
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getBePort()));
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getHttpPort()));
+ trow.addToColumnValue(new
TCell().setIntVal(backend.getBrpcPort()));
+ }
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastStartTime())));
+ trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(backend.getLastUpdateMs())));
+ trow.addToColumnValue(new
TCell().setStringVal(String.valueOf(backend.isAlive())));
+ if (backend.isDecommissioned() && backend.getDecommissionType() ==
DecommissionType.ClusterDecommission) {
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ trow.addToColumnValue(new TCell().setStringVal("true"));
+ } else if (backend.isDecommissioned()
+ && backend.getDecommissionType() ==
DecommissionType.SystemDecommission) {
+ trow.addToColumnValue(new TCell().setStringVal("true"));
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ } else {
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ trow.addToColumnValue(new TCell().setStringVal("false"));
+ }
+ trow.addToColumnValue(new TCell().setLongVal(tabletNum));
+
+ // capacity
+ // data used
+ trow.addToColumnValue(new
TCell().setLongVal(backend.getDataUsedCapacityB()));
+
+ // available
+ long availB = backend.getAvailableCapacityB();
+ trow.addToColumnValue(new TCell().setLongVal(availB));
+
+ // total
+ long totalB = backend.getTotalCapacityB();
+ trow.addToColumnValue(new TCell().setLongVal(totalB));
+
+ // used percent
+ double used = 0.0;
+ if (totalB <= 0) {
+ used = 0.0;
+ } else {
+ used = (double) (totalB - availB) * 100 / totalB;
+ }
+ trow.addToColumnValue(new TCell().setDoubleVal(used));
+ trow.addToColumnValue(new
TCell().setDoubleVal(backend.getMaxDiskUsedPct() * 100));
+
+ // remote used capacity
+ trow.addToColumnValue(new
TCell().setLongVal(backend.getRemoteUsedCapacityB()));
+
+ // tags
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getTagMapString()));
+ // err msg
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getHeartbeatErrMsg()));
+ // version
+ trow.addToColumnValue(new
TCell().setStringVal(backend.getVersion()));
+ // status
+ trow.addToColumnValue(new TCell().setStringVal(new
Gson().toJson(backend.getBackendStatus())));
+ dataBatch.add(trow);
+ }
+
+ // backends proc node get result too slow, add log to observer.
+ LOG.debug("backends proc get tablet num cost: {}, total cost: {}",
+ watch.elapsed(TimeUnit.MILLISECONDS),
(System.currentTimeMillis() - start));
+
+ result.setDataBatch(dataBatch);
+ result.setStatus(new TStatus(TStatusCode.OK));
+ return result;
+ }
+
private TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext =
ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
diff --git a/gensrc/thrift/Data.thrift b/gensrc/thrift/Data.thrift
index 676e22a0f2..fee713694f 100644
--- a/gensrc/thrift/Data.thrift
+++ b/gensrc/thrift/Data.thrift
@@ -47,17 +47,22 @@ struct TRowBatch {
}
// this is a union over all possible return types
-struct TColumnValue {
+struct TCell {
// TODO: use <type>_val instead of camelcase
1: optional bool boolVal
2: optional i32 intVal
3: optional i64 longVal
4: optional double doubleVal
5: optional string stringVal
+ // add type: date datetime
}
struct TResultRow {
- 1: list<TColumnValue> colVals
+ 1: list<TCell> colVals
+}
+
+struct TRow {
+ 1: optional list<TCell> column_value
}
// Serialized, self-contained version of a RowBatch (in
be/src/runtime/row-batch.h).
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index f9517e2bfa..a3fc84571b 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -103,7 +103,8 @@ enum TSchemaTableType {
SCH_VARIABLES,
SCH_VIEWS,
SCH_INVALID,
- SCH_ROWSETS
+ SCH_ROWSETS,
+ SCH_BACKENDS
}
enum THdfsCompression {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 5d48c1ae7e..9cf6ffb6ee 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -689,6 +689,20 @@ struct TInitExternalCtlMetaResult {
2: optional string status;
}
+enum TSchemaTableName{
+ BACKENDS = 0,
+}
+
+struct TFetchSchemaTableDataRequest {
+ 1: optional string cluster_name
+ 2: optional TSchemaTableName schema_table_name
+}
+
+struct TFetchSchemaTableDataResult {
+ 1: required Status.TStatus status
+ 2: optional list<Data.TRow> data_batch;
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -725,4 +739,6 @@ service FrontendService {
AgentService.TGetStoragePolicyResult refreshStoragePolicy()
TInitExternalCtlMetaResult initExternalCtlMeta(1:
TInitExternalCtlMetaRequest request)
+
+ TFetchSchemaTableDataResult fetchSchemaTableData(1:
TFetchSchemaTableDataRequest request)
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2df3fea6a2..f8566c2b5d 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -453,6 +453,13 @@ struct TCsvScanNode {
10:optional map<string, TMiniLoadEtlFunction> column_function_mapping
}
+struct TSchemaTableStructure {
+ 1: optional string column_name
+ 2: optional Types.TPrimitiveType type
+ 3: optional i64 len
+ 4: optional bool is_null;
+}
+
struct TSchemaScanNode {
1: required Types.TTupleId tuple_id
@@ -467,6 +474,7 @@ struct TSchemaScanNode {
10: optional string user_ip // deprecated
11: optional Types.TUserIdentity current_user_ident // to replace the user
and user_ip
12: optional bool show_hidden_cloumns = false
+ 13: optional list<TSchemaTableStructure> table_structure
}
struct TMetaScanNode {
diff --git a/regression-test/suites/correctness/test_backends_table.groovy
b/regression-test/suites/correctness/test_backends_table.groovy
new file mode 100644
index 0000000000..3b4771d825
--- /dev/null
+++ b/regression-test/suites/correctness/test_backends_table.groovy
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This suit test the `backends` information_schema table
+suite("test_backends_table") {
+ List<List<Object>> table = sql """ select * from
information_schema.backends; """
+ assertTrue(table.size() > 0) // row should > 0
+ assertTrue(table[0].size == 23) // column should be 23
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]