This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 279ae1c  Add fuzzy_parse option to speed up json import (#5114)
279ae1c is described below

commit 279ae1cb756c5d8e39ee129af7374f578008ce38
Author: Zhengguo Yang <[email protected]>
AuthorDate: Fri Dec 25 09:19:42 2020 +0800

    Add fuzzy_parse option to speed up json import (#5114)
    
    Add a fuzzy_parse flag: if all objects in the JSON file have the same keys
in the same order, we only need to parse the keys of the first row and can then
look up values by index instead of by key.
---
 be/CMakeLists.txt                                  |  6 ++
 be/src/exec/json_scanner.cpp                       | 65 +++++++++++++++-------
 be/src/exec/json_scanner.h                         |  9 ++-
 be/src/http/action/mini_load.cpp                   | 10 ++++
 be/src/http/action/stream_load.cpp                 | 13 +++++
 be/src/http/http_common.h                          |  2 +
 .../Data Manipulation/STREAM LOAD.md               |  2 +
 .../Data Manipulation/STREAM LOAD.md               |  2 +
 .../doris/analysis/AlterRoutineLoadStmt.java       |  7 +++
 .../doris/analysis/CreateRoutineLoadStmt.java      |  8 +++
 .../org/apache/doris/analysis/DataDescription.java |  9 +++
 .../java/org/apache/doris/analysis/LoadStmt.java   |  1 +
 .../org/apache/doris/httpv2/rest/UploadAction.java |  2 +
 .../org/apache/doris/load/BrokerFileGroup.java     | 10 ++++
 .../doris/load/routineload/RoutineLoadJob.java     | 11 ++++
 .../org/apache/doris/planner/BrokerScanNode.java   |  2 +
 .../apache/doris/planner/StreamLoadScanNode.java   |  1 +
 .../java/org/apache/doris/qe/MultiLoadMgr.java     |  4 ++
 .../java/org/apache/doris/task/LoadTaskInfo.java   |  1 +
 .../java/org/apache/doris/task/StreamLoadTask.java | 12 ++++
 gensrc/thrift/FrontendService.thrift               |  1 +
 gensrc/thrift/PlanNodes.thrift                     |  1 +
 run-be-ut.sh                                       | 10 +++-
 23 files changed, 163 insertions(+), 26 deletions(-)

diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index f0a86ed..f90d293 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -345,6 +345,12 @@ SET(CMAKE_CXX_FLAGS "${CXX_COMMON_FLAGS} 
${CMAKE_CXX_FLAGS}")
 
 message(STATUS "Compiler Flags: ${CMAKE_CXX_FLAGS}")
 
+if (CMAKE_GENERATOR STREQUAL "Ninja" AND NOT DISABLE_COLORED_BUILD)
+    # Turn on colored output. https://github.com/ninja-build/ninja/wiki/FAQ
+    set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
+    set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-color=always")
+endif ()
+
 # Thrift requires these two definitions for some types that we use
 add_definitions(-DHAVE_INTTYPES_H -DHAVE_NETINET_IN_H)
 
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index a07bf8a..4433f17 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -139,6 +139,7 @@ Status JsonScanner::open_next_reader() {
     std::string jsonpath = "";
     bool strip_outer_array = false;
     bool num_as_string = false;
+    bool fuzzy_parse = false;
 
     if (range.__isset.jsonpaths) {
         jsonpath = range.jsonpaths;
@@ -152,8 +153,11 @@ Status JsonScanner::open_next_reader() {
     if (range.__isset.num_as_string) {
         num_as_string = range.num_as_string;
     }
-    _cur_file_reader =
-            new JsonReader(_state, _counter, _profile, file, 
strip_outer_array, num_as_string);
+    if (range.__isset.fuzzy_parse) {
+        fuzzy_parse = range.fuzzy_parse;
+    }
+    _cur_file_reader = new JsonReader(_state, _counter, _profile, file, 
strip_outer_array,
+                                      num_as_string, fuzzy_parse);
     RETURN_IF_ERROR(_cur_file_reader->init(jsonpath, json_root));
 
     return Status::OK();
@@ -185,7 +189,8 @@ rapidjson::Value::ConstValueIterator 
JsonDataInternal::get_next() {
 
 ////// class JsonReader
 JsonReader::JsonReader(RuntimeState* state, ScannerCounter* counter, 
RuntimeProfile* profile,
-                       FileReader* file_reader, bool strip_outer_array, bool 
num_as_string)
+                       FileReader* file_reader, bool strip_outer_array, bool 
num_as_string,
+                       bool fuzzy_parse)
         : _handle_json_callback(nullptr),
           _next_line(0),
           _total_lines(0),
@@ -196,6 +201,7 @@ JsonReader::JsonReader(RuntimeState* state, ScannerCounter* 
counter, RuntimeProf
           _closed(false),
           _strip_outer_array(strip_outer_array),
           _num_as_string(num_as_string),
+          _fuzzy_parse(fuzzy_parse),
           _json_doc(nullptr) {
     _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
     _read_timer = ADD_TIMER(_profile, "ReadTime");
@@ -272,7 +278,7 @@ void JsonReader::_close() {
 Status JsonReader::_parse_json_doc(bool* eof) {
     // read a whole message, must be delete json_str by `delete[]`
     SCOPED_TIMER(_file_read_timer);
-    std::unique_ptr<uint8_t[]> json_str; 
+    std::unique_ptr<uint8_t[]> json_str;
     size_t length = 0;
     RETURN_IF_ERROR(_file_reader->read_one_message(&json_str, &length));
     _bytes_read_counter += length;
@@ -300,7 +306,8 @@ Status JsonReader::_parse_json_doc(bool* eof) {
         str_error << "Parse json data for JsonDoc failed. code = "
                   << _origin_json_doc.GetParseError() << ", error-info:"
                   << 
rapidjson::GetParseError_En(_origin_json_doc.GetParseError());
-        _state->append_error_msg_to_file(std::string((char*)json_str.get(), 
length), str_error.str());
+        _state->append_error_msg_to_file(std::string((char*)json_str.get(), 
length),
+                                         str_error.str());
         _counter->num_rows_filtered++;
         return Status::DataQualityError(str_error.str());
     }
@@ -427,9 +434,7 @@ void 
JsonReader::_write_data_to_tuple(rapidjson::Value::ConstValueIterator value
 // for simple format json
 void JsonReader::_set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple,
                                   const std::vector<SlotDescriptor*>& 
slot_descs,
-                                  const std::vector<rapidjson::Value>& 
value_key,
                                   MemPool* tuple_pool, bool* valid) {
-    DCHECK(slot_descs.size() == value_key.size());
     if (!objectValue.IsObject()) {
         // Here we expect the incoming `objectValue` to be a Json Object, such 
as {"key" : "value"},
         // not other type of Json format.
@@ -441,21 +446,30 @@ void JsonReader::_set_tuple_value(rapidjson::Value& 
objectValue, Tuple* tuple,
     }
 
     int nullcount = 0;
-    for (int i = 0; i < slot_descs.size(); ++i) {
-        rapidjson::Value::ConstMemberIterator it = 
objectValue.FindMember(value_key[i]);
+    for (auto v : slot_descs) {
+        rapidjson::Value::ConstMemberIterator it = objectValue.MemberEnd();
+        if (_fuzzy_parse) {
+            auto idx_it = _name_map.find(v->col_name());
+            if (idx_it != _name_map.end() && idx_it->second < 
objectValue.MemberCount()) {
+                it = objectValue.MemberBegin() + idx_it->second;
+            }
+        } else {
+            it = objectValue.FindMember(
+                    rapidjson::Value(v->col_name().c_str(), 
v->col_name().size()));
+        }
         if (it != objectValue.MemberEnd()) {
             const rapidjson::Value& value = it->value;
-            _write_data_to_tuple(&value, slot_descs[i], tuple, tuple_pool, 
valid);
+            _write_data_to_tuple(&value, v, tuple, tuple_pool, valid);
             if (!(*valid)) {
                 return;
             }
         } else { // not found
-            if (slot_descs[i]->is_nullable()) {
-                tuple->set_null(slot_descs[i]->null_indicator_offset());
+            if (v->is_nullable()) {
+                tuple->set_null(v->null_indicator_offset());
                 nullcount++;
             } else {
                 std::stringstream str_error;
-                str_error << "The column `" << slot_descs[i]->col_name()
+                str_error << "The column `" << v->col_name()
                           << "` is not nullable, but it's not found in 
jsondata.";
                 
_state->append_error_msg_to_file(_print_json_value(objectValue), 
str_error.str());
                 _counter->num_rows_filtered++;
@@ -486,11 +500,6 @@ void JsonReader::_set_tuple_value(rapidjson::Value& 
objectValue, Tuple* tuple,
  */
 Status JsonReader::_handle_simple_json(Tuple* tuple, const 
std::vector<SlotDescriptor*>& slot_descs,
                                        MemPool* tuple_pool, bool* eof) {
-    // If you use a string as the key to find the json object, strlen will be 
called every time, so the key is constructed in advance
-    std::vector<rapidjson::Value> value_key;
-    for (auto v : slot_descs) {
-        value_key.emplace_back(v->col_name().c_str(), v->col_name().size());
-    }
     do {
         bool valid = false;
         if (_next_line >= _total_lines) { // parse json and generic document
@@ -502,6 +511,8 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, const 
std::vector<SlotDescr
             if (*eof) {          // read all data, then return
                 return Status::OK();
             }
+            _name_map.clear();
+            rapidjson::Value* objectValue = nullptr;
             if (_json_doc->IsArray()) {
                 _total_lines = _json_doc->Size();
                 if (_total_lines == 0) {
@@ -513,18 +524,30 @@ Status JsonReader::_handle_simple_json(Tuple* tuple, 
const std::vector<SlotDescr
                     _counter->num_rows_filtered++;
                     continue;
                 }
+                objectValue = &(*_json_doc)[0];
             } else {
                 _total_lines = 1; // only one row
+                objectValue = _json_doc;
             }
-
             _next_line = 0;
+            // Members exist only on objects; _set_tuple_value handles non-object rows itself.
+            if (_fuzzy_parse && objectValue->IsObject()) {
+                for (auto v : slot_descs) {
+                    for (int i = 0; i < objectValue->MemberCount(); ++i) {
+                        if (v->col_name() == (objectValue->MemberBegin() + i)->name.GetString()) {
+                            _name_map[v->col_name()] = i;
+                            break;
+                        }
+                    }
+                }
+            }
         }
 
         if (_json_doc->IsArray()) {                                   // 
handle case 1
             rapidjson::Value& objectValue = (*_json_doc)[_next_line]; // json 
object
-            _set_tuple_value(objectValue, tuple, slot_descs, value_key, 
tuple_pool, &valid);
+            _set_tuple_value(objectValue, tuple, slot_descs, tuple_pool, 
&valid);
         } else { // handle case 2
-            _set_tuple_value(*_json_doc, tuple, slot_descs, value_key, 
tuple_pool, &valid);
+            _set_tuple_value(*_json_doc, tuple, slot_descs, tuple_pool, 
&valid);
         }
         _next_line++;
         if (!valid) {
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index 5975978..86bc2dd 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -27,6 +27,7 @@
 #include <memory>
 #include <sstream>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "common/status.h"
@@ -104,7 +105,8 @@ struct JsonPath;
 class JsonReader {
 public:
     JsonReader(RuntimeState* state, ScannerCounter* counter, RuntimeProfile* 
profile,
-               FileReader* file_reader, bool strip_outer_array, bool 
num_as_string);
+               FileReader* file_reader, bool strip_outer_array, bool 
num_as_string,
+               bool fuzzy_parse);
 
     ~JsonReader();
 
@@ -129,8 +131,7 @@ private:
                     const uint8_t* value, int32_t len);
     Status _parse_json_doc(bool* eof);
     void _set_tuple_value(rapidjson::Value& objectValue, Tuple* tuple,
-                          const std::vector<SlotDescriptor*>& slot_descs,
-                          const std::vector<rapidjson::Value>& value_key, 
MemPool* tuple_pool,
+                          const std::vector<SlotDescriptor*>& slot_descs, 
MemPool* tuple_pool,
                           bool* valid);
     void _write_data_to_tuple(rapidjson::Value::ConstValueIterator value, 
SlotDescriptor* desc,
                               Tuple* tuple, MemPool* tuple_pool, bool* valid);
@@ -153,6 +154,7 @@ private:
     bool _closed;
     bool _strip_outer_array;
     bool _num_as_string;
+    bool _fuzzy_parse;
     RuntimeProfile::Counter* _bytes_read_counter;
     RuntimeProfile::Counter* _read_timer;
     RuntimeProfile::Counter* _file_read_timer;
@@ -162,6 +164,7 @@ private:
 
     rapidjson::Document _origin_json_doc; // origin json document object from 
parsed json string
     rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not 
set `json_root`
+    std::unordered_map<std::string, int> _name_map;
 };
 
 } // namespace doris
diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp
index 0a185ce..879bc34 100644
--- a/be/src/http/action/mini_load.cpp
+++ b/be/src/http/action/mini_load.cpp
@@ -305,6 +305,15 @@ Status MiniLoadAction::_merge_header(HttpRequest* http_req,
     } else {
         (*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
     }
+    if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
+        if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) {
+            (*params)[HTTP_FUZZY_PARSE] = "true";
+        } else {
+            (*params)[HTTP_FUZZY_PARSE] = "false";
+        }
+    } else {
+        (*params)[HTTP_FUZZY_PARSE] = "false";
+    }
     if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + 
HTTP_SEQUENCE_COL).empty()) {
         (*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] =
                 http_req->header(HTTP_FUNCTION_COLUMN + "." + 
HTTP_SEQUENCE_COL);
@@ -923,6 +932,7 @@ void MiniLoadAction::_new_handle(HttpRequest* req) {
     }
 
     std::string str = ctx->to_json_for_mini_load();
+    str += '\n';
     HttpChannel::send_reply(req, str);
 }
 
diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index c9abc73..1c9b7c3 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -128,6 +128,8 @@ void StreamLoadAction::handle(HttpRequest* req) {
     }
 
     auto str = ctx->to_json();
+    // add new line at end
+    str = str + '\n';
     HttpChannel::send_reply(req, str);
 
     // update statstics
@@ -195,6 +197,8 @@ int StreamLoadAction::on_header(HttpRequest* req) {
             ctx->body_sink->cancel();
         }
         auto str = ctx->to_json();
+        // add new line at end
+        str = str + '\n';
         HttpChannel::send_reply(req, str);
         streaming_load_current_processing->increment(-1);
         return -1;
@@ -408,6 +412,15 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req, StreamLoadContext*
     } else {
         request.__set_num_as_string(false);
     }
+    if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
+        if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) {
+            request.__set_fuzzy_parse(true);
+        } else {
+            request.__set_fuzzy_parse(false);
+        }
+    } else {
+        request.__set_fuzzy_parse(false);
+    }
     if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + 
HTTP_SEQUENCE_COL).empty()) {
         request.__set_sequence_col(
                 http_req->header(HTTP_FUNCTION_COLUMN + "." + 
HTTP_SEQUENCE_COL));
diff --git a/be/src/http/http_common.h b/be/src/http/http_common.h
index f6a89e2..5681db6 100644
--- a/be/src/http/http_common.h
+++ b/be/src/http/http_common.h
@@ -40,6 +40,8 @@ static const std::string HTTP_JSONPATHS = "jsonpaths";
 static const std::string HTTP_JSONROOT = "json_root";
 static const std::string HTTP_STRIP_OUTER_ARRAY = "strip_outer_array";
 static const std::string HTTP_NUM_AS_STRING = "num_as_string";
+static const std::string HTTP_FUZZY_PARSE = "fuzzy_parse";
+
 static const std::string HTTP_MERGE_TYPE = "merge_type";
 static const std::string HTTP_DELETE_CONDITION = "delete";
 static const std::string HTTP_FUNCTION_COLUMN = "function_column";
diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md
index 39d587b..5440c7f 100644
--- a/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md     
+++ b/docs/en/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md     
@@ -127,6 +127,8 @@ json_root is a valid JSONPATH string that specifies the 
root node of the JSON Do
 
 The type of data merging supports three types: APPEND, DELETE, and MERGE. 
APPEND is the default value, which means that all this batch of data needs to 
be appended to the existing data. DELETE means to delete all rows with the same 
key as this batch of data. MERGE semantics Need to be used in conjunction with 
the delete condition, which means that the data that meets the delete condition 
is processed according to DELETE semantics and the rest is processed according 
to APPEND semantics
 
+`fuzzy_parse`: Boolean type. When true, the JSON schema is parsed from the first row only, which makes the import faster but requires every JSON object to keep its keys in the same order as the first row. The default value is false. Only used for the JSON format.
+
 RETURN VALUES
 
 After the load is completed, the related content of this load will be returned 
in Json format. Current field included
diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM 
LOAD.md
index 82ebd44..4c4ecc0 100644
--- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md  
+++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/STREAM LOAD.md  
@@ -96,6 +96,8 @@ under the License.
         
         function_column.sequence_col: 
只适用于UNIQUE_KEYS,相同key列下,保证value列按照source_sequence列进行REPLACE, 
             source_sequence可以是数据源中的列,也可以是表结构中的一列。
+        
+        fuzzy_parse: 布尔类型,为true表示json将以第一行为schema 进行解析,开启这个选项可以提高json 
导入效率,但是要求所有json 对象的key的顺序和第一行一致, 默认为false,仅用于json 格式
 
     RETURN VALUES
         导入完成后,会以Json格式返回这次导入的相关内容。当前包括一下字段
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
index 4f16572..69ca520 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java
@@ -52,6 +52,7 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             .add(CreateRoutineLoadStmt.JSONROOT)
             .add(CreateRoutineLoadStmt.STRIP_OUTER_ARRAY)
             .add(CreateRoutineLoadStmt.NUM_AS_STRING)
+            .add(CreateRoutineLoadStmt.FUZZY_PARSE)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
             .build();
@@ -188,6 +189,12 @@ public class AlterRoutineLoadStmt extends DdlStmt {
             
analyzedJobProperties.put(jobProperties.get(CreateRoutineLoadStmt.NUM_AS_STRING),
                     String.valueOf(numAsString));
         }
+
+        if (jobProperties.containsKey(CreateRoutineLoadStmt.FUZZY_PARSE)) {
+            boolean fuzzyParse = Boolean.valueOf(jobProperties.get(CreateRoutineLoadStmt.FUZZY_PARSE));
+            // Key the entry by the property name, not by the user-supplied value.
+            analyzedJobProperties.put(CreateRoutineLoadStmt.FUZZY_PARSE, String.valueOf(fuzzyParse));
+        }
     }
 
     private void checkDataSourceProperties() throws AnalysisException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index 7b1c0db..ee629be 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -102,6 +102,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     public static final String JSONPATHS = "jsonpaths";
     public static final String JSONROOT = "json_root";
     public static final String NUM_AS_STRING = "num_as_string";
+    public static final String FUZZY_PARSE = "fuzzy_parse";
 
     // kafka type properties
     public static final String KAFKA_BROKER_LIST_PROPERTY = 
"kafka_broker_list";
@@ -124,6 +125,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
             .add(JSONPATHS)
             .add(STRIP_OUTER_ARRAY)
             .add(NUM_AS_STRING)
+            .add(FUZZY_PARSE)
             .add(JSONROOT)
             .add(LoadStmt.STRICT_MODE)
             .add(LoadStmt.TIMEZONE)
@@ -168,6 +170,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
     private String jsonRoot   = ""; // MUST be a jsonpath string
     private boolean stripOuterArray = false;
     private boolean numAsString = false;
+    private boolean fuzzyParse = false;
 
     // kafka related properties
     private String kafkaBrokerList;
@@ -262,6 +265,10 @@ public class CreateRoutineLoadStmt extends DdlStmt {
         return numAsString;
     }
 
+    public boolean isFuzzyParse() {
+        return fuzzyParse;
+    }
+
     public String getJsonPaths() {
         return jsonPaths;
     }
@@ -439,6 +446,7 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                 jsonRoot = jobProperties.get(JSONROOT);
                 stripOuterArray = 
Boolean.valueOf(jobProperties.getOrDefault(STRIP_OUTER_ARRAY, "false"));
                 numAsString = 
Boolean.valueOf(jobProperties.getOrDefault(NUM_AS_STRING, "false"));
+                fuzzyParse = 
Boolean.valueOf(jobProperties.getOrDefault(FUZZY_PARSE, "false"));
             } else {
                 throw new UserException("Format type is invalid. format=`" + 
format + "`");
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
index 64cfd76..577aa99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java
@@ -116,6 +116,7 @@ public class DataDescription {
     private boolean stripOuterArray = false;
     private String jsonPaths = "";
     private String jsonRoot = "";
+    private boolean fuzzyParse = false;
 
     private String sequenceCol;
 
@@ -477,6 +478,14 @@ public class DataDescription {
         this.stripOuterArray = stripOuterArray;
     }
 
+    public boolean isFuzzyParse() {
+        return fuzzyParse;
+    }
+
+    public void setFuzzyParse(boolean fuzzyParse) {
+        this.fuzzyParse = fuzzyParse;
+    }
+
     public String getJsonPaths() {
         return jsonPaths;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
index 8cd6013..cfca472 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -107,6 +107,7 @@ public class LoadStmt extends DdlStmt {
     public static final String KEY_IN_PARAM_JSONPATHS  = "jsonpaths";
     public static final String KEY_IN_PARAM_JSONROOT  = "json_root";
     public static final String KEY_IN_PARAM_STRIP_OUTER_ARRAY = 
"strip_outer_array";
+    public static final String KEY_IN_PARAM_FUZZY_PARSE = "fuzzy_parse";
     public static final String KEY_IN_PARAM_MERGE_TYPE = "merge_type";
     public static final String KEY_IN_PARAM_DELETE_CONDITION = "delete";
     public static final String KEY_IN_PARAM_FUNCTION_COLUMN = 
"function_column";
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java
index 0a09c5b..61b7f80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/UploadAction.java
@@ -271,6 +271,7 @@ public class UploadAction extends RestBaseController {
         public String stripOuterArray;
         public String jsonRoot;
         public String numAsString;
+        public String fuzzyParse;
 
 
         public LoadContext(HttpServletRequest request, String db, String tbl, 
String user, String passwd, TmpFileMgr.TmpFile file) {
@@ -302,6 +303,7 @@ public class UploadAction extends RestBaseController {
             this.stripOuterArray = request.getHeader("strip_outer_array");
             this.numAsString = request.getHeader("num_as_string");
             this.jsonRoot = request.getHeader("json_root");
+            this.fuzzyParse = request.getHeader("fuzzy_parse");
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java 
b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
index fc7df6b..6278ff8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerFileGroup.java
@@ -99,6 +99,7 @@ public class BrokerFileGroup implements Writable {
     private boolean stripOuterArray = false;
     private String jsonPaths = "";
     private String jsonRoot = "";
+    private boolean fuzzyParse = false; // opt-in; the documented default for fuzzy_parse is false
 
     // for unit test and edit log persistence
     private BrokerFileGroup() {
@@ -227,6 +228,7 @@ public class BrokerFileGroup implements Writable {
             stripOuterArray = dataDescription.isStripOuterArray();
             jsonPaths = dataDescription.getJsonPaths();
             jsonRoot = dataDescription.getJsonRoot();
+            fuzzyParse = dataDescription.isFuzzyParse();
         }
     }
 
@@ -326,6 +328,14 @@ public class BrokerFileGroup implements Writable {
         this.stripOuterArray = stripOuterArray;
     }
 
+    public boolean isFuzzyParse() {
+        return fuzzyParse;
+    }
+
+    public void setFuzzyParse(boolean fuzzyParse) {
+        this.fuzzyParse = fuzzyParse;
+    }
+
     public String getJsonPaths() {
         return jsonPaths;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index f1d858b..3824df9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -198,6 +198,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
     private static final String PROPS_NUM_AS_STRING = "num_as_string";
     private static final String PROPS_JSONPATHS = "jsonpaths";
     private static final String PROPS_JSONROOT = "json_root";
+    private static final String PROPS_FUZZY_PARSE = "fuzzy_parse";
 
     protected int currentTaskConcurrentNum;
     protected RoutineLoadProgress progress;
@@ -308,6 +309,7 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             jobProperties.put(PROPS_NUM_AS_STRING, "false");
             jobProperties.put(PROPS_JSONPATHS, "");
             jobProperties.put(PROPS_JSONROOT, "");
+            jobProperties.put(PROPS_FUZZY_PARSE, "false");
         } else if (stmt.getFormat().equals("json")) {
             jobProperties.put(PROPS_FORMAT, "json");
             if (!Strings.isNullOrEmpty(stmt.getJsonPaths())) {
@@ -330,6 +332,11 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
             } else {
                 jobProperties.put(PROPS_NUM_AS_STRING, "false");
             }
+            if (stmt.isFuzzyParse()) {
+                jobProperties.put(PROPS_FUZZY_PARSE, "true");
+            } else {
+                jobProperties.put(PROPS_FUZZY_PARSE, "false");
+            }
 
         } else {
             throw new UserException("Invalid format type.");
@@ -560,6 +567,10 @@ public abstract class RoutineLoadJob extends 
AbstractTxnStateChangeCallback impl
         return Boolean.valueOf(jobProperties.get(PROPS_NUM_AS_STRING));
     }
 
+    public boolean isFuzzyParse() {
+        return Boolean.valueOf(jobProperties.get(PROPS_FUZZY_PARSE));
+    }
+
     @Override
     public String getPath() {
         return null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index e7dd2fe..5923d9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -427,6 +427,7 @@ public class BrokerScanNode extends LoadScanNode {
                         
rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray());
                         
rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
                         rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
+                        
rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
                     }
                     brokerScanRange(curLocations).addToRanges(rangeDesc);
                     curFileOffset += rangeBytes;
@@ -451,6 +452,7 @@ public class BrokerScanNode extends LoadScanNode {
                     
rangeDesc.setStripOuterArray(context.fileGroup.isStripOuterArray());
                     rangeDesc.setJsonpaths(context.fileGroup.getJsonPaths());
                     rangeDesc.setJsonRoot(context.fileGroup.getJsonRoot());
+                    rangeDesc.setFuzzyParse(context.fileGroup.isFuzzyParse());
                 }
                 brokerScanRange(curLocations).addToRanges(rangeDesc);
                 curFileOffset = 0;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
index 4b8a7ef..b31578b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java
@@ -94,6 +94,7 @@ public class StreamLoadScanNode extends LoadScanNode {
             }
             rangeDesc.setStripOuterArray(taskInfo.isStripOuterArray());
             rangeDesc.setNumAsString(taskInfo.isNumAsString());
+            rangeDesc.setFuzzyParse(taskInfo.isFuzzyParse());
         }
         rangeDesc.splittable = false;
         switch (taskInfo.getFileType()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
index 9d2d853..24fc3e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java
@@ -470,6 +470,7 @@ public class MultiLoadMgr {
             boolean stripOuterArray = false;
             String jsonPaths = "";
             String jsonRoot = "";
+            boolean fuzzyParse = false;
             if (properties != null) {
                 colString = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
                 String columnSeparatorStr = 
properties.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
@@ -502,6 +503,8 @@ public class MultiLoadMgr {
                             
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_STRIP_OUTER_ARRAY, "false"));
                     jsonPaths = 
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, "");
                     jsonRoot = 
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, "");
+                    fuzzyParse = Boolean.valueOf(
+                        
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false"));
                 }
             }
             DataDescription dataDescription = new DataDescription(tbl, 
partitionNames, files, null, columnSeparator,
@@ -518,6 +521,7 @@ public class MultiLoadMgr {
             dataDescription.setJsonPaths(jsonPaths);
             dataDescription.setJsonRoot(jsonRoot);
             dataDescription.setStripOuterArray(stripOuterArray);
+            dataDescription.setFuzzyParse(fuzzyParse);
             return dataDescription;
         }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 7692d31..ec335e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -42,6 +42,7 @@ public interface LoadTaskInfo {
     public String getJsonPaths();
     public String getJsonRoot();
     public boolean isStripOuterArray();
+    public boolean isFuzzyParse();
     public boolean isNumAsString();
     public String getPath();
     public List<ImportColumnDesc> getColumnExprDescs();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 2baf1b1..7ccdd46 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -60,6 +60,7 @@ public class StreamLoadTask implements LoadTaskInfo {
     private boolean numAsString;
     private String jsonPaths;
     private String jsonRoot;
+    private boolean fuzzyParse;
 
     // optional
     private List<ImportColumnDesc> columnExprDescs = Lists.newArrayList();
@@ -85,6 +86,7 @@ public class StreamLoadTask implements LoadTaskInfo {
         this.jsonRoot = "";
         this.stripOuterArray = false;
         this.numAsString = false;
+        this.fuzzyParse = false;
     }
 
     public TUniqueId getId() {
@@ -148,6 +150,15 @@ public class StreamLoadTask implements LoadTaskInfo {
         return numAsString;
     }
 
+    @Override
+    public boolean isFuzzyParse() {
+        return fuzzyParse;
+    }
+
+    public void setFuzzyParse(boolean fuzzyParse) {
+        this.fuzzyParse = fuzzyParse;
+    }
+
     public void setStripOuterArray(boolean stripOuterArray) {
         this.stripOuterArray = stripOuterArray;
     }
@@ -239,6 +250,7 @@ public class StreamLoadTask implements LoadTaskInfo {
             }
             stripOuterArray = request.isStripOuterArray();
             numAsString = request.isNumAsString();
+            fuzzyParse = request.isFuzzyParse();
         }
         if (request.isSetMergeType()) {
             try {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 35182f5..b10fbe7 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -575,6 +575,7 @@ struct TStreamLoadPutRequest {
     28: optional string delete_condition
     29: optional string sequence_col
     30: optional bool num_as_string
+    31: optional bool fuzzy_parse
 }
 
 struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index ffc9210..1f83574 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -133,6 +133,7 @@ struct TBrokerRangeDesc {
     13: optional string json_root;
     //  it's usefull when format_type == FORMAT_JSON
     14: optional bool num_as_string;
+    15: optional bool fuzzy_parse;
 }
 
 struct TBrokerScanRangeParams {
diff --git a/run-be-ut.sh b/run-be-ut.sh
index dafd484..f9c205e 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -107,9 +107,15 @@ if [ ! -d ${CMAKE_BUILD_DIR} ]; then
 fi
 
 cd ${CMAKE_BUILD_DIR}
+GENERATOR="Unix Makefiles"
+BUILD_SYSTEM="make"
+if ninja --version 2>/dev/null; then
+    GENERATOR="Ninja"
+    BUILD_SYSTEM="ninja"
+fi
 
-${CMAKE_CMD} ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-make -j${PARALLEL}
+${CMAKE_CMD} -G "${GENERATOR}" ../ -DWITH_MYSQL=OFF -DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
+${BUILD_SYSTEM} -j${PARALLEL}
 
 if [ ${RUN} -ne 1 ]; then
     echo "Finished"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to