This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b49dc8042d [feature](load) refactor CSV reading process during 
scanning, and support enclose and escape for stream load (#22539)
b49dc8042d is described below

commit b49dc8042d1b79c4225c81842ec9cf25ec979dda
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Aug 15 09:23:53 2023 +0800

    [feature](load) refactor CSV reading process during scanning, and support 
enclose and escape for stream load (#22539)
    
     ## Proposed changes
    
    Refactor thoughts: close #22383
    Descriptions about `enclose` and `escape`: #22385
    
    ## Further comments
    
    2023-08-09:
    Unfortunately, experiments show that the original way of parsing plain 
CSV is faster. Therefore, the refactor is only applied to the enclose-related 
code; the plain CSV parser still uses the original logic.
    
    Some performance regression is unavoidable anyway. From the `CSV reader`'s 
perspective, the real weak point may be the column-writing behavior, as shown 
by the flame graph.
    
    Trimming escape will be enabled after the fix in #22411 is merged.
    
    Cases that should be discussed:
    
    1. When an incomplete enclose appears at the beginning of a large data 
set, the line delimiter is unreachable until EOF. Will the buffer 
become extremely large?
    2. What if an infinitely long line occurs in this case? Essentially, `1.` 
is equivalent to this.
    
    This PR only supports stream load as a trial, to avoid too many unrelated 
changes. Docs will be added when `enclose` and `escape` are available for all 
kinds of load.
---
 be/src/exec/line_reader.h                          |   2 +-
 be/src/exec/text_converter.cpp                     |   5 +-
 be/src/exec/text_converter.h                       |  14 +-
 be/src/http/action/stream_load.cpp                 |   6 +
 be/src/http/http_common.h                          |   2 +
 be/src/vec/exec/format/csv/csv_reader.cpp          | 268 +++++++++++----------
 be/src/vec/exec/format/csv/csv_reader.h            | 128 +++++++++-
 .../file_reader/new_plain_text_line_reader.cpp     | 161 +++++++++++--
 .../file_reader/new_plain_text_line_reader.h       | 172 +++++++++++--
 be/src/vec/exec/format/json/new_json_reader.cpp    |   8 +-
 .../org/apache/doris/analysis/DataDescription.java |  14 ++
 .../org/apache/doris/load/BrokerFileGroup.java     |  14 ++
 .../doris/load/routineload/RoutineLoadJob.java     |  12 +
 .../doris/planner/external/LoadScanProvider.java   |   2 +
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  10 +
 .../java/org/apache/doris/task/StreamLoadTask.java |  28 +++
 gensrc/thrift/FrontendService.thrift               |   4 +
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 .../load_p0/stream_load/enclose_empty_values.csv   |   1 +
 .../load_p0/stream_load/enclose_incomplete.csv     |   2 +
 .../stream_load/enclose_multi_char_delimiter.csv   |   1 +
 .../data/load_p0/stream_load/enclose_normal.csv    |   5 +
 .../stream_load/enclose_not_trim_quotes.csv        |   3 +
 .../load_p0/stream_load/enclose_with_escape.csv    |   3 +
 .../load_p0/stream_load/enclose_without_escape.csv |   2 +
 .../load_p0/stream_load/enclose_wrong_position.csv |   1 +
 .../test_csv_with_enclose_and_escape.out           |  12 +
 .../stream_load/test_txt_special_delimiter.csv     |   2 +-
 .../test_csv_with_enclose_and_escape.groovy        | 116 +++++++++
 .../stream_load/test_txt_special_delimiter.groovy  |   3 +-
 30 files changed, 832 insertions(+), 171 deletions(-)

diff --git a/be/src/exec/line_reader.h b/be/src/exec/line_reader.h
index 7a8de582e4..15e92b5f22 100644
--- a/be/src/exec/line_reader.h
+++ b/be/src/exec/line_reader.h
@@ -24,7 +24,7 @@ namespace doris {
 namespace io {
 class IOContext;
 }
-// This class is used for CSV scanner, to read content line by line
+// This class is used to read content line by line
 class LineReader {
 public:
     virtual ~LineReader() = default;
diff --git a/be/src/exec/text_converter.cpp b/be/src/exec/text_converter.cpp
index 1346b14a7b..f278e273ed 100644
--- a/be/src/exec/text_converter.cpp
+++ b/be/src/exec/text_converter.cpp
@@ -52,9 +52,12 @@ TextConverter::TextConverter(char escape_char, char 
collection_delimiter, char m
 
 void TextConverter::write_string_column(const SlotDescriptor* slot_desc,
                                         vectorized::MutableColumnPtr* 
column_ptr, const char* data,
-                                        size_t len) {
+                                        size_t len, bool need_escape) {
     DCHECK(column_ptr->get()->is_nullable());
     auto* nullable_column = 
reinterpret_cast<vectorized::ColumnNullable*>(column_ptr->get());
+    if (need_escape) {
+        unescape_string_on_spot(data, &len);
+    }
     if ((len == 2 && data[0] == '\\' && data[1] == 'N') || len == 
SQL_NULL_DATA) {
         nullable_column->get_null_map_data().push_back(1);
         
reinterpret_cast<vectorized::ColumnString&>(nullable_column->get_nested_column())
diff --git a/be/src/exec/text_converter.h b/be/src/exec/text_converter.h
index ef4e87f5a5..8a60c74523 100644
--- a/be/src/exec/text_converter.h
+++ b/be/src/exec/text_converter.h
@@ -17,7 +17,7 @@
 
 #pragma once
 
-#include <stddef.h>
+#include <cstddef>
 
 #include "vec/columns/column.h"
 
@@ -33,9 +33,15 @@ public:
 
     TextConverter(char escape_char, char collection_delimiter = '\2', char 
map_kv_delimiter = '\3');
 
+    inline void write_string_column(const SlotDescriptor* slot_desc,
+                                    vectorized::MutableColumnPtr* column_ptr, 
const char* data,
+                                    size_t len) {
+        return write_string_column(slot_desc, column_ptr, data, len, false);
+    }
+
     void write_string_column(const SlotDescriptor* slot_desc,
-                             vectorized::MutableColumnPtr* column_ptr, const 
char* data,
-                             size_t len);
+                             vectorized::MutableColumnPtr* column_ptr, const 
char* data, size_t len,
+                             bool need_escape);
 
     inline bool write_column(const SlotDescriptor* slot_desc,
                              vectorized::MutableColumnPtr* column_ptr, const 
char* data, size_t len,
@@ -62,6 +68,8 @@ public:
     }
     void set_map_kv_delimiter(char mapkv_delimiter) { _map_kv_delimiter = 
mapkv_delimiter; }
 
+    inline void set_escape_char(const char escape) { this->_escape_char = 
escape; }
+
 private:
     bool _write_data(const TypeDescriptor& type_desc, vectorized::IColumn* 
nullable_col_ptr,
                      const char* data, size_t len, bool copy_string, bool 
need_escape, size_t rows,
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index 9f715ce7cd..c57683bd39 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -375,6 +375,12 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
     if (!http_req->header(HTTP_LINE_DELIMITER).empty()) {
         request.__set_line_delimiter(http_req->header(HTTP_LINE_DELIMITER));
     }
+    if (!http_req->header(HTTP_ENCLOSE).empty() && 
http_req->header(HTTP_ENCLOSE).size() > 0) {
+        request.__set_enclose(http_req->header(HTTP_ENCLOSE)[0]);
+    }
+    if (!http_req->header(HTTP_ESCAPE).empty() && 
http_req->header(HTTP_ESCAPE).size() > 0) {
+        request.__set_escape(http_req->header(HTTP_ESCAPE)[0]);
+    }
     if (!http_req->header(HTTP_PARTITIONS).empty()) {
         request.__set_partitions(http_req->header(HTTP_PARTITIONS));
         request.__set_isTempPartition(false);
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index 0af6e13f59..27cd7c63fc 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -29,6 +29,8 @@ static const std::string HTTP_COLUMNS = "columns";
 static const std::string HTTP_WHERE = "where";
 static const std::string HTTP_COLUMN_SEPARATOR = "column_separator";
 static const std::string HTTP_LINE_DELIMITER = "line_delimiter";
+static const std::string HTTP_ENCLOSE = "enclose";
+static const std::string HTTP_ESCAPE = "escape";
 static const std::string HTTP_MAX_FILTER_RATIO = "max_filter_ratio";
 static const std::string HTTP_TIMEOUT = "timeout";
 static const std::string HTTP_PARTITIONS = "partitions";
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 93cc148248..aed7864dd8 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -20,11 +20,12 @@
 #include <fmt/format.h>
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Types_types.h>
-#include <gen_cpp/internal_service.pb.h>
 #include <glog/logging.h>
 
 #include <algorithm>
+#include <cstddef>
 #include <map>
+#include <memory>
 #include <new>
 #include <ostream>
 #include <utility>
@@ -69,6 +70,78 @@ namespace doris::vectorized {
 
 const static Slice _s_null_slice = Slice("\\N");
 
+void EncloseCsvTextFieldSplitter::do_split(const Slice& line, 
std::vector<Slice>* splitted_values) {
+    const char* data = line.data;
+    const auto& column_sep_positions = 
_text_line_reader_ctx->column_sep_positions();
+    size_t value_start_offset = 0;
+    for (auto idx : column_sep_positions) {
+        process_value_func(data, value_start_offset, idx - value_start_offset, 
trimming_char,
+                           splitted_values);
+        value_start_offset = idx + value_sep_len;
+    }
+    // process the last column
+    process_value_func(data, value_start_offset, line.size - 
value_start_offset, trimming_char,
+                       splitted_values);
+}
+
+void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line,
+                                                         std::vector<Slice>* 
splitted_values) {
+    const char* data = line.data;
+    const size_t size = line.size;
+    size_t value_start = 0;
+    for (size_t i = 0; i < size; ++i) {
+        if (data[i] == _value_sep[0]) {
+            process_value_func(data, value_start, i - value_start, 
trimming_char, splitted_values);
+            value_start = i + value_sep_len;
+        }
+    }
+    process_value_func(data, value_start, size - value_start, trimming_char, 
splitted_values);
+}
+
+void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line,
+                                                        std::vector<Slice>* 
splitted_values) {
+    const char* data = line.data;
+    size_t start = 0;  // point to the start pos of next col value.
+    size_t curpos = 0; // point to the start pos of separator matching 
sequence.
+    size_t p1 = 0;     // point to the current pos of separator matching 
sequence.
+
+    // Separator: AAAA
+    //
+    //    p1
+    //     ▼
+    //     AAAA
+    //   1000AAAA2000AAAA
+    //   ▲   ▲
+    // Start │
+    //     curpos
+    while (curpos < line.size) {
+        if (curpos + p1 == line.size || *(data + curpos + p1) != 
_value_sep[p1]) {
+            // Not match, move forward:
+            curpos += (p1 == 0 ? 1 : p1);
+            p1 = 0;
+        } else {
+            p1++;
+            if (p1 == value_sep_len) {
+                // Match a separator
+                process_value_func(data, start, curpos - start, trimming_char, 
splitted_values);
+                start = curpos + value_sep_len;
+                curpos = start;
+                p1 = 0;
+            }
+        }
+    }
+    CHECK(curpos == line.size) << curpos << " vs " << line.size;
+    process_value_func(data, start, curpos - start, trimming_char, 
splitted_values);
+}
+
+void PlainCsvTextFieldSplitter::do_split(const Slice& line, 
std::vector<Slice>* splitted_values) {
+    if (is_single_char_delim) {
+        _split_field_single_char(line, splitted_values);
+    } else {
+        _split_field_multi_char(line, splitted_values);
+    }
+}
+
 CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, 
ScannerCounter* counter,
                      const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
                      const std::vector<SlotDescriptor*>& file_slot_descs, 
io::IOContext* io_ctx)
@@ -96,7 +169,7 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* 
profile, ScannerCounte
     }
     _size = _range.size;
 
-    _split_values.reserve(sizeof(Slice) * _file_slot_descs.size());
+    _split_values.reserve(_file_slot_descs.size());
     _init_system_properties();
     _init_file_description();
 }
@@ -199,6 +272,16 @@ Status CsvReader::init_reader(bool is_load) {
     _value_separator_length = _value_separator.size();
     _line_delimiter = _params.file_attributes.text_params.line_delimiter;
     _line_delimiter_length = _line_delimiter.size();
+    if (_params.file_attributes.text_params.__isset.enclose) {
+        _enclose = _params.file_attributes.text_params.enclose;
+    }
+    if (_params.file_attributes.text_params.__isset.escape) {
+        _escape = _params.file_attributes.text_params.escape;
+    }
+    _text_converter->set_escape_char(_escape);
+
+    _trim_tailing_spaces =
+            (_state != nullptr && 
_state->trim_tailing_spaces_for_external_table_query());
 
     _collection_delimiter = 
_params.file_attributes.text_params.collection_delimiter;
     _text_converter->set_collection_delimiter(_collection_delimiter[0]);
@@ -210,6 +293,27 @@ Status CsvReader::init_reader(bool is_load) {
         _trim_double_quotes = _params.file_attributes.trim_double_quotes;
     }
 
+    _not_trim_enclose = (!_trim_double_quotes && _enclose == '\"');
+
+    std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
+    if (_enclose == 0) {
+        text_line_reader_ctx =
+                std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, 
_line_delimiter_length);
+
+        _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
+                _trim_tailing_spaces, _trim_double_quotes, _value_separator,
+                _value_separator_length, '\"');
+    } else {
+        text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
+                _line_delimiter, _line_delimiter_length, _value_separator, 
_value_separator_length,
+                _file_slot_descs.size() - 1, _enclose, _escape);
+
+        _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
+                _trim_tailing_spaces, !_not_trim_enclose,
+                
std::static_pointer_cast<EncloseCsvLineReaderContext>(text_line_reader_ctx),
+                _value_separator_length, _enclose);
+    }
+
     // create decompressor.
     // _decompressor may be nullptr if this is not a compressed file
     RETURN_IF_ERROR(_create_decompressor());
@@ -226,12 +330,13 @@ Status CsvReader::init_reader(bool is_load) {
     case TFileFormatType::FORMAT_CSV_LZOP:
         [[fallthrough]];
     case TFileFormatType::FORMAT_CSV_DEFLATE:
-        _line_reader = NewPlainTextLineReader::create_unique(
-                _profile, _file_reader, _decompressor.get(), _size, 
_line_delimiter,
-                _line_delimiter_length, start_offset);
+        _line_reader =
+                NewPlainTextLineReader::create_unique(_profile, _file_reader, 
_decompressor.get(),
+                                                      text_line_reader_ctx, 
_size, start_offset);
 
         break;
     case TFileFormatType::FORMAT_PROTO:
+        _fields_splitter = std::make_unique<CsvProtoFieldSplitter>();
         _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader);
         break;
     default:
@@ -412,8 +517,8 @@ Status CsvReader::_fill_dest_columns(const Slice& line, 
Block* block,
             const Slice& value =
                     col_idx < _split_values.size() ? _split_values[col_idx] : 
_s_null_slice;
             // For load task, we always read "string" from file, so use 
"write_string_column"
-            _text_converter->write_string_column(src_slot_desc, &columns[i], 
value.data,
-                                                 value.size);
+            _text_converter->write_string_column(src_slot_desc, &columns[i], 
value.data, value.size,
+                                                 _escape != 0);
         }
     } else {
         // if _split_values.size > _file_slot_descs.size()
@@ -455,11 +560,7 @@ Status CsvReader::_line_split_to_values(const Slice& line, 
bool* success) {
         }
     }
 
-    if (_value_separator_length == 1) {
-        _split_line_for_single_char_delimiter(line);
-    } else {
-        _split_line(line);
-    }
+    _split_line(line);
 
     if (_is_load) {
         // Only check for load task. For query task, the non exist column will 
be filled "null".
@@ -493,123 +594,9 @@ Status CsvReader::_line_split_to_values(const Slice& 
line, bool* success) {
     return Status::OK();
 }
 
-void CsvReader::_split_line_for_proto_format(const Slice& line) {
-    PDataRow** row_ptr = reinterpret_cast<PDataRow**>(line.data);
-    PDataRow* row = *row_ptr;
-    for (const PDataColumn& col : row->col()) {
-        _split_values.emplace_back(col.value());
-    }
-}
-
-void CsvReader::_split_line_for_single_char_delimiter(const Slice& line) {
-    _split_values.clear();
-    if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
-        _split_line_for_proto_format(line);
-    } else {
-        const char* value = line.data;
-        size_t cur_pos = 0;
-        size_t start_field = 0;
-        const size_t size = line.size;
-        for (; cur_pos < size; ++cur_pos) {
-            if (*(value + cur_pos) == _value_separator[0]) {
-                size_t non_space = cur_pos;
-                if (_state != nullptr && 
_state->trim_tailing_spaces_for_external_table_query()) {
-                    while (non_space > start_field && *(value + non_space - 1) 
== ' ') {
-                        non_space--;
-                    }
-                }
-                if (_trim_double_quotes && non_space > (start_field + 1) &&
-                    *(value + start_field) == '\"' && *(value + non_space - 1) 
== '\"') {
-                    start_field++;
-                    non_space--;
-                }
-                _split_values.emplace_back(value + start_field, non_space - 
start_field);
-                start_field = cur_pos + 1;
-            }
-        }
-
-        CHECK(cur_pos == line.size) << cur_pos << " vs " << line.size;
-        size_t non_space = cur_pos;
-        if (_state != nullptr && 
_state->trim_tailing_spaces_for_external_table_query()) {
-            while (non_space > start_field && *(value + non_space - 1) == ' ') 
{
-                non_space--;
-            }
-        }
-        if (_trim_double_quotes && non_space > (start_field + 1) &&
-            *(value + start_field) == '\"' && *(value + non_space - 1) == 
'\"') {
-            start_field++;
-            non_space--;
-        }
-        _split_values.emplace_back(value + start_field, non_space - 
start_field);
-    }
-}
-
 void CsvReader::_split_line(const Slice& line) {
     _split_values.clear();
-    if (_file_format_type == TFileFormatType::FORMAT_PROTO) {
-        _split_line_for_proto_format(line);
-    } else {
-        const char* value = line.data;
-        size_t start = 0;     // point to the start pos of next col value.
-        size_t curpos = 0;    // point to the start pos of separator matching 
sequence.
-        size_t p1 = 0;        // point to the current pos of separator 
matching sequence.
-        size_t non_space = 0; // point to the last pos of non_space charactor.
-
-        // Separator: AAAA
-        //
-        //    p1
-        //     ▼
-        //     AAAA
-        //   1000AAAA2000AAAA
-        //   ▲   ▲
-        // Start │
-        //     curpos
-
-        while (curpos < line.size) {
-            if (curpos + p1 == line.size || *(value + curpos + p1) != 
_value_separator[p1]) {
-                // Not match, move forward:
-                curpos += (p1 == 0 ? 1 : p1);
-                p1 = 0;
-            } else {
-                p1++;
-                if (p1 == _value_separator_length) {
-                    // Match a separator
-                    non_space = curpos;
-                    // Trim tailing spaces. Be consistent with hive and 
trino's behavior.
-                    if (_state != nullptr &&
-                        
_state->trim_tailing_spaces_for_external_table_query()) {
-                        while (non_space > start && *(value + non_space - 1) 
== ' ') {
-                            non_space--;
-                        }
-                    }
-                    if (_trim_double_quotes && (non_space - 1) > start &&
-                        *(value + start) == '\"' && *(value + non_space - 1) 
== '\"') {
-                        start++;
-                        non_space--;
-                    }
-                    _split_values.emplace_back(value + start, non_space - 
start);
-                    start = curpos + _value_separator_length;
-                    curpos = start;
-                    p1 = 0;
-                    non_space = 0;
-                }
-            }
-        }
-
-        CHECK(curpos == line.size) << curpos << " vs " << line.size;
-        non_space = curpos;
-        if (_state != nullptr && 
_state->trim_tailing_spaces_for_external_table_query()) {
-            while (non_space > start && *(value + non_space - 1) == ' ') {
-                non_space--;
-            }
-        }
-        if (_trim_double_quotes && (non_space - 1) > start && *(value + start) 
== '\"' &&
-            *(value + non_space - 1) == '\"') {
-            start++;
-            non_space--;
-        }
-        _split_values.emplace_back(value + start, non_space - start);
-    }
+    _fields_splitter->split_line(line, &_split_values);
 }
 
 Status CsvReader::_check_array_format(std::vector<Slice>& split_values, bool* 
is_success) {
@@ -694,6 +681,14 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* 
is_parse_name) {
     _value_separator_length = _value_separator.size();
     _line_delimiter = _params.file_attributes.text_params.line_delimiter;
     _line_delimiter_length = _line_delimiter.size();
+    if (_params.file_attributes.text_params.__isset.enclose) {
+        _enclose = _params.file_attributes.text_params.enclose;
+    }
+    if (_params.file_attributes.text_params.__isset.escape) {
+        _escape = _params.file_attributes.text_params.escape;
+    }
+    _not_trim_enclose = (!_trim_double_quotes && _enclose == '\"');
+    _text_converter->set_escape_char(_escape);
 
     _collection_delimiter = 
_params.file_attributes.text_params.collection_delimiter;
     _text_converter->set_collection_delimiter(_collection_delimiter[0]);
@@ -704,10 +699,25 @@ Status CsvReader::_prepare_parse(size_t* read_line, bool* 
is_parse_name) {
     // create decompressor.
     // _decompressor may be nullptr if this is not a compressed file
     RETURN_IF_ERROR(_create_decompressor());
+    std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
+    if (_enclose == 0) {
+        text_line_reader_ctx =
+                std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, 
_line_delimiter_length);
+        _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
+                _trim_tailing_spaces, _trim_double_quotes, _value_separator,
+                _value_separator_length);
+    } else {
+        text_line_reader_ctx = std::make_shared<EncloseCsvLineReaderContext>(
+                _line_delimiter, _line_delimiter_length, _value_separator, 
_value_separator_length,
+                _file_slot_descs.size() - 1, _enclose, _escape);
+        _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
+                _trim_tailing_spaces, false,
+                
std::static_pointer_cast<EncloseCsvLineReaderContext>(text_line_reader_ctx),
+                _value_separator_length);
+    }
 
     _line_reader = NewPlainTextLineReader::create_unique(
-            _profile, _file_reader, _decompressor.get(), _size, 
_line_delimiter,
-            _line_delimiter_length, start_offset);
+            _profile, _file_reader, _decompressor.get(), text_line_reader_ctx, 
_size, start_offset);
 
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/csv/csv_reader.h 
b/be/src/vec/exec/format/csv/csv_reader.h
index a1577a638e..5721bbd929 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -18,9 +18,11 @@
 #pragma once
 
 #include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/internal_service.pb.h>
 #include <stddef.h>
 #include <stdint.h>
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -32,6 +34,7 @@
 #include "io/fs/file_reader_writer_fwd.h"
 #include "util/slice.h"
 #include "vec/data_types/data_type.h"
+#include "vec/exec/format/file_reader/new_plain_text_line_reader.h"
 #include "vec/exec/format/generic_reader.h"
 
 namespace doris {
@@ -54,6 +57,121 @@ namespace vectorized {
 struct ScannerCounter;
 class Block;
 
+class LineFieldSplitterIf {
+public:
+    virtual ~LineFieldSplitterIf() = default;
+
+    virtual void split_line(const Slice& line, std::vector<Slice>* 
splitted_values) = 0;
+};
+
+template <typename Splitter>
+class BaseLineFieldSplitter : public LineFieldSplitterIf {
+public:
+    inline void split_line(const Slice& line, std::vector<Slice>* 
splitted_values) final {
+        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
+    }
+};
+
+class CsvProtoFieldSplitter final : public 
BaseLineFieldSplitter<CsvProtoFieldSplitter> {
+public:
+    inline void split_line_impl(const Slice& line, std::vector<Slice>* 
splitted_values) {
+        PDataRow** row_ptr = reinterpret_cast<PDataRow**>(line.data);
+        PDataRow* row = *row_ptr;
+        for (const PDataColumn& col : row->col()) {
+            splitted_values->emplace_back(col.value());
+        }
+    }
+};
+
+template <typename Splitter>
+class BaseCsvTextFieldSplitter : public 
BaseLineFieldSplitter<BaseCsvTextFieldSplitter<Splitter>> {
+    // using a function ptr to decrease the overhead (found very effective 
during test).
+    using ProcessValueFunc = void (*)(const char*, size_t, size_t, char, 
std::vector<Slice>*);
+
+public:
+    explicit BaseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
+                                      size_t value_sep_len = 1, char 
trimming_char = 0)
+            : trimming_char(trimming_char), value_sep_len(value_sep_len) {
+        if (trim_tailing_space) {
+            if (trim_ends) {
+                process_value_func = 
&BaseCsvTextFieldSplitter::_process_value<true, true>;
+            } else {
+                process_value_func = 
&BaseCsvTextFieldSplitter::_process_value<true, false>;
+            }
+        } else {
+            if (trim_ends) {
+                process_value_func = 
&BaseCsvTextFieldSplitter::_process_value<false, true>;
+            } else {
+                process_value_func = 
&BaseCsvTextFieldSplitter::_process_value<false, false>;
+            }
+        }
+    }
+
+    inline void split_line_impl(const Slice& line, std::vector<Slice>* 
splitted_values) {
+        static_cast<Splitter*>(this)->do_split(line, splitted_values);
+    }
+
+protected:
+    const char trimming_char;
+    const size_t value_sep_len;
+    ProcessValueFunc process_value_func;
+
+private:
+    template <bool TrimTailingSpace, bool TrimEnds>
+    inline static void _process_value(const char* data, size_t start_offset, 
size_t value_len,
+                                      char trimming_char, std::vector<Slice>* 
splitted_values) {
+        if constexpr (TrimTailingSpace) {
+            while (value_len > 0 && *(data + start_offset + value_len - 1) == 
' ') {
+                --value_len;
+            }
+        }
+        if constexpr (TrimEnds) {
+            const bool trim_cond = value_len > 1 && *(data + start_offset) == 
trimming_char &&
+                                   *(data + start_offset + value_len - 1) == 
trimming_char;
+            if (trim_cond) {
+                ++(start_offset);
+                value_len -= 2;
+            }
+        }
+        splitted_values->emplace_back(data + start_offset, value_len);
+    }
+};
+
+class EncloseCsvTextFieldSplitter : public 
BaseCsvTextFieldSplitter<EncloseCsvTextFieldSplitter> {
+public:
+    explicit EncloseCsvTextFieldSplitter(
+            bool trim_tailing_space, bool trim_ends,
+            std::shared_ptr<EncloseCsvLineReaderContext> line_reader_ctx, 
size_t value_sep_len = 1,
+            char trimming_char = 0)
+            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, 
value_sep_len, trimming_char),
+              _text_line_reader_ctx(line_reader_ctx) {}
+
+    void do_split(const Slice& line, std::vector<Slice>* splitted_values);
+
+private:
+    std::shared_ptr<EncloseCsvLineReaderContext> _text_line_reader_ctx;
+};
+
+class PlainCsvTextFieldSplitter : public 
BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter> {
+public:
+    explicit PlainCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
+                                       const std::string& value_sep, size_t 
value_sep_len = 1,
+                                       char trimming_char = 0)
+            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, 
value_sep_len, trimming_char),
+              _value_sep(value_sep) {
+        is_single_char_delim = (value_sep_len == 1);
+    }
+
+    void do_split(const Slice& line, std::vector<Slice>* splitted_values);
+
+private:
+    void _split_field_single_char(const Slice& line, std::vector<Slice>* 
splitted_values);
+    void _split_field_multi_char(const Slice& line, std::vector<Slice>* 
splitted_values);
+
+    bool is_single_char_delim;
+    std::string _value_sep;
+};
+
 class CsvReader : public GenericReader {
     ENABLE_FACTORY_CREATOR(CsvReader);
 
@@ -89,8 +207,6 @@ private:
                               std::vector<MutableColumnPtr>& columns, size_t* 
rows);
     Status _line_split_to_values(const Slice& line, bool* success);
     void _split_line(const Slice& line);
-    void _split_line_for_single_char_delimiter(const Slice& line);
-    void _split_line_for_proto_format(const Slice& line);
     Status _check_array_format(std::vector<Slice>& split_values, bool* 
is_success);
     bool _is_null(const Slice& slice);
     bool _is_array(const Slice& slice);
@@ -145,6 +261,10 @@ private:
     std::string _value_separator;
     std::string _line_delimiter;
 
+    std::string _array_delimiter;
+    char _enclose = 0;
+    char _escape = 0;
+
     // struct, array and map delimiter
     std::string _collection_delimiter;
     // map key and value delimiter
@@ -153,11 +273,15 @@ private:
     int _value_separator_length;
     int _line_delimiter_length;
     bool _trim_double_quotes = false;
+    bool _trim_tailing_spaces = false;
+    // `should_not_trim` is to manage the case that: user do not expect to 
trim double quotes but enclose is double quotes
+    bool _not_trim_enclose = true;
 
     io::IOContext* _io_ctx;
 
     // save source text which have been splitted.
     std::vector<Slice> _split_values;
+    std::unique_ptr<LineFieldSplitterIf> _fields_splitter;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index b8fd10c021..b59bbef1f1 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -22,10 +22,11 @@
 #include <string.h>
 
 #include <algorithm>
+#include <cstddef>
+#include <cstring>
 #include <memory>
 #include <ostream>
 
-#include "common/status.h"
 #include "exec/decompressor.h"
 #include "io/fs/file_reader.h"
 #include "util/slice.h"
@@ -42,18 +43,154 @@
 
 namespace doris {
 
+const uint8_t* EncloseCsvLineReaderContext::read_line_impl(const uint8_t* 
start,
+                                                           const size_t 
length) {
+    _total_len = length;
+    size_t bound = update_reading_bound(start);
+
+    while (_idx != bound) {
+        switch (_state.curr_state) {
+        case ReaderState::START: {
+            _on_start(start, bound);
+            break;
+        }
+        case ReaderState::NORMAL: {
+            _on_normal(start, bound);
+            break;
+        }
+        case ReaderState::PRE_MATCH_ENCLOSE: {
+            _on_pre_match_enclose(start, bound);
+            break;
+        }
+        case ReaderState::MATCH_ENCLOSE: {
+            _on_match_enclose(start, bound);
+            break;
+        }
+        }
+    }
+
+    return _result;
+}
+
+void EncloseCsvLineReaderContext::on_col_sep_found(const uint8_t* start,
+                                                   const uint8_t* col_sep_pos) 
{
+    const uint8_t* field_start = start + _idx;
+    // record column separator's position
+    _column_sep_positions.push_back(col_sep_pos - start);
+    const size_t forward_distance = col_sep_pos + _column_sep_len - 
field_start;
+    _idx += forward_distance;
+}
+
+size_t EncloseCsvLineReaderContext::update_reading_bound(const uint8_t* start) 
{
+    _result = (uint8_t*)memmem(start + _idx, _total_len - _idx, 
line_delimiter.c_str(),
+                               line_delimiter_len);
+    if (_result == nullptr) {
+        return _total_len;
+    }
+    return _result - start + line_delimiter_len;
+}
+
+template <bool SingleChar>
+const uint8_t* EncloseCsvLineReaderContext::look_for_column_sep_pos(const 
uint8_t* curr_start,
+                                                                    size_t 
curr_len,
+                                                                    const 
char* column_sep,
+                                                                    size_t 
column_sep_len) {
+    const uint8_t* col_sep_pos = nullptr;
+
+    if constexpr (SingleChar) {
+        char sep = column_sep[0];
+        // note(tsy): tests show that a simple `for + if` performs better than
native memchr or memmem in the normal `short fields` case.
+        for (size_t i = 0; i < curr_len; ++i) {
+            if (curr_start[i] == sep) {
+                return curr_start + i;
+            }
+        }
+    } else {
+        // note(tsy): can be optimized; memmem has relatively large overhead
when used multiple times with a short pattern.
+        col_sep_pos = (uint8_t*)memmem(curr_start, curr_len, column_sep, 
column_sep_len);
+    }
+    return col_sep_pos;
+}
+
+template const uint8_t* 
EncloseCsvLineReaderContext::look_for_column_sep_pos<true>(
+        const uint8_t* curr_start, size_t curr_len, const char* column_sep, 
size_t column_sep_len);
+
+template const uint8_t* 
EncloseCsvLineReaderContext::look_for_column_sep_pos<false>(
+        const uint8_t* curr_start, size_t curr_len, const char* column_sep, 
size_t column_sep_len);
+
+void EncloseCsvLineReaderContext::_on_start(const uint8_t* start, size_t& len) 
{
+    if (start[_idx] == _enclose) [[unlikely]] {
+        _state.forward_to(ReaderState::PRE_MATCH_ENCLOSE);
+        ++_idx;
+    } else {
+        _state.forward_to(ReaderState::NORMAL);
+    }
+}
+
+void EncloseCsvLineReaderContext::_on_normal(const uint8_t* start, size_t& 
len) {
+    const uint8_t* curr_start = start + _idx;
+    size_t curr_len = len - _idx;
+    const uint8_t* col_sep_pos =
+            find_col_sep_func(curr_start, curr_len, _column_sep.c_str(), 
_column_sep_len);
+
+    if (col_sep_pos != nullptr) [[likely]] {
+        on_col_sep_found(start, col_sep_pos);
+        _state.forward_to(ReaderState::START);
+        return;
+    }
+    // TODO(tsy): possible bug when a multi-char separator is not read completely
+    _idx = len;
+}
+
+void EncloseCsvLineReaderContext::_on_pre_match_enclose(const uint8_t* start, 
size_t& len) {
+    bool should_escape = false;
+    do {
+        do {
+            if (start[_idx] == _escape) [[unlikely]] {
+                should_escape = !should_escape;
+            } else if (should_escape) [[unlikely]] {
+                should_escape = false;
+            } else if (start[_idx] == _enclose) [[unlikely]] {
+                _state.forward_to(ReaderState::MATCH_ENCLOSE);
+                ++_idx;
+                return;
+            }
+            ++_idx;
+        } while (_idx != len);
+
+        if (_idx != _total_len) {
+            len = update_reading_bound(start);
+        } else {
+            break;
+        }
+    } while (true);
+}
+
+void EncloseCsvLineReaderContext::_on_match_enclose(const uint8_t* start, 
size_t& len) {
+    const uint8_t* curr_start = start + _idx;
+    const uint8_t* delim_pos =
+            find_col_sep_func(curr_start, _column_sep_len, 
_column_sep.c_str(), _column_sep_len);
+
+    if (delim_pos != nullptr) [[likely]] {
+        on_col_sep_found(start, delim_pos);
+        _state.forward_to(ReaderState::START);
+        return;
+    }
+    // corner case (suppose `,` is the delimiter and `"` is the enclose): ,"part1"part2
will be treated as an incomplete enclose
+    _idx = len;
+}
+
 NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
                                                io::FileReaderSPtr file_reader,
-                                               Decompressor* decompressor, 
size_t length,
-                                               const std::string& 
line_delimiter,
-                                               size_t line_delimiter_length, 
size_t current_offset)
+                                               Decompressor* decompressor,
+                                               TextLineReaderCtxPtr 
line_reader_ctx, size_t length,
+                                               size_t current_offset)
         : _profile(profile),
           _file_reader(file_reader),
           _decompressor(decompressor),
           _min_length(length),
           _total_read_bytes(0),
-          _line_delimiter(line_delimiter),
-          _line_delimiter_length(line_delimiter_length),
+          _line_reader_ctx(line_reader_ctx),
           _input_buf(new uint8_t[INPUT_CHUNK]),
           _input_buf_size(INPUT_CHUNK),
           _input_buf_pos(0),
@@ -103,12 +240,6 @@ inline bool NewPlainTextLineReader::update_eof() {
     return _eof;
 }
 
-uint8_t* 
NewPlainTextLineReader::update_field_pos_and_find_line_delimiter(const uint8_t* 
start,
-                                                                          
size_t len) {
-    // TODO: meanwhile find and save field pos
-    return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), 
_line_delimiter_length);
-}
-
 // extend input buf if necessary only when _more_input_bytes > 0
 void NewPlainTextLineReader::extend_input_buf() {
     DCHECK(_more_input_bytes > 0);
@@ -190,13 +321,13 @@ Status NewPlainTextLineReader::read_line(const uint8_t** 
ptr, size_t* size, bool
         *eof = true;
         return Status::OK();
     }
+    _line_reader_ctx->refresh();
     int found_line_delimiter = 0;
     size_t offset = 0;
     while (!done()) {
         // find line delimiter in current decompressed data
         uint8_t* cur_ptr = _output_buf + _output_buf_pos;
-        uint8_t* pos =
-                update_field_pos_and_find_line_delimiter(cur_ptr, 
output_buf_read_remaining());
+        const uint8_t* pos = _line_reader_ctx->read_line(cur_ptr, 
output_buf_read_remaining());
 
         if (pos == nullptr) {
             // didn't find line delimiter, read more data from decompressor
@@ -327,7 +458,7 @@ Status NewPlainTextLineReader::read_line(const uint8_t** 
ptr, size_t* size, bool
             // we found a complete line
             // ready to return
             offset = pos - cur_ptr;
-            found_line_delimiter = _line_delimiter_length;
+            found_line_delimiter = _line_reader_ctx->line_delimiter_length();
             break;
         }
     } // while (!done())
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index 659b0d0d60..7326812b92 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -16,15 +16,20 @@
 // under the License.
 
 #pragma once
-
-#include <stddef.h>
 #include <stdint.h>
 
+#include <cstddef>
+#include <cstring>
+#include <functional>
+#include <memory>
 #include <string>
+#include <utility>
+#include <vector>
 
 #include "exec/line_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "util/runtime_profile.h"
+#include "util/slice.h"
 
 namespace doris {
 namespace io {
@@ -34,37 +39,174 @@ class IOContext;
 class Decompressor;
 class Status;
 
+class TextLineReaderContextIf {
+public:
+    virtual ~TextLineReaderContextIf() = default;
+
+    /// @brief find the line delimiter from 'start' to 'start' + len;
+    /// info about the current line may be recorded in the ctx, like column
separator pos.
+    /// @return line delimiter pos if found, otherwise return nullptr.
+    virtual const uint8_t* read_line(const uint8_t* start, const size_t len) = 
0;
+
+    /// @return length of line delimiter
+    [[nodiscard]] virtual size_t line_delimiter_length() const = 0;
+
+    /// @brief should be called when beginning to read a new line
+    virtual void refresh() = 0;
+};
+
+template <typename Ctx>
+class BaseTextLineReaderContext : public TextLineReaderContextIf {
+    // using a function ptr to decrease the overhead (found very effective 
during test).
+    using FindDelimiterFunc = const uint8_t* (*)(const uint8_t*, size_t, const 
char*, size_t);
+
+public:
+    explicit BaseTextLineReaderContext(const std::string& line_delimiter_,
+                                       const size_t line_delimiter_len_)
+            : line_delimiter(line_delimiter_), 
line_delimiter_len(line_delimiter_len_) {}
+
+    inline const uint8_t* read_line(const uint8_t* start, const size_t len) 
final {
+        return static_cast<Ctx*>(this)->read_line_impl(start, len);
+    }
+
+    [[nodiscard]] inline size_t line_delimiter_length() const final { return 
line_delimiter_len; }
+
+    inline void refresh() final { return 
static_cast<Ctx*>(this)->refresh_impl(); };
+
+protected:
+    const std::string line_delimiter;
+    const size_t line_delimiter_len;
+};
+
+class PlainTextLineReaderCtx final : public 
BaseTextLineReaderContext<PlainTextLineReaderCtx> {
+public:
+    explicit PlainTextLineReaderCtx(const std::string& line_delimiter_,
+                                    const size_t line_delimiter_len_)
+            : BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_) 
{}
+
+    inline const uint8_t* read_line_impl(const uint8_t* start, const size_t 
length) {
+        return (uint8_t*)memmem(start, length, line_delimiter.c_str(), 
line_delimiter_len);
+    }
+
+    inline void refresh_impl() {}
+};
+
+enum class ReaderState { START, NORMAL, PRE_MATCH_ENCLOSE, MATCH_ENCLOSE };
+struct ReaderStateWrapper {
+    inline void forward_to(ReaderState state) {
+        this->prev_state = this->curr_state;
+        this->curr_state = state;
+    }
+
+    inline void reset() {
+        this->curr_state = ReaderState::START;
+        this->prev_state = ReaderState::START;
+    }
+
+    inline bool operator==(const ReaderState& state) const { return curr_state 
== state; }
+
+    ReaderState curr_state = ReaderState::START;
+    ReaderState prev_state = ReaderState::START;
+};
+
+class EncloseCsvLineReaderContext final
+        : public BaseTextLineReaderContext<EncloseCsvLineReaderContext> {
+    // using a function ptr to decrease the overhead (found very effective 
during test).
+    using FindDelimiterFunc = const uint8_t* (*)(const uint8_t*, size_t, const 
char*, size_t);
+
+public:
+    explicit EncloseCsvLineReaderContext(const std::string& line_delimiter_,
+                                         const size_t line_delimiter_len_,
+                                         const std::string& column_sep_,
+                                         const size_t column_sep_len_, size_t 
col_sep_num,
+                                         const char enclose, const char escape)
+            : BaseTextLineReaderContext(line_delimiter_, line_delimiter_len_),
+              _enclose(enclose),
+              _escape(escape),
+              _column_sep_len(column_sep_len_),
+              _column_sep(column_sep_) {
+        if (column_sep_len_ == 1) {
+            find_col_sep_func = 
&EncloseCsvLineReaderContext::look_for_column_sep_pos<true>;
+        } else {
+            find_col_sep_func = 
&EncloseCsvLineReaderContext::look_for_column_sep_pos<false>;
+        }
+        _column_sep_positions.reserve(col_sep_num);
+    }
+
+    inline void refresh_impl() {
+        _idx = 0;
+        _result = nullptr;
+        _column_sep_positions.clear();
+        _state.reset();
+    }
+
+    [[nodiscard]] inline const std::vector<size_t> column_sep_positions() 
const {
+        return _column_sep_positions;
+    }
+
+    const uint8_t* read_line_impl(const uint8_t* start, size_t length);
+
+private:
+    template <bool SingleChar>
+    static const uint8_t* look_for_column_sep_pos(const uint8_t* curr_start, 
size_t curr_len,
+                                                  const char* column_sep, 
size_t column_sep_len);
+
+    size_t update_reading_bound(const uint8_t* start);
+    void on_col_sep_found(const uint8_t* curr_start, const uint8_t* 
col_sep_pos);
+
+    void _on_start(const uint8_t* start, size_t& len);
+    void _on_normal(const uint8_t* start, size_t& len);
+    void _on_pre_match_enclose(const uint8_t* start, size_t& len);
+    void _on_match_enclose(const uint8_t* start, size_t& len);
+
+    ReaderStateWrapper _state;
+    const char _enclose;
+    const char _escape;
+    const uint8_t* _result = nullptr;
+
+    size_t _total_len;
+    const size_t _column_sep_len;
+
+    size_t _idx = 0;
+
+    const std::string _column_sep;
+    std::vector<size_t> _column_sep_positions;
+
+    FindDelimiterFunc find_col_sep_func;
+};
+
+using TextLineReaderCtxPtr = std::shared_ptr<TextLineReaderContextIf>;
+
 class NewPlainTextLineReader : public LineReader {
     ENABLE_FACTORY_CREATOR(NewPlainTextLineReader);
 
 public:
     NewPlainTextLineReader(RuntimeProfile* profile, io::FileReaderSPtr 
file_reader,
-                           Decompressor* decompressor, size_t length,
-                           const std::string& line_delimiter, size_t 
line_delimiter_length,
-                           size_t current_offset);
+                           Decompressor* decompressor, TextLineReaderCtxPtr 
line_reader_ctx,
+                           size_t length, size_t current_offset);
 
     ~NewPlainTextLineReader() override;
 
     Status read_line(const uint8_t** ptr, size_t* size, bool* eof,
                      const io::IOContext* io_ctx) override;
 
+    inline TextLineReaderCtxPtr text_line_reader_ctx() { return 
_line_reader_ctx; }
+
     void close() override;
 
 private:
     bool update_eof();
 
-    size_t output_buf_read_remaining() const { return _output_buf_limit - 
_output_buf_pos; }
+    [[nodiscard]] size_t output_buf_read_remaining() const {
+        return _output_buf_limit - _output_buf_pos;
+    }
 
-    size_t input_buf_read_remaining() const { return _input_buf_limit - 
_input_buf_pos; }
+    [[nodiscard]] size_t input_buf_read_remaining() const {
+        return _input_buf_limit - _input_buf_pos;
+    }
 
     bool done() { return _file_eof && output_buf_read_remaining() == 0; }
 
-    // find line delimiter from 'start' to 'start' + len,
-    // return line delimiter pos if found, otherwise return nullptr.
-    // TODO:
-    //  save to positions of field separator
-    uint8_t* update_field_pos_and_find_line_delimiter(const uint8_t* start, 
size_t len);
-
     void extend_input_buf();
     void extend_output_buf();
 
@@ -76,8 +218,8 @@ private:
     // and only valid if the content is uncompressed
     size_t _min_length;
     size_t _total_read_bytes;
-    std::string _line_delimiter;
-    size_t _line_delimiter_length;
+
+    TextLineReaderCtxPtr _line_reader_ctx;
 
     // save the data read from file reader
     uint8_t* _input_buf;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index c7991166b8..eb5c602238 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -33,6 +33,7 @@
 
 #include <algorithm>
 #include <map>
+#include <memory>
 #include <ostream>
 #include <string_view>
 #include <utility>
@@ -405,9 +406,10 @@ Status NewJsonReader::_open_line_reader() {
     } else {
         _skip_first_line = false;
     }
-    _line_reader = NewPlainTextLineReader::create_unique(_profile, 
_file_reader, nullptr, size,
-                                                         _line_delimiter, 
_line_delimiter_length,
-                                                         _current_offset);
+    _line_reader = NewPlainTextLineReader::create_unique(
+            _profile, _file_reader, nullptr,
+            std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, 
_line_delimiter_length), size,
+            _current_offset);
     return Status::OK();
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 88044394fe..2ee09f3c52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -156,6 +156,10 @@ public class DataDescription implements 
InsertStmt.DataDesc {
 
     private boolean isAnalyzed = false;
 
+    private byte enclose = 0;
+
+    private byte escape = 0;
+
     public DataDescription(String tableName,
                            PartitionNames partitionNames,
                            List<String> filePaths,
@@ -301,6 +305,8 @@ public class DataDescription implements InsertStmt.DataDesc 
{
         this.fileFieldNames = taskInfo.getColumnExprDescs().getFileColNames();
         this.columnSeparator = taskInfo.getColumnSeparator();
         this.lineDelimiter = taskInfo.getLineDelimiter();
+        this.enclose = taskInfo.getEnclose();
+        this.escape = taskInfo.getEscape();
         getFileFormatAndCompressType(taskInfo);
         this.columnsFromPath = null;
         this.isNegative = taskInfo.getNegative();
@@ -627,6 +633,14 @@ public class DataDescription implements 
InsertStmt.DataDesc {
         this.lineDelimiter = lineDelimiter;
     }
 
+    public byte getEnclose() {
+        return enclose;
+    }
+
+    public byte getEscape() {
+        return escape;
+    }
+
     public String getSequenceCol() {
         return sequenceCol;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index 016ef13e06..91c7154be3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -108,6 +108,10 @@ public class BrokerFileGroup implements Writable {
     private boolean trimDoubleQuotes = false;
     private int skipLines;
 
+    private byte enclose;
+
+    private  byte escape;
+
     // for unit test and edit log persistence
     private BrokerFileGroup() {
     }
@@ -211,6 +215,8 @@ public class BrokerFileGroup implements Writable {
         if (lineDelimiter == null) {
             lineDelimiter = "\n";
         }
+        enclose = dataDescription.getEnclose();
+        escape = dataDescription.getEscape();
 
         fileFormat = dataDescription.getFileFormat();
         if (fileFormat != null) {
@@ -280,6 +286,14 @@ public class BrokerFileGroup implements Writable {
         return lineDelimiter;
     }
 
+    public byte getEnclose() {
+        return enclose;
+    }
+
+    public byte getEscape() {
+        return escape;
+    }
+
     public String getFileFormat() {
         return fileFormat;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index da4a2ce7f9..9355dd7250 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -255,6 +255,10 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
 
     protected boolean isTypeRead = false;
 
+    protected byte enclose = 0;
+
+    protected byte escape = 0;
+
     public void setTypeRead(boolean isTypeRead) {
         this.isTypeRead = isTypeRead;
     }
@@ -522,6 +526,14 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         return lineDelimiter;
     }
 
+    public byte getEnclose() {
+        return enclose;
+    }
+
+    public byte getEscape() {
+        return escape;
+    }
+
     public boolean isStrictMode() {
         String value = jobProperties.get(LoadStmt.STRICT_MODE);
         if (value == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
index 87946b5a2c..9089f0c2c7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/external/LoadScanProvider.java
@@ -109,6 +109,8 @@ public class LoadScanProvider {
         TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
         textParams.setColumnSeparator(fileGroup.getColumnSeparator());
         textParams.setLineDelimiter(fileGroup.getLineDelimiter());
+        textParams.setEnclose(fileGroup.getEnclose());
+        textParams.setEscape(fileGroup.getEscape());
         fileAttributes.setTextParams(textParams);
         fileAttributes.setStripOuterArray(fileGroup.isStripOuterArray());
         fileAttributes.setJsonpaths(fileGroup.getJsonPaths());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 951921fa04..4fe306d3d8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -89,6 +89,16 @@ public interface LoadTaskInfo {
 
     Separator getLineDelimiter();
 
+    /**
+     * only for csv
+     */
+    byte getEnclose();
+
+    /**
+     * only for csv
+     */
+    byte getEscape();
+
     int getSendBatchParallelism();
 
     boolean isLoadToSingleTablet();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 0ac618f2a7..222ca2dd21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -88,6 +88,10 @@ public class StreamLoadTask implements LoadTaskInfo {
     private int skipLines = 0;
     private boolean enableProfile = false;
 
+    private byte enclose = 0;
+
+    private byte escape = 0;
+
     public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, 
TFileFormatType formatType,
             TFileCompressType compressType) {
         this.id = id;
@@ -147,6 +151,24 @@ public class StreamLoadTask implements LoadTaskInfo {
         return lineDelimiter;
     }
 
+    @Override
+    public byte getEnclose() {
+        return enclose;
+    }
+
+    public void setEnclose(byte enclose) {
+        this.enclose = enclose;
+    }
+
+    @Override
+    public byte getEscape() {
+        return escape;
+    }
+
+    public void setEscape(byte escape) {
+        this.escape = escape;
+    }
+
     @Override
     public int getSendBatchParallelism() {
         return sendBatchParallelism;
@@ -315,6 +337,12 @@ public class StreamLoadTask implements LoadTaskInfo {
         if (request.isSetLineDelimiter()) {
             setLineDelimiter(request.getLineDelimiter());
         }
+        if (request.isSetEnclose()) {
+            setEnclose(request.getEnclose());
+        }
+        if (request.isSetEscape()) {
+            setEscape(request.getEscape());
+        }
         if (request.isSetHeaderType()) {
             headerType = request.getHeaderType();
         }
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 1f67f16bac..f4ac465700 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -610,6 +610,10 @@ struct TStreamLoadPutRequest {
     48: optional i64 backend_id
     49: optional i32 version // version 1 means use load_sql
     50: optional string label
+    // only valid when file type is CSV
+    51: optional i8 enclose
+    // only valid when file type is CSV
+    52: optional i8 escape
 }
 
 struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index a36368ef7a..84492bcc16 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -245,6 +245,8 @@ struct TFileTextScanRangeParams {
     2: optional string line_delimiter;
     3: optional string collection_delimiter;// array ,map ,struct delimiter 
     4: optional string mapkv_delimiter;
+    5: optional i8 enclose;
+    6: optional i8 escape;
 }
 
 struct TFileScanSlotInfo {
diff --git a/regression-test/data/load_p0/stream_load/enclose_empty_values.csv 
b/regression-test/data/load_p0/stream_load/enclose_empty_values.csv
new file mode 100644
index 0000000000..81a2e719b8
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_empty_values.csv
@@ -0,0 +1 @@
+7,"aaa",2023-07-20,,2023-07-20:05:48:31,""
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/enclose_incomplete.csv 
b/regression-test/data/load_p0/stream_load/enclose_incomplete.csv
new file mode 100644
index 0000000000..fb67aa5c84
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_incomplete.csv
@@ -0,0 +1,2 @@
+4,"abc,2023-07-20,def,2023-07-20:05:48:31,ghi
+5,bcd,2023-07-20,def,2023-07-20:05:48:31,ghi
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/enclose_multi_char_delimiter.csv 
b/regression-test/data/load_p0/stream_load/enclose_multi_char_delimiter.csv
new file mode 100644
index 0000000000..e6e0596d7b
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_multi_char_delimiter.csv
@@ -0,0 +1 @@
+10@@"ab@@cd@@efg"@@2023-07-20@@"ab@@cd$$$efg"@@2023-07-20:05:48:31@@"@@ab$$$cd$$$"$$$10@@"ab@@cd@@efg"@@2023-07-20@@"ab@@cd$$$efg"@@2023-07-20:05:48:31@@"@@ab$$$cd$$$"
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/enclose_normal.csv 
b/regression-test/data/load_p0/stream_load/enclose_normal.csv
new file mode 100644
index 0000000000..48b7762161
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_normal.csv
@@ -0,0 +1,5 @@
+"1","abc,d
+ef,",2023-07-15,",,
+ghi,j","2023-07-20:05:48:31","kl,mn,
+"
+"2",abc,2023-07-15,def,"2023-07-20:05:48:31","ghi"
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/enclose_not_trim_quotes.csv 
b/regression-test/data/load_p0/stream_load/enclose_not_trim_quotes.csv
new file mode 100644
index 0000000000..09dbb84568
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_not_trim_quotes.csv
@@ -0,0 +1,3 @@
+10,abc,"2023-07-15","def","2023-07-20:05:48:31","ghi"
+11,"abc,def",2023-07-15,ghi,2023-07-20:05:48:31,"jkl
+mne"
\ No newline at end of file
diff --git a/regression-test/data/load_p0/stream_load/enclose_with_escape.csv 
b/regression-test/data/load_p0/stream_load/enclose_with_escape.csv
new file mode 100644
index 0000000000..d48bada0e9
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_with_escape.csv
@@ -0,0 +1,3 @@
+3,"abc\"de,fg\"h",2023-07-15,"i
+j,k
+\"",2023-07-20:05:48:31,ghi
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/enclose_without_escape.csv 
b/regression-test/data/load_p0/stream_load/enclose_without_escape.csv
new file mode 100644
index 0000000000..26c009c324
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_without_escape.csv
@@ -0,0 +1,2 @@
+8,"aaa"bbb"ccc",2023-07-20,""aa"bb",2023-07-20:05:48:31,"aa"bb""
+9,"aa,"bbb",cc",2023-07-20,""aa"bb,",2023-07-20:05:48:31,"aa"bb""
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/enclose_wrong_position.csv 
b/regression-test/data/load_p0/stream_load/enclose_wrong_position.csv
new file mode 100644
index 0000000000..a5189f3468
--- /dev/null
+++ b/regression-test/data/load_p0/stream_load/enclose_wrong_position.csv
@@ -0,0 +1 @@
+6,ab"c,2023-07-20,d"ef",2023-07-20:05:48:31,"g"hi
\ No newline at end of file
diff --git 
a/regression-test/data/load_p0/stream_load/test_csv_with_enclose_and_escape.out 
b/regression-test/data/load_p0/stream_load/test_csv_with_enclose_and_escape.out
new file mode 100644
index 0000000000..18b4968b7c
--- /dev/null
+++ 
b/regression-test/data/load_p0/stream_load/test_csv_with_enclose_and_escape.out
@@ -0,0 +1,12 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !select --
+1      abc,d\nef,      2023-07-15      ,,\nghi,j       2023-07-20T05:48:31     
kl,mn,\n
+2      abc     2023-07-15      def     2023-07-20T05:48:31     ghi
+3      abc"de,fg"h     2023-07-15      i\nj,k\n"       2023-07-20T05:48:31     
ghi
+6      ab"c    2023-07-20      d"ef"   2023-07-20T05:48:31     "g"hi
+7      aaa     2023-07-20              2023-07-20T05:48:31     
+10     ab@@cd@@efg     2023-07-20      ab@@cd$$$efg    2023-07-20T05:48:31     
@@ab$$$cd$$$
+10     ab@@cd@@efg     2023-07-20      ab@@cd$$$efg    2023-07-20T05:48:31     
@@ab$$$cd$$$
+10     abc     \N      "def"   \N      "ghi"
+11     "abc,def"       2023-07-15      ghi     2023-07-20T05:48:31     
"jkl\nmne"
+
diff --git 
a/regression-test/data/load_p0/stream_load/test_txt_special_delimiter.csv 
b/regression-test/data/load_p0/stream_load/test_txt_special_delimiter.csv
index c2f8816941..b1ab991bc3 100644
--- a/regression-test/data/load_p0/stream_load/test_txt_special_delimiter.csv
+++ b/regression-test/data/load_p0/stream_load/test_txt_special_delimiter.csv
@@ -1 +1 @@
-101030204165996924763600000104020301101030204165996924763600000104020301
\ No newline at end of file
+101030204165996924763600000114020301101030204165996924763600000114020301
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p0/stream_load/test_csv_with_enclose_and_escape.groovy
 
b/regression-test/suites/load_p0/stream_load/test_csv_with_enclose_and_escape.groovy
new file mode 100644
index 0000000000..1073885eed
--- /dev/null
+++ 
b/regression-test/suites/load_p0/stream_load/test_csv_with_enclose_and_escape.groovy
@@ -0,0 +1,116 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_csv_with_enclose_and_escape", "p0") {
+
+    def tableName = "test_csv_with_enclose_and_escape"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` string NULL,
+            `v1` date  NULL,
+            `v2` string  NULL,
+            `v3` datetime  NULL,
+            `v4` string  NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+    def normalCases = [
+            'enclose_normal.csv',
+            'enclose_with_escape.csv',
+            'enclose_wrong_position.csv',
+            'enclose_empty_values.csv'
+    ]
+
+    for (i in 0..<normalCases.size()) {
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'trim_double_quotes', 'true'
+            set 'enclose', "\""
+            set 'escape', '\\'
+
+            file "${normalCases[i]}"
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'trim_double_quotes', 'true'
+        set 'enclose', "\""
+        set 'escape', '\\'
+        set 'max_filter_ratio', '0.5'
+
+        file "enclose_incomplete.csv"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'trim_double_quotes', 'true'
+        set 'enclose', "\""
+        set 'escape', '\\'
+
+        file "enclose_without_escape.csv"
+
+        check {
+            result, exception, startTime, endTime ->
+                assertTrue(exception == null)
+                def json = parseJson(result)
+                assertEquals("Fail", json.Status)
+        }
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', '@@'
+        set 'line_delimiter', '$$$'
+        set 'trim_double_quotes', 'true'
+        set 'enclose', "\""
+        set 'escape', '\\'
+
+        file "enclose_multi_char_delimiter.csv"
+    }
+
+    streamLoad {
+        table "${tableName}"
+        set 'column_separator', ','
+        set 'trim_double_quotes', 'false'
+        set 'enclose', "\""
+        set 'escape', '\\'
+
+        file "enclose_not_trim_quotes.csv"
+    }
+
+    qt_select """
+        SELECT * FROM ${tableName} ORDER BY k1, k2 
+    """
+}
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy b/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
index 0e762f31d3..a105f833f7 100644
--- a/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
+++ b/regression-test/suites/load_p0/stream_load/test_txt_special_delimiter.groovy
@@ -35,8 +35,9 @@ suite("test_txt_special_delimiter", "p0") {
         streamLoad {
             table "${tableName}"
 
+            // column_separator and line_delimiter should not start with the same prefix
             set 'column_separator', '01030204'
-            set 'line_delimiter', '04020301'
+            set 'line_delimiter', '14020301'
             set 'columns', 'k1, k2'
             set 'strict_mode', 'true'
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to