This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new dda25cceb6a [Bug](information-schema) fix some bug of
information_schema.PROCESSLIST (#36447)
dda25cceb6a is described below
commit dda25cceb6adeba380e7327d67dc0824a240c9ae
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 18 16:45:48 2024 +0800
[Bug](information-schema) fix some bug of information_schema.PROCESSLIST
(#36447)
## Proposed changes
Cherry-picked from #36409 (backport to branch-2.1).
---
be/src/exec/schema_scanner.cpp | 65 +++++++++------------
.../schema_scanner/schema_processlist_scanner.cpp | 66 ++++++++++------------
be/src/pipeline/exec/schema_scan_operator.cpp | 27 ++++-----
.../java/org/apache/doris/catalog/SchemaTable.java | 27 +++++----
.../java/org/apache/doris/qe/ConnectContext.java | 2 +-
.../info_schema_db/test_info_schema_db.groovy | 2 +
6 files changed, 87 insertions(+), 102 deletions(-)
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index c233c6f83fd..2b6b2c1f3c0 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -69,15 +69,12 @@ namespace doris {
class ObjectPool;
SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns)
- : _is_init(false),
- _param(nullptr),
- _columns(columns),
- _schema_table_type(TSchemaTableType::SCH_INVALID) {}
+ : _is_init(false), _columns(columns),
_schema_table_type(TSchemaTableType::SCH_INVALID) {}
SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns,
TSchemaTableType::type type)
- : _is_init(false), _param(nullptr), _columns(columns),
_schema_table_type(type) {}
+ : _is_init(false), _columns(columns), _schema_table_type(type) {}
-SchemaScanner::~SchemaScanner() {}
+SchemaScanner::~SchemaScanner() = default;
Status SchemaScanner::start(RuntimeState* state) {
if (!_is_init) {
@@ -189,7 +186,7 @@ Status
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
size_t fill_num = datas.size();
col_ptr = &nullable_column->get_nested_column();
for (int i = 0; i < fill_num; ++i) {
- auto data = datas[i];
+ auto* data = datas[i];
if (data == nullptr) {
// For nested column need not insert default.
nullable_column->insert_data(nullptr, 0);
@@ -199,125 +196,115 @@ Status
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
}
switch (col_desc.type) {
case TYPE_HLL: {
- HyperLogLog* hll_slot = reinterpret_cast<HyperLogLog*>(data);
-
reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot);
+ auto* hll_slot = reinterpret_cast<HyperLogLog*>(data);
+
assert_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot);
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
- StringRef* str_slot = reinterpret_cast<StringRef*>(data);
-
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data,
-
str_slot->size);
+ auto* str_slot = reinterpret_cast<StringRef*>(data);
+
assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data,
+
str_slot->size);
break;
}
case TYPE_BOOLEAN: {
uint8_t num = *reinterpret_cast<bool*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_TINYINT: {
int8_t num = *reinterpret_cast<int8_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_SMALLINT: {
int16_t num = *reinterpret_cast<int16_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_INT: {
int32_t num = *reinterpret_cast<int32_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_BIGINT: {
int64_t num = *reinterpret_cast<int64_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_LARGEINT: {
__int128 num;
memcpy(&num, data, sizeof(__int128));
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_FLOAT: {
float num = *reinterpret_cast<float*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_DOUBLE: {
double num = *reinterpret_cast<double*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_DATE: {
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
break;
}
case TYPE_DATEV2: {
uint32_t num = *reinterpret_cast<uint32_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)->insert_value(
- num);
+ assert_cast<vectorized::ColumnDateV2*>(col_ptr)->insert_value(num);
break;
}
case TYPE_DATETIME: {
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
break;
}
case TYPE_DATETIMEV2: {
- uint32_t num = *reinterpret_cast<uint64_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)->insert_value(
- num);
+ uint64_t num = *reinterpret_cast<uint64_t*>(data);
+
assert_cast<vectorized::ColumnDateTimeV2*>(col_ptr)->insert_value(num);
break;
}
case TYPE_DECIMALV2: {
const vectorized::Int128 num =
(reinterpret_cast<PackedInt128*>(data))->value;
-
reinterpret_cast<vectorized::ColumnDecimal128V2*>(col_ptr)->insert_data(
+ assert_cast<vectorized::ColumnDecimal128V2*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}
case TYPE_DECIMAL128I: {
const vectorized::Int128 num =
(reinterpret_cast<PackedInt128*>(data))->value;
-
reinterpret_cast<vectorized::ColumnDecimal128V3*>(col_ptr)->insert_data(
+ assert_cast<vectorized::ColumnDecimal128V3*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}
case TYPE_DECIMAL32: {
const int32_t num = *reinterpret_cast<int32_t*>(data);
-
reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
+ assert_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}
case TYPE_DECIMAL64: {
const int64_t num = *reinterpret_cast<int64_t*>(data);
-
reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
+ assert_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index 2ecc2be9e01..f5f5bc23634 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -19,9 +19,11 @@
#include <gen_cpp/FrontendService_types.h>
+#include <exception>
#include <vector>
#include "exec/schema_scanner/schema_helper.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/runtime_state.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
@@ -30,15 +32,19 @@
namespace doris {
std::vector<SchemaScanner::ColumnDesc>
SchemaProcessListScanner::_s_processlist_columns = {
+ {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
{"ID", TYPE_LARGEINT, sizeof(int128_t), false},
{"USER", TYPE_VARCHAR, sizeof(StringRef), false},
{"HOST", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false},
{"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false},
{"DB", TYPE_VARCHAR, sizeof(StringRef), false},
{"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false},
{"TIME", TYPE_INT, sizeof(int32_t), false},
{"STATE", TYPE_VARCHAR, sizeof(StringRef), false},
- {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}};
+ {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"INFO", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"FE", TYPE_VARCHAR, sizeof(StringRef), false}};
SchemaProcessListScanner::SchemaProcessListScanner()
: SchemaScanner(_s_processlist_columns,
TSchemaTableType::SCH_PROCESSLIST) {}
@@ -90,48 +96,36 @@ Status
SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) {
for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
const auto& row = process_list[row_idx];
+ if (row.size() != _s_processlist_columns.size()) {
+ return Status::InternalError(
+ "process list meet invalid schema, schema_size={},
input_data_size={}",
+ _s_processlist_columns.size(), row.size());
+ }
// Fetch and store the column value based on its index
std::string& column_value =
column_values[row_idx]; // Reference to the actual string
in the vector
-
- switch (col_idx) {
- case 0:
- column_value = row.size() > 1 ? row[1] : "";
- break; // ID
- case 1:
- column_value = row.size() > 2 ? row[2] : "";
- break; // USER
- case 2:
- column_value = row.size() > 3 ? row[3] : "";
- break; // HOST
- case 3:
- column_value = row.size() > 5 ? row[5] : "";
- break; // CATALOG
- case 4:
- column_value = row.size() > 6 ? row[6] : "";
- break; // DB
- case 5:
- column_value = row.size() > 7 ? row[7] : "";
- break; // COMMAND
- case 6:
- column_value = row.size() > 8 ? row[8] : "";
- break; // TIME
- case 7:
- column_value = row.size() > 9 ? row[9] : "";
- break; // STATE
- case 8:
- column_value = row.size() > 11 ? row[11] : "";
- break; // INFO
- default:
- column_value = "";
- break;
- }
+ column_value = row[col_idx];
if (_s_processlist_columns[col_idx].type == TYPE_LARGEINT ||
_s_processlist_columns[col_idx].type == TYPE_INT) {
- int128_t val = !column_value.empty() ?
std::stoll(column_value) : 0;
- int_vals[row_idx] = val;
+ try {
+ int128_t val = !column_value.empty() ?
std::stoll(column_value) : 0;
+ int_vals[row_idx] = val;
+ } catch (const std::exception& e) {
+ return Status::InternalError(
+ "process list meet invalid data, column={},
data={}, reason={}",
+ _s_processlist_columns[col_idx].name,
column_value, e.what());
+ }
+ datas[row_idx] = &int_vals[row_idx];
+ } else if (_s_processlist_columns[col_idx].type ==
TYPE_DATETIMEV2) {
+ auto* dv =
reinterpret_cast<DateV2Value<DateTimeV2ValueType>*>(&int_vals[row_idx]);
+ if (!dv->from_date_str(column_value.data(),
column_value.size(), -1,
+ config::allow_zero_date)) {
+ return Status::InternalError(
+ "process list meet invalid data, column={},
data={}, reason={}",
+ _s_processlist_columns[col_idx].name,
column_value);
+ }
datas[row_idx] = &int_vals[row_idx];
} else {
str_refs[row_idx] =
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp
b/be/src/pipeline/exec/schema_scan_operator.cpp
index 2d32e21d991..f26b2d706b7 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -52,11 +52,11 @@ Status SchemaScanLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
auto& p = _parent->cast<SchemaScanOperatorX>();
_scanner_param.common_param = p._common_scanner_param;
// init schema scanner profile
- _scanner_param.profile.reset(new RuntimeProfile("SchemaScanner"));
+ _scanner_param.profile = std::make_unique<RuntimeProfile>("SchemaScanner");
profile()->add_child(_scanner_param.profile.get(), true, nullptr);
// get src tuple desc
- const SchemaTableDescriptor* schema_table =
+ const auto* schema_table =
static_cast<const
SchemaTableDescriptor*>(p._dest_tuple_desc->table_desc());
// new one scanner
_schema_scanner = SchemaScanner::create(schema_table->schema_table_type());
@@ -81,7 +81,6 @@ SchemaScanOperatorX::SchemaScanOperatorX(ObjectPool* pool,
const TPlanNode& tnod
_table_name(tnode.schema_scan_node.table_name),
_common_scanner_param(new SchemaScannerCommonParam()),
_tuple_id(tnode.schema_scan_node.tuple_id),
- _dest_tuple_desc(nullptr),
_tuple_idx(0),
_slot_num(0) {}
@@ -162,7 +161,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
_slot_num = _dest_tuple_desc->slots().size();
// get src tuple desc
- const SchemaTableDescriptor* schema_table =
+ const auto* schema_table =
static_cast<const
SchemaTableDescriptor*>(_dest_tuple_desc->table_desc());
if (nullptr == schema_table) {
@@ -179,7 +178,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
const std::vector<SchemaScanner::ColumnDesc>&
columns_desc(_schema_scanner->get_column_desc());
// if src columns size is zero, it's the dummy slots.
- if (0 == columns_desc.size()) {
+ if (columns_desc.empty()) {
_slot_num = 0;
}
@@ -193,17 +192,15 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
}
if (j >= columns_desc.size()) {
- LOG(WARNING) << "no match column for this column("
- << _dest_tuple_desc->slots()[i]->col_name() << ")";
- return Status::InternalError("no match column for this column.");
+ return Status::InternalError("no match column for this column({})",
+
_dest_tuple_desc->slots()[i]->col_name());
}
if (columns_desc[j].type != _dest_tuple_desc->slots()[i]->type().type)
{
- LOG(WARNING) << "schema not match. input is " <<
columns_desc[j].name << "("
- << columns_desc[j].type << ") and output is "
- << _dest_tuple_desc->slots()[i]->col_name() << "("
- << _dest_tuple_desc->slots()[i]->type() << ")";
- return Status::InternalError("schema not match.");
+ return Status::InternalError("schema not match. input is {}({})
and output is {}({})",
+ columns_desc[j].name,
type_to_string(columns_desc[j].type),
+
_dest_tuple_desc->slots()[i]->col_name(),
+
type_to_string(_dest_tuple_desc->slots()[i]->type().type));
}
}
@@ -224,7 +221,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state,
vectorized::Block* bl
do {
block->clear();
for (int i = 0; i < _slot_num; ++i) {
- auto dest_slot_desc = _dest_tuple_desc->slots()[i];
+ auto* dest_slot_desc = _dest_tuple_desc->slots()[i];
block->insert(vectorized::ColumnWithTypeAndName(
dest_slot_desc->get_empty_mutable_column(),
dest_slot_desc->get_data_type_ptr(),
dest_slot_desc->col_name()));
@@ -258,7 +255,7 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state,
vectorized::Block* bl
if (src_block.rows()) {
// block->check_number_of_rows();
for (int i = 0; i < _slot_num; ++i) {
- auto dest_slot_desc = _dest_tuple_desc->slots()[i];
+ auto* dest_slot_desc = _dest_tuple_desc->slots()[i];
vectorized::MutableColumnPtr column_ptr =
std::move(*block->get_by_position(i).column).mutate();
column_ptr->insert_range_from(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 92206dc3fdb..0995884dc6f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -489,17 +489,22 @@ public class SchemaTable extends Table {
.column("SPILL_THRESHOLD_HIGH_WATERMARK",
ScalarType.createVarchar(256))
.column("TAG", ScalarType.createVarchar(256))
.build()))
- .put("processlist", new SchemaTable(SystemIdGenerator.getNextId(),
"processlist", TableType.SCHEMA,
- builder().column("ID",
ScalarType.createType(PrimitiveType.LARGEINT))
- .column("USER", ScalarType.createVarchar(32))
- .column("HOST", ScalarType.createVarchar(261))
- .column("CATALOG", ScalarType.createVarchar(64))
- .column("DB", ScalarType.createVarchar(64))
- .column("COMMAND", ScalarType.createVarchar(16))
- .column("TIME",
ScalarType.createType(PrimitiveType.INT))
- .column("STATE", ScalarType.createVarchar(64))
- .column("INFO",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
- .build()))
+ .put("processlist",
+ new SchemaTable(SystemIdGenerator.getNextId(),
"processlist", TableType.SCHEMA,
+ builder().column("CURRENT_CONNECTED",
ScalarType.createVarchar(16))
+ .column("ID",
ScalarType.createType(PrimitiveType.LARGEINT))
+ .column("USER",
ScalarType.createVarchar(32))
+ .column("HOST",
ScalarType.createVarchar(261))
+ .column("LOGIN_TIME",
ScalarType.createType(PrimitiveType.DATETIMEV2))
+ .column("CATALOG",
ScalarType.createVarchar(64))
+ .column("DB", ScalarType.createVarchar(64))
+ .column("COMMAND",
ScalarType.createVarchar(16))
+ .column("TIME",
ScalarType.createType(PrimitiveType.INT))
+ .column("STATE",
ScalarType.createVarchar(64))
+ .column("QUERY_ID",
ScalarType.createVarchar(256))
+ .column("INFO",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
+ .column("FE",
+
ScalarType.createVarchar(64)).build()))
.put("workload_policy",
new SchemaTable(SystemIdGenerator.getNextId(),
"workload_policy", TableType.SCHEMA,
builder().column("ID",
ScalarType.createType(PrimitiveType.BIGINT))
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index ef6d3b9ea7c..48a79bd9b07 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -1023,7 +1023,7 @@ public class ConnectContext {
if (connId == connectionId) {
row.add("Yes");
} else {
- row.add("");
+ row.add("No");
}
row.add("" + connectionId);
row.add(ClusterNamespace.getNameFromFullName(qualifiedUser));
diff --git
a/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy
b/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy
index ac85abe8480..1ad8fc40687 100644
---
a/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy
+++
b/regression-test/suites/external_table_p0/info_schema_db/test_info_schema_db.groovy
@@ -131,4 +131,6 @@ suite("test_info_schema_db",
"p0,external,hive,external_docker,external_docker_h
qt_sql116 """select table_catalog, table_schema, table_name from
information_schema.tables where table_schema='${innerdb}'"""
qt_sql117 """select table_catalog, table_schema, table_name from
${catalog_name}.information_schema.columns where table_schema='tpch1_parquet'"""
qt_sql118 """select table_catalog, table_schema, table_name from
${catalog_name}.INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA='tpch1_parquet'"""
+
+ sql "select * from information_schema.PROCESSLIST;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]