This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 30b51a54f9b branch-4.0: [refactor](mysql output)Use to_string when outputting plain text to MySQL. #57824 (#58264)
30b51a54f9b is described below

commit 30b51a54f9bf3242ddd3d94a1d4589173743bfc1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Nov 24 16:13:00 2025 +0800

    branch-4.0: [refactor](mysql output)Use to_string when outputting plain text to MySQL. #57824 (#58264)
    
    Cherry-picked from #57824
    
    Co-authored-by: Mryange <[email protected]>
---
 .../data_types/serde/data_type_bitmap_serde.cpp    | 15 ++++
 .../vec/data_types/serde/data_type_bitmap_serde.h  |  3 +
 .../vec/data_types/serde/data_type_hll_serde.cpp   | 15 ++++
 be/src/vec/data_types/serde/data_type_hll_serde.h  |  3 +
 .../vec/data_types/serde/data_type_jsonb_serde.cpp |  6 ++
 .../data_types/serde/data_type_nullable_serde.cpp  | 10 +++
 .../data_types/serde/data_type_nullable_serde.h    |  2 +
 .../data_types/serde/data_type_number_serde.cpp    | 12 +++-
 .../serde/data_type_quantilestate_serde.cpp        | 18 +++++
 .../serde/data_type_quantilestate_serde.h          |  3 +
 be/src/vec/data_types/serde/data_type_serde.cpp    |  6 ++
 be/src/vec/data_types/serde/data_type_serde.h      |  5 ++
 be/src/vec/sink/vmysql_result_writer.cpp           | 84 +++++++++++++++++++---
 13 files changed, 170 insertions(+), 12 deletions(-)

diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
index 525fc24451a..93e2f48e59d 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.cpp
@@ -173,6 +173,21 @@ Status DataTypeBitMapSerDe::_write_column_to_mysql(const IColumn& column,
     return Status::OK();
 }
 
+bool DataTypeBitMapSerDe::write_column_to_mysql_text(const IColumn& column, 
BufferWritable& bw,
+                                                     int64_t row_idx) const {
+    const auto& data_column = assert_cast<const ColumnBitmap&>(column);
+    if (_return_object_as_string) {
+        BitmapValue bitmap_value = data_column.get_element(row_idx);
+        size_t size = bitmap_value.getSizeInBytes();
+        std::unique_ptr<char[]> buf = 
std::make_unique_for_overwrite<char[]>(size);
+        bitmap_value.write_to(buf.get());
+        bw.write(buf.get(), size);
+        return true;
+    } else {
+        return false;
+    }
+}
+
 Status DataTypeBitMapSerDe::write_column_to_mysql_binary(const IColumn& column,
                                                          MysqlRowBinaryBuffer& 
row_buffer,
                                                          int64_t row_idx, bool 
col_const,
diff --git a/be/src/vec/data_types/serde/data_type_bitmap_serde.h b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
index a459a1a93bd..482665aaaa1 100644
--- a/be/src/vec/data_types/serde/data_type_bitmap_serde.h
+++ b/be/src/vec/data_types/serde/data_type_bitmap_serde.h
@@ -78,6 +78,9 @@ public:
                                       int64_t row_idx, bool col_const,
                                       const FormatOptions& options) const 
override;
 
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;
+
     Status write_column_to_orc(const std::string& timezone, const IColumn& 
column,
                                const NullMap* null_map, 
orc::ColumnVectorBatch* orc_col_batch,
                                int64_t start, int64_t end, vectorized::Arena& 
arena) const override;
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.cpp b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
index 9c2c0ef430f..0ccfe06257b 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.cpp
@@ -170,6 +170,21 @@ Status DataTypeHLLSerDe::_write_column_to_mysql(const IColumn& column,
     return Status::OK();
 }
 
+bool DataTypeHLLSerDe::write_column_to_mysql_text(const IColumn& column, 
BufferWritable& bw,
+                                                  int64_t row_idx) const {
+    const auto& data_column = assert_cast<const ColumnHLL&>(column);
+    if (_return_object_as_string) {
+        const HyperLogLog& hyperLogLog = data_column.get_element(row_idx);
+        size_t size = hyperLogLog.max_serialized_size();
+        std::unique_ptr<char[]> buf = 
std::make_unique_for_overwrite<char[]>(size);
+        hyperLogLog.serialize((uint8_t*)buf.get());
+        bw.write(buf.get(), size);
+        return true;
+    } else {
+        return false;
+    }
+}
+
 Status DataTypeHLLSerDe::write_column_to_mysql_binary(const IColumn& column,
                                                       MysqlRowBinaryBuffer& 
row_buffer,
                                                       int64_t row_idx, bool 
col_const,
diff --git a/be/src/vec/data_types/serde/data_type_hll_serde.h b/be/src/vec/data_types/serde/data_type_hll_serde.h
index 25ffbbef3b0..574967b4938 100644
--- a/be/src/vec/data_types/serde/data_type_hll_serde.h
+++ b/be/src/vec/data_types/serde/data_type_hll_serde.h
@@ -72,6 +72,9 @@ public:
                                       int64_t row_idx, bool col_const,
                                       const FormatOptions& options) const 
override;
 
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;
+
     Status write_column_to_orc(const std::string& timezone, const IColumn& 
column,
                                const NullMap* null_map, 
orc::ColumnVectorBatch* orc_col_batch,
                                int64_t start, int64_t end, vectorized::Arena& 
arena) const override;
diff --git a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
index e26747fdfdd..609bab36e03 100644
--- a/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_jsonb_serde.cpp
@@ -355,8 +355,14 @@ void DataTypeJsonbSerDe::to_string(const IColumn& column, size_t row_num,
     const auto& col = assert_cast<const ColumnString&, 
TypeCheckOnRelease::DISABLE>(column);
     const auto& data_ref = col.get_data_at(row_num);
     if (data_ref.size > 0) {
+        if (_nesting_level > 1) {
+            bw.write('"');
+        }
         std::string str = JsonbToJson::jsonb_to_json_string(data_ref.data, 
data_ref.size);
         bw.write(str.c_str(), str.size());
+        if (_nesting_level > 1) {
+            bw.write('"');
+        }
     } else {
         bw.write("NULL", 4);
     }
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
index 985db532a53..6e9b29c918b 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.cpp
@@ -353,6 +353,16 @@ Status DataTypeNullableSerDe::read_column_from_arrow(IColumn& column,
                                                 ctz);
 }
 
+bool DataTypeNullableSerDe::write_column_to_mysql_text(const IColumn& column, 
BufferWritable& bw,
+                                                       int64_t row_idx) const {
+    if (column.is_null_at(row_idx)) {
+        return false;
+    } else {
+        const auto& col = assert_cast<const ColumnNullable&>(column);
+        return 
nested_serde->write_column_to_mysql_text(col.get_nested_column(), bw, row_idx);
+    }
+}
+
 template <bool is_binary_format>
 Status DataTypeNullableSerDe::_write_column_to_mysql(const IColumn& column,
                                                      
MysqlRowBuffer<is_binary_format>& result,
diff --git a/be/src/vec/data_types/serde/data_type_nullable_serde.h b/be/src/vec/data_types/serde/data_type_nullable_serde.h
index 16bf509e649..e85bea68a25 100644
--- a/be/src/vec/data_types/serde/data_type_nullable_serde.h
+++ b/be/src/vec/data_types/serde/data_type_nullable_serde.h
@@ -91,6 +91,8 @@ public:
     Status write_column_to_mysql_text(const IColumn& column, 
MysqlRowTextBuffer& row_buffer,
                                       int64_t row_idx, bool col_const,
                                       const FormatOptions& options) const 
override;
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;
 
     Status write_column_to_orc(const std::string& timezone, const IColumn& 
column,
                                const NullMap* null_map, 
orc::ColumnVectorBatch* orc_col_batch,
diff --git a/be/src/vec/data_types/serde/data_type_number_serde.cpp b/be/src/vec/data_types/serde/data_type_number_serde.cpp
index d35f2063512..753c863e8d4 100644
--- a/be/src/vec/data_types/serde/data_type_number_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_number_serde.cpp
@@ -918,7 +918,17 @@ template <PrimitiveType T>
 void DataTypeNumberSerDe<T>::to_string(const IColumn& column, size_t row_num,
                                        BufferWritable& bw) const {
     auto& data = assert_cast<const ColumnType&, 
TypeCheckOnRelease::DISABLE>(column).get_data();
-    value_to_string<T>(data[row_num], bw, get_scale());
+    if constexpr (is_date_type(T) || is_time_type(T) || is_ip(T)) {
+        if (_nesting_level > 1) {
+            bw.write('"');
+        }
+        value_to_string<T>(data[row_num], bw, get_scale());
+        if (_nesting_level > 1) {
+            bw.write('"');
+        }
+    } else {
+        value_to_string<T>(data[row_num], bw, get_scale());
+    }
 }
 
 template <PrimitiveType T>
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp b/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp
index 08d136aac9d..b6a1738fca0 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.cpp
@@ -43,4 +43,22 @@ void DataTypeQuantileStateSerDe::read_one_cell_from_jsonb(IColumn& column,
     val.deserialize(Slice(blob->getBlob(), blob->getBlobLen()));
     col.insert_value(val);
 }
+
+bool DataTypeQuantileStateSerDe::write_column_to_mysql_text(const IColumn& 
column,
+                                                            BufferWritable& bw,
+                                                            int64_t row_idx) 
const {
+    const auto& data_column = reinterpret_cast<const 
ColumnQuantileState&>(column);
+
+    if (_return_object_as_string) {
+        const auto& quantile_value = data_column.get_element(row_idx);
+        size_t size = quantile_value.get_serialized_size();
+        std::unique_ptr<char[]> buf = 
std::make_unique_for_overwrite<char[]>(size);
+        quantile_value.serialize((uint8_t*)buf.get());
+        bw.write(buf.get(), size);
+        return true;
+    } else {
+        return false;
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
index 9e4492035d4..1b7c8d455eb 100644
--- a/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
+++ b/be/src/vec/data_types/serde/data_type_quantilestate_serde.h
@@ -136,6 +136,9 @@ public:
         return _write_column_to_mysql(column, row_buffer, row_idx, col_const, 
options);
     }
 
+    bool write_column_to_mysql_text(const IColumn& column, BufferWritable& bw,
+                                    int64_t row_idx) const override;
+
     Status write_column_to_orc(const std::string& timezone, const IColumn& 
column,
                                const NullMap* null_map, 
orc::ColumnVectorBatch* orc_col_batch,
                                int64_t start, int64_t end,
diff --git a/be/src/vec/data_types/serde/data_type_serde.cpp b/be/src/vec/data_types/serde/data_type_serde.cpp
index 0b1288895fa..1eb86a78ea2 100644
--- a/be/src/vec/data_types/serde/data_type_serde.cpp
+++ b/be/src/vec/data_types/serde/data_type_serde.cpp
@@ -135,6 +135,12 @@ void DataTypeSerDe::to_string(const IColumn& column, size_t row_num, BufferWrita
                            "Data type {} to_string_batch not implement.", 
get_name());
 }
 
+bool DataTypeSerDe::write_column_to_mysql_text(const IColumn& column, 
BufferWritable& bw,
+                                               int64_t row_idx) const {
+    to_string(column, row_idx, bw);
+    return true;
+}
+
 const std::string DataTypeSerDe::NULL_IN_COMPLEX_TYPE = "null";
 const std::string DataTypeSerDe::NULL_IN_CSV_FOR_ORDINARY_TYPE = "\\N";
 
diff --git a/be/src/vec/data_types/serde/data_type_serde.h b/be/src/vec/data_types/serde/data_type_serde.h
index e4cd4eaa041..03a71368cb4 100644
--- a/be/src/vec/data_types/serde/data_type_serde.h
+++ b/be/src/vec/data_types/serde/data_type_serde.h
@@ -431,6 +431,11 @@ public:
                                               int64_t row_idx, bool col_const,
                                               const FormatOptions& options) 
const = 0;
 
+    // return true if output as string
+    // return false if output null
+    virtual bool write_column_to_mysql_text(const IColumn& column, 
BufferWritable& bw,
+                                            int64_t row_idx) const;
+
     virtual Status write_column_to_mysql_binary(const IColumn& column,
                                                 MysqlRowBinaryBuffer& 
row_buffer, int64_t row_idx,
                                                 bool col_const,
diff --git a/be/src/vec/sink/vmysql_result_writer.cpp b/be/src/vec/sink/vmysql_result_writer.cpp
index b20d260d400..fad638b0374 100644
--- a/be/src/vec/sink/vmysql_result_writer.cpp
+++ b/be/src/vec/sink/vmysql_result_writer.cpp
@@ -36,6 +36,7 @@
 #include "runtime/result_block_buffer.h"
 #include "runtime/runtime_state.h"
 #include "runtime/types.h"
+#include "util/mysql_global.h"
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_const.h"
@@ -174,6 +175,42 @@ Status VMysqlResultWriter<is_binary_format>::_set_options(
     return Status::OK();
 }
 
+void direct_write_to_mysql_result_string(std::string& mysql_rows, const char* 
str, size_t size) {
+    // MySQL protocol length encoding:
+    // <= 250: 1 byte for length
+    // < 65536: 1 byte (252) + 2 bytes for length
+    // < 16777216: 1 byte (253) + 3 bytes for length
+    // >= 16777216: 1 byte (254) + 8 bytes for length
+
+    char buf[16];
+    if (size < 251ULL) {
+        int1store(buf, size);
+        mysql_rows.append(buf, 1);
+    } else if (size < 65536ULL) {
+        buf[0] = static_cast<char>(252);
+        uint16_t temp16 = static_cast<uint16_t>(size);
+        memcpy(buf + 1, &temp16, sizeof(temp16));
+        mysql_rows.append(buf, 3);
+    } else if (size < 16777216ULL) {
+        buf[0] = static_cast<char>(253);
+        int3store(buf + 1, size);
+        mysql_rows.append(buf, 4);
+    } else {
+        buf[0] = static_cast<char>(254);
+        uint64_t temp64 = static_cast<uint64_t>(size);
+        memcpy(buf + 1, &temp64, sizeof(temp64));
+        mysql_rows.append(buf, 9);
+    }
+
+    // Append string content
+    mysql_rows.append(str, size);
+}
+
+void direct_write_to_mysql_result_null(std::string& mysql_rows) {
+    // MySQL protocol for NULL value is a single byte with value 251
+    mysql_rows.push_back(static_cast<char>(251));
+}
+
 template <bool is_binary_format>
 Status VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState* 
state, Block& block) {
     Status status = Status::OK();
@@ -233,19 +270,44 @@ Status VMysqlResultWriter<is_binary_format>::_write_one_block(RuntimeState* stat
             }
         }
 
-        for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
-            for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
-                
RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql(
-                        *(arguments[col_idx].column), row_buffer, row_idx,
-                        arguments[col_idx].is_const, _options));
+        const auto& serde_dialect = state->query_options().serde_dialect;
+
+        auto mysql_output_tmp_col = ColumnString::create();
+        BufferWriter write_buffer(*mysql_output_tmp_col);
+        size_t write_buffer_index = 0;
+        if (serde_dialect == TSerdeDialect::DORIS && !is_binary_format) {
+            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
+                auto& mysql_rows = result->result_batch.rows[row_idx];
+                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
+                    const auto col_index = index_check_const(row_idx, 
arguments[col_idx].is_const);
+                    const auto* column = arguments[col_idx].column;
+                    if 
(arguments[col_idx].serde->write_column_to_mysql_text(*column, write_buffer,
+                                                                             
col_index)) {
+                        write_buffer.commit();
+                        auto str = 
mysql_output_tmp_col->get_data_at(write_buffer_index);
+                        direct_write_to_mysql_result_string(mysql_rows, 
str.data, str.size);
+                        write_buffer_index++;
+                    } else {
+                        direct_write_to_mysql_result_null(mysql_rows);
+                    }
+                }
+                bytes_sent += mysql_rows.size();
             }
+        } else {
+            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
+                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
+                    
RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql(
+                            *(arguments[col_idx].column), row_buffer, row_idx,
+                            arguments[col_idx].is_const, _options));
+                }
 
-            // copy MysqlRowBuffer to Thrift
-            result->result_batch.rows[row_idx].append(row_buffer.buf(), 
row_buffer.length());
-            bytes_sent += row_buffer.length();
-            row_buffer.reset();
-            if constexpr (is_binary_format) {
-                row_buffer.start_binary_row(_output_vexpr_ctxs.size());
+                // copy MysqlRowBuffer to Thrift
+                result->result_batch.rows[row_idx].append(row_buffer.buf(), 
row_buffer.length());
+                bytes_sent += row_buffer.length();
+                row_buffer.reset();
+                if constexpr (is_binary_format) {
+                    row_buffer.start_binary_row(_output_vexpr_ctxs.size());
+                }
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to