This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 30b51a54f9b branch-4.0: [refactor](mysql output)Use to_string when
outputting plain text to MySQL. #57824 (#58264)
30b51a54f9b is described below
commit 30b51a54f9bf3242ddd3d94a1d4589173743bfc1
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 24 16:13:00 2025 +0800
branch-4.0: [refactor](mysql output)Use to_string when outputting plain
text to MySQL. #57824 (#58264)
Cherry-picked from #57824
Co-authored-by: Mryange <[email protected]>
---
.../data_types/serde/data_type_bitmap_serde.cpp | 15 ++++
.../vec/data_types/serde/data_type_bitmap_serde.h | 3 +
.../vec/data_types/serde/data_type_hll_serde.cpp | 15 ++++
be/src/vec/data_types/serde/data_type_hll_serde.h | 3 +
.../vec/data_types/serde/data_type_jsonb_serde.cpp | 6 ++
.../data_types/serde/data_type_nullable_serde.cpp | 10 +++
.../data_types/serde/data_type_nullable_serde.h | 2 +
.../data_types/serde/data_type_number_serde.cpp | 12 +++-
.../serde/data_type_quantilestate_serde.cpp | 18 +++++
.../serde/data_type_quantilestate_serde.h | 3 +
be/src/vec/data_types/serde/data_type_serde.cpp | 6 ++
be/src/vec/data_types/serde/data_type_serde.h | 5 ++
be/src/vec/sink/vmysql_result_writer.cpp | 84 +++++++++++++++++++---
13 files changed, 170 insertions(+), 12 deletions(-)
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
index 525fc24451a..93e2f48e59d 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
@@ -173,6 +173,21 @@ Status DataTypeBitMapSerDe::_write_column_to_mysql(const
IColumn& column,
return Status::OK();
}
+// Returns false (caller writes NULL) unless objects are returned as strings.
+bool DataTypeBitMapSerDe::write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                                     int64_t row_idx) const {
+    const auto& data_column = assert_cast<const ColumnBitmap&>(column);
+    if (!_return_object_as_string) {
+        return false;
+    }
+    BitmapValue bitmap_value = data_column.get_element(row_idx);
+    const size_t size = bitmap_value.getSizeInBytes();
+    auto buf = std::make_unique_for_overwrite<char[]>(size);
+    bitmap_value.write_to(buf.get());
+    bw.write(buf.get(), size);
+    return true;
+}
+
Status DataTypeBitMapSerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer&
row_buffer,
int64_t row_idx, bool
col_const,
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index a459a1a93bd..482665aaaa1 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -78,6 +78,9 @@ public:
int64_t row_idx, bool col_const,
const FormatOptions& options) const
override;
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;  // false: caller emits NULL
+
Status write_column_to_orc(const std::string& timezone, const IColumn&
column,
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch,
int64_t start, int64_t end, vectorized::Arena&
arena) const override;
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index 9c2c0ef430f..0ccfe06257b 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -170,6 +170,21 @@ Status DataTypeHLLSerDe::_write_column_to_mysql(const
IColumn& column,
return Status::OK();
}
+// Returns false (caller writes NULL) unless objects are returned as strings.
+bool DataTypeHLLSerDe::write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                                  int64_t row_idx) const {
+    const auto& data_column = assert_cast<const ColumnHLL&>(column);
+    if (!_return_object_as_string) {
+        return false;
+    }
+    const HyperLogLog& hll = data_column.get_element(row_idx);
+    auto buf = std::make_unique_for_overwrite<char[]>(hll.max_serialized_size());
+    // Send only the bytes serialize() wrote; the rest of the bound-sized buffer is uninitialized.
+    const size_t size = hll.serialize(reinterpret_cast<uint8_t*>(buf.get()));
+    bw.write(buf.get(), size);
+    return true;
+}
+
Status DataTypeHLLSerDe::write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer&
row_buffer,
int64_t row_idx, bool
col_const,
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h
b/be/src/vec/data_types/serde/data_type_hll_serde.h
index 25ffbbef3b0..574967b4938 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.h
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.h
@@ -72,6 +72,9 @@ public:
int64_t row_idx, bool col_const,
const FormatOptions& options) const
override;
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;  // false: caller emits NULL
+
Status write_column_to_orc(const std::string& timezone, const IColumn&
column,
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch,
int64_t start, int64_t end, vectorized::Arena&
arena) const override;
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index e26747fdfdd..609bab36e03 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -355,8 +355,14 @@ void DataTypeJsonbSerDe::to_string(const IColumn& column,
size_t row_num,
const auto& col = assert_cast<const ColumnString&,
TypeCheckOnRelease::DISABLE>(column);
const auto& data_ref = col.get_data_at(row_num);
if (data_ref.size > 0) {
+ if (_nesting_level > 1) {
+ bw.write('"');
+ }
std::string str = JsonbToJson::jsonb_to_json_string(data_ref.data,
data_ref.size);
bw.write(str.c_str(), str.size());
+ if (_nesting_level > 1) {
+ bw.write('"');
+ }
} else {
bw.write("NULL", 4);
}
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index 985db532a53..6e9b29c918b 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -353,6 +353,16 @@ Status
DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
ctz);
}
+bool DataTypeNullableSerDe::write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                                       int64_t row_idx) const {
+    // A NULL row yields no text; returning false makes the caller emit the NULL marker.
+    if (column.is_null_at(row_idx)) {
+        return false;
+    }
+    const auto& nested = assert_cast<const ColumnNullable&>(column).get_nested_column();
+    return nested_serde->write_column_to_mysql_text(nested, bw, row_idx);
+}
+
template <bool is_binary_format>
Status DataTypeNullableSerDe::_write_column_to_mysql(const IColumn& column,
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h
b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 16bf509e649..e85bea68a25 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -91,6 +91,8 @@ public:
Status write_column_to_mysql_text(const IColumn& column,
MysqlRowTextBuffer& row_buffer,
int64_t row_idx, bool col_const,
const FormatOptions& options) const
override;
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;  // false: caller emits NULL
Status write_column_to_orc(const std::string& timezone, const IColumn&
column,
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch,
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp
b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index d35f2063512..753c863e8d4 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -918,7 +918,17 @@ template <PrimitiveType T>
void DataTypeNumberSerDe<T>::to_string(const IColumn& column, size_t row_num,
BufferWritable& bw) const {
auto& data = assert_cast<const ColumnType&,
TypeCheckOnRelease::DISABLE>(column).get_data();
-    value_to_string<T>(data[row_num], bw, get_scale());
+    // Date/time and IP values are wrapped in double quotes when _nesting_level > 1
+    // (presumably when rendered inside a complex type); other numbers are never quoted.
+    constexpr bool quotable_type = is_date_type(T) || is_time_type(T) || is_ip(T);
+    const bool add_quotes = quotable_type && _nesting_level > 1;
+    if (add_quotes) {
+        bw.write('"');
+    }
+    value_to_string<T>(data[row_num], bw, get_scale());
+    if (add_quotes) {
+        bw.write('"');
+    }
}
template <PrimitiveType T>
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp
b/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp
index 08d136aac9d..b6a1738fca0 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp
@@ -43,4 +43,22 @@ void
DataTypeQuantileStateSerDe::read_one_cell_from_jsonb(IColumn& column,
val.deserialize(Slice(blob->getBlob(), blob->getBlobLen()));
col.insert_value(val);
}
+
+// MySQL text-protocol writer for quantile-state columns: writes the serialized state when
+// objects are returned as strings; otherwise returns false so the caller writes NULL.
+bool DataTypeQuantileStateSerDe::write_column_to_mysql_text(const IColumn& column,
+                                                            BufferWritable& bw,
+                                                            int64_t row_idx) const {
+    const auto& data_column = assert_cast<const ColumnQuantileState&>(column);
+    if (!_return_object_as_string) {
+        return false;
+    }
+    const auto& quantile_value = data_column.get_element(row_idx);
+    const size_t size = quantile_value.get_serialized_size();
+    auto buf = std::make_unique_for_overwrite<char[]>(size);
+    quantile_value.serialize(reinterpret_cast<uint8_t*>(buf.get()));
+    bw.write(buf.get(), size);
+    return true;
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index 9e4492035d4..1b7c8d455eb 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -136,6 +136,9 @@ public:
return _write_column_to_mysql(column, row_buffer, row_idx, col_const,
options);
}
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;  // false: caller emits NULL
+
Status write_column_to_orc(const std::string& timezone, const IColumn&
column,
const NullMap* null_map,
orc::ColumnVectorBatch* orc_col_batch,
int64_t start, int64_t end,
diff --git a/be/src/vec/data_types/serde/data_type_serde.cpp
b/be/src/vec/data_types/serde/data_type_serde.cpp
index 0b1288895fa..1eb86a78ea2 100644
--- a/be/src/vec/data_types/serde/data_type_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_serde.cpp
@@ -135,6 +135,12 @@ void DataTypeSerDe::to_string(const IColumn& column,
size_t row_num, BufferWrita
"Data type {} to_string_batch not implement.",
get_name());
}
+bool DataTypeSerDe::write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                               int64_t row_idx) const {
+    to_string(column, row_idx, bw);  // default: plain to_string() text, never NULL
+    return true;
+}
+
const std::string DataTypeSerDe::NULL_IN_COMPLEX_TYPE = "null";
const std::string DataTypeSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE = "\\N";
diff --git a/be/src/vec/data_types/serde/data_type_serde.h
b/be/src/vec/data_types/serde/data_type_serde.h
index e4cd4eaa041..03a71368cb4 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -431,6 +431,11 @@ public:
int64_t row_idx, bool col_const,
const FormatOptions& options)
const = 0;
+    // Writes row `row_idx` of `column` as plain text (MySQL text protocol) into `bw`.
+    // Returns true if a value was written, false if the caller should write NULL instead.
+    virtual bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                            int64_t row_idx) const;
+
virtual Status write_column_to_mysql_binary(const IColumn& column,
MysqlRowBinaryBuffer&
row_buffer, int64_t row_idx,
bool col_const,
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp
b/be/src/vec/sink/vmysql_result_writer.cpp
index b20d260d400..fad638b0374 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -36,6 +36,7 @@
#include "runtime/result_block_buffer.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
+#include "util/mysql_global.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
@@ -174,6 +175,42 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
return Status::OK();
}
+// Appends `str` to `mysql_rows` as a MySQL length-encoded string: a length prefix
+// followed by the raw bytes. Prefix layout:
+//   size < 251        -> 1 byte holding the size
+//   size < 65536      -> 0xFC (252) + 2 bytes
+//   size < 16777216   -> 0xFD (253) + 3 bytes
+//   otherwise         -> 0xFE (254) + 8 bytes
+static void direct_write_to_mysql_result_string(std::string& mysql_rows, const char* str,
+                                                size_t size) {
+    char prefix[9];
+    size_t width; // number of length bytes following the first prefix byte
+    if (size < 251ULL) {
+        prefix[0] = static_cast<char>(size);
+        width = 0;
+    } else if (size < 65536ULL) {
+        prefix[0] = static_cast<char>(252);
+        width = 2;
+    } else if (size < 16777216ULL) {
+        prefix[0] = static_cast<char>(253);
+        width = 3;
+    } else {
+        prefix[0] = static_cast<char>(254);
+        width = 8;
+    }
+    // Emit length bytes least-significant first (protocol order, independent of host order).
+    for (size_t i = 0; i < width; ++i) {
+        prefix[1 + i] = static_cast<char>((static_cast<uint64_t>(size) >> (8 * i)) & 0xFF);
+    }
+    mysql_rows.append(prefix, 1 + width);
+    mysql_rows.append(str, size);
+}
+
+// Appends the MySQL text-protocol NULL marker: a single 0xFB (251) byte.
+static void direct_write_to_mysql_result_null(std::string& mysql_rows) {
+    mysql_rows.push_back(static_cast<char>(251));
+}
+
template <bool is_binary_format>
Status VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState*
state, Block& block) {
Status status = Status::OK();
@@ -233,19 +270,44 @@ Status
VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState* stat
}
}
- for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
- for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
-
RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql(
- *(arguments[col_idx].column), row_buffer, row_idx,
- arguments[col_idx].is_const, _options));
+ const auto& serde_dialect = state->query_options().serde_dialect;
+
+ auto mysql_output_tmp_col = ColumnString::create();
+ BufferWriter write_buffer(*mysql_output_tmp_col);
+ size_t write_buffer_index = 0;
+ if (serde_dialect == TSerdeDialect::DORIS && !is_binary_format) {
+ for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
+ auto& mysql_rows = result->result_batch.rows[row_idx];
+ for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
+ const auto col_index = index_check_const(row_idx,
arguments[col_idx].is_const);
+ const auto* column = arguments[col_idx].column;
+ if
(arguments[col_idx].serde->write_column_to_mysql_text(*column, write_buffer,
+
col_index)) {
+ write_buffer.commit();
+ auto str =
mysql_output_tmp_col->get_data_at(write_buffer_index);
+ direct_write_to_mysql_result_string(mysql_rows,
str.data, str.size);
+ write_buffer_index++;
+ } else {
+ direct_write_to_mysql_result_null(mysql_rows);
+ }
+ }
+ bytes_sent += mysql_rows.size();
}
+ } else {
+ for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
+ for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
+
RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql(
+ *(arguments[col_idx].column), row_buffer, row_idx,
+ arguments[col_idx].is_const, _options));
+ }
- // copy MysqlRowBuffer to Thrift
- result->result_batch.rows[row_idx].append(row_buffer.buf(),
row_buffer.length());
- bytes_sent += row_buffer.length();
- row_buffer.reset();
- if constexpr (is_binary_format) {
- row_buffer.start_binary_row(_output_vexpr_ctxs.size());
+ // copy MysqlRowBuffer to Thrift
+ result->result_batch.rows[row_idx].append(row_buffer.buf(),
row_buffer.length());
+ bytes_sent += row_buffer.length();
+ row_buffer.reset();
+ if constexpr (is_binary_format) {
+ row_buffer.start_binary_row(_output_vexpr_ctxs.size());
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]