This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 063d717b775 [Bug](information-schema) fix some bugs in
information_schema.PROCESSLIST #36409 (#36451)
063d717b775 is described below
commit 063d717b775c43f4355d2a728910dbaf062c6c07
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 18 21:41:39 2024 +0800
[Bug](information-schema) fix some bugs in information_schema.PROCESSLIST
#36409 (#36451)
---
be/src/exec/schema_scanner.cpp | 61 ++++++++------------
.../schema_scanner/schema_processlist_scanner.cpp | 67 ++++++++++------------
.../java/org/apache/doris/catalog/SchemaTable.java | 24 ++++----
.../java/org/apache/doris/qe/ConnectContext.java | 2 +-
.../query_p0/system/test_query_sys_tables.groovy | 2 +
5 files changed, 72 insertions(+), 84 deletions(-)
diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 75896f1ef66..dc57bfe8ace 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -62,15 +62,12 @@ namespace doris {
class ObjectPool;
SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns)
- : _is_init(false),
- _param(nullptr),
- _columns(columns),
- _schema_table_type(TSchemaTableType::SCH_INVALID) {}
+ : _is_init(false), _columns(columns),
_schema_table_type(TSchemaTableType::SCH_INVALID) {}
SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns,
TSchemaTableType::type type)
- : _is_init(false), _param(nullptr), _columns(columns),
_schema_table_type(type) {}
+ : _is_init(false), _columns(columns), _schema_table_type(type) {}
-SchemaScanner::~SchemaScanner() {}
+SchemaScanner::~SchemaScanner() = default;
Status SchemaScanner::start(RuntimeState* state) {
if (!_is_init) {
@@ -170,7 +167,7 @@ Status
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
size_t fill_num = datas.size();
col_ptr = &nullable_column->get_nested_column();
for (int i = 0; i < fill_num; ++i) {
- auto data = datas[i];
+ auto* data = datas[i];
if (data == nullptr) {
// For nested column need not insert default.
nullable_column->insert_data(nullptr, 0);
@@ -180,99 +177,89 @@ Status
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
}
switch (col_desc.type) {
case TYPE_HLL: {
- HyperLogLog* hll_slot = reinterpret_cast<HyperLogLog*>(data);
-
reinterpret_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot);
+ auto* hll_slot = reinterpret_cast<HyperLogLog*>(data);
+
assert_cast<vectorized::ColumnHLL*>(col_ptr)->get_data().emplace_back(*hll_slot);
break;
}
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
- StringRef* str_slot = reinterpret_cast<StringRef*>(data);
-
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data,
-
str_slot->size);
+ auto* str_slot = reinterpret_cast<StringRef*>(data);
+
assert_cast<vectorized::ColumnString*>(col_ptr)->insert_data(str_slot->data,
+
str_slot->size);
break;
}
case TYPE_BOOLEAN: {
uint8_t num = *reinterpret_cast<bool*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_TINYINT: {
int8_t num = *reinterpret_cast<int8_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int8>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_SMALLINT: {
int16_t num = *reinterpret_cast<int16_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int16>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_INT: {
int32_t num = *reinterpret_cast<int32_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_BIGINT: {
int64_t num = *reinterpret_cast<int64_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_LARGEINT: {
__int128 num;
memcpy(&num, data, sizeof(__int128));
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Int128>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_FLOAT: {
float num = *reinterpret_cast<float*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Float32>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_DOUBLE: {
double num = *reinterpret_cast<double*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(
- num);
+
assert_cast<vectorized::ColumnVector<vectorized::Float64>*>(col_ptr)->insert_value(num);
break;
}
case TYPE_DATE: {
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
break;
}
case TYPE_DATEV2: {
uint32_t num = *reinterpret_cast<uint32_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt32>*>(col_ptr)->insert_value(
- num);
+ assert_cast<vectorized::ColumnDateV2*>(col_ptr)->insert_value(num);
break;
}
case TYPE_DATETIME: {
-
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
+
assert_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
break;
}
case TYPE_DATETIMEV2: {
- uint32_t num = *reinterpret_cast<uint64_t*>(data);
-
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt64>*>(col_ptr)->insert_value(
- num);
+ uint64_t num = *reinterpret_cast<uint64_t*>(data);
+
assert_cast<vectorized::ColumnDateTimeV2*>(col_ptr)->insert_value(num);
break;
}
@@ -291,14 +278,14 @@ Status
SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_
case TYPE_DECIMAL32: {
const int32_t num = *reinterpret_cast<int32_t*>(data);
-
reinterpret_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
+ assert_cast<vectorized::ColumnDecimal32*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}
case TYPE_DECIMAL64: {
const int64_t num = *reinterpret_cast<int64_t*>(data);
-
reinterpret_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
+ assert_cast<vectorized::ColumnDecimal64*>(col_ptr)->insert_data(
reinterpret_cast<const char*>(&num), 0);
break;
}
diff --git a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
index d9cbd0f3fb7..964f05db9d8 100644
--- a/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_processlist_scanner.cpp
@@ -19,9 +19,11 @@
#include <gen_cpp/FrontendService_types.h>
+#include <exception>
#include <vector>
#include "exec/schema_scanner/schema_helper.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/runtime_state.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
@@ -30,15 +32,19 @@
namespace doris {
std::vector<SchemaScanner::ColumnDesc>
SchemaProcessListScanner::_s_processlist_columns = {
+ {"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
{"ID", TYPE_LARGEINT, sizeof(int128_t), false},
{"USER", TYPE_VARCHAR, sizeof(StringRef), false},
{"HOST", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"LOGIN_TIME", TYPE_DATETIMEV2,
sizeof(vectorized::DateTimeV2ValueType), false},
{"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false},
{"DB", TYPE_VARCHAR, sizeof(StringRef), false},
{"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false},
{"TIME", TYPE_INT, sizeof(int32_t), false},
{"STATE", TYPE_VARCHAR, sizeof(StringRef), false},
- {"INFO", TYPE_VARCHAR, sizeof(StringRef), false}};
+ {"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"INFO", TYPE_VARCHAR, sizeof(StringRef), false},
+ {"FE", TYPE_VARCHAR, sizeof(StringRef), false}};
SchemaProcessListScanner::SchemaProcessListScanner()
: SchemaScanner(_s_processlist_columns,
TSchemaTableType::SCH_PROCESSLIST) {}
@@ -89,48 +95,37 @@ Status
SchemaProcessListScanner::_fill_block_impl(vectorized::Block* block) {
for (size_t row_idx = 0; row_idx < row_num; ++row_idx) {
const auto& row = process_list[row_idx];
+ if (row.size() != _s_processlist_columns.size()) {
+ return Status::InternalError(
+ "process list meet invalid schema, schema_size={},
input_data_size={}",
+ _s_processlist_columns.size(), row.size());
+ }
// Fetch and store the column value based on its index
std::string& column_value =
column_values[row_idx]; // Reference to the actual string
in the vector
-
- switch (col_idx) {
- case 0:
- column_value = row.size() > 1 ? row[1] : "";
- break; // ID
- case 1:
- column_value = row.size() > 2 ? row[2] : "";
- break; // USER
- case 2:
- column_value = row.size() > 3 ? row[3] : "";
- break; // HOST
- case 3:
- column_value = row.size() > 5 ? row[5] : "";
- break; // CATALOG
- case 4:
- column_value = row.size() > 6 ? row[6] : "";
- break; // DB
- case 5:
- column_value = row.size() > 7 ? row[7] : "";
- break; // COMMAND
- case 6:
- column_value = row.size() > 8 ? row[8] : "";
- break; // TIME
- case 7:
- column_value = row.size() > 9 ? row[9] : "";
- break; // STATE
- case 8:
- column_value = row.size() > 11 ? row[11] : "";
- break; // INFO
- default:
- column_value = "";
- break;
- }
+ column_value = row[col_idx];
if (_s_processlist_columns[col_idx].type == TYPE_LARGEINT ||
_s_processlist_columns[col_idx].type == TYPE_INT) {
- int128_t val = !column_value.empty() ?
std::stoll(column_value) : 0;
- int_vals[row_idx] = val;
+ try {
+ int128_t val = !column_value.empty() ?
std::stoll(column_value) : 0;
+ int_vals[row_idx] = val;
+ } catch (const std::exception& e) {
+ return Status::InternalError(
+ "process list meet invalid data, column={},
data={}, reason={}",
+ _s_processlist_columns[col_idx].name,
column_value, e.what());
+ }
+ datas[row_idx] = &int_vals[row_idx];
+ } else if (_s_processlist_columns[col_idx].type ==
TYPE_DATETIMEV2) {
+ auto* dv =
+
reinterpret_cast<vectorized::DateV2Value<vectorized::DateTimeV2ValueType>*>(
+ &int_vals[row_idx]);
+ if (!dv->from_date_str(column_value.data(),
column_value.size(), -1)) {
+ return Status::InternalError(
+ "process list meet invalid data, column={},
data={}, reason={}",
+ _s_processlist_columns[col_idx].name,
column_value);
+ }
datas[row_idx] = &int_vals[row_idx];
} else {
str_refs[row_idx] =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 164495c4896..74d59b4cd83 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -446,16 +446,20 @@ public class SchemaTable extends Table {
.column("SOURCE_LINE",
ScalarType.createType(PrimitiveType.INT))
.build()))
.put("processlist", new SchemaTable(SystemIdGenerator.getNextId(),
"processlist", TableType.SCHEMA,
- builder().column("ID",
ScalarType.createType(PrimitiveType.LARGEINT))
- .column("USER", ScalarType.createVarchar(32))
- .column("HOST", ScalarType.createVarchar(261))
- .column("CATALOG", ScalarType.createVarchar(64))
- .column("DB", ScalarType.createVarchar(64))
- .column("COMMAND", ScalarType.createVarchar(16))
- .column("TIME",
ScalarType.createType(PrimitiveType.INT))
- .column("STATE", ScalarType.createVarchar(64))
- .column("INFO",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
- .build()))
+ builder().column("CURRENT_CONNECTED",
ScalarType.createVarchar(16))
+ .column("ID",
ScalarType.createType(PrimitiveType.LARGEINT))
+ .column("USER", ScalarType.createVarchar(32))
+ .column("HOST", ScalarType.createVarchar(261))
+ .column("LOGIN_TIME",
ScalarType.createType(PrimitiveType.DATETIMEV2))
+ .column("CATALOG", ScalarType.createVarchar(64))
+ .column("DB", ScalarType.createVarchar(64))
+ .column("COMMAND", ScalarType.createVarchar(16))
+ .column("TIME", ScalarType.createType(PrimitiveType.INT))
+ .column("STATE", ScalarType.createVarchar(64))
+ .column("QUERY_ID", ScalarType.createVarchar(256))
+ .column("INFO",
ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
+ .column("FE",
+ ScalarType.createVarchar(64)).build()))
.build();
protected SchemaTable(long id, String name, TableType type, List<Column>
baseSchema) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index 2840021aea9..508691f29a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -830,7 +830,7 @@ public class ConnectContext {
if (connId == connectionId) {
row.add("Yes");
} else {
- row.add("");
+ row.add("No");
}
row.add("" + connectionId);
row.add(ClusterNamespace.getNameFromFullName(qualifiedUser));
diff --git
a/regression-test/suites/query_p0/system/test_query_sys_tables.groovy
b/regression-test/suites/query_p0/system/test_query_sys_tables.groovy
index 7d943894168..fcc69bdadf2 100644
--- a/regression-test/suites/query_p0/system/test_query_sys_tables.groovy
+++ b/regression-test/suites/query_p0/system/test_query_sys_tables.groovy
@@ -280,4 +280,6 @@ suite("test_query_sys_tables", "query,p0") {
qt_sql "select * from triggers"
qt_sql "select * from parameters"
qt_sql "select * from profiling"
+
+ sql "select * from information_schema.PROCESSLIST;"
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]