This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a57ecc   [Doris On ES]fix bug of query failed in doc_value_mode when 
fields have none value (#3513)
5a57ecc is described below

commit 5a57ecca15176fce28e3163321de5c75162050c9
Author: 令狐少侠 <[email protected]>
AuthorDate: Mon May 11 15:34:12 2020 +0800

     [Doris On ES]fix bug of query failed in doc_value_mode when fields have 
none value (#3513)
    
    #3479
    
    Here I try to explain the cause of the problem and how to fix it.
    
    **The Cause of the Problem**
    Take the case in issue (#3479) as an example.
    An ordinary search returns results like the following:
    ```
    GET table/_doc/_search
    
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
    
    {
      "took": 6,
      "timed_out": false,
      "_shards": {
        ……
      },
      "hits": {
        "total": 3,
        "max_score": null,
        "hits": [
          {
            "_index": "table",
            "_score": null,
            "sort": [
              0
            ]
          },
          {
            "_index": "table",
            "_score": null,
            "fields": {
              "k1": [
                "kkk1"
              ]
            },
            "sort": [
              0
            ]
          },
          {
            "_index": "table",
            "_score": null,
            "sort": [
              0
            ]
          }
        ]
      }
    }
    ```
    
    But in Doris on ES, BE fetches data from all shards in parallel and uses `filter_path` to reduce the network cost. The process is as follows:
    ```
    GET table/_doc/_search?preference=_shards:1&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
    
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
    
    {
      "hits": {
        "total": 0
      }
    }
    
    GET table/_doc/_search?preference=_shards:2&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
    
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
    {
      "hits": {
        "total": 1
      }
    }
    
    GET table/_doc/_search?preference=_shards:3&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
    
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
    {
      "hits": {
        "total": 1,
        "hits": [
          {
            "fields": {
              "k1": [
                "kkk1"
              ]
            }
          }
        ]
      }
    }
    ```
    *The scan worker on BE that processes the result of shard 2 will fail.*
    
    **The reasons are as follows:**
    1. "filter_path" drops the hits.hits object from the response when none of the hits on that shard contain the requested fields.
    2. In the current implementation, if there are data rows (total > 0), the hits.hits object must be an array.
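
To make the three shard responses above concrete, here is a minimal sketch of how a tolerant parser could tell them apart. It does not use rapidjson or any Doris code; `ShardResponse`, `ParseOutcome` and `classify` are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified model of one shard's filtered response.
struct ShardResponse {
    int total;                    // value of hits.total
    bool has_inner_hits;          // whether hits.hits survived filter_path
    std::vector<std::string> k1;  // k1 values found under hits.hits[*].fields
};

enum class ParseOutcome {
    kEmpty,             // total == 0: nothing to return
    kRows,              // total > 0 and hits.hits is present
    kRowsWithoutFields  // total > 0 but hits.hits was filtered away
};

// Classify a response. The third outcome is the case that used to fail,
// because the parser assumed hits.hits is always an array when total > 0.
ParseOutcome classify(const ShardResponse& r) {
    if (r.total <= 0) {
        return ParseOutcome::kEmpty;
    }
    if (!r.has_inner_hits) {
        return ParseOutcome::kRowsWithoutFields;
    }
    return ParseOutcome::kRows;
}
```

With the responses shown earlier, shard 1 maps to `kEmpty`, shard 2 to `kRowsWithoutFields`, and shard 3 to `kRows`.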
    
    **How to Fix It**
    
    Two methods:
    1. Modify "filter_path" to contain the hits.
    Pros: The fix is very simple.
    Cons: More network cost.
    2. Handle the case where fields are missing in a batch.
    Pros: No loss of performance.
    Cons: The code is more complex.
    
    Performance comes first, so I use method 2.
    
    **Design**
    1. Add a variable "_doc_value_mode" to class "ScrollParser" to indicate whether the data processed by this parser was fetched in doc_value_mode.
    2. "_doc_value_mode" is passed along ESScrollQueryBuilder::build() -> EsHttpScanner -> ESScanReader -> ScrollParser; build() determines whether the DSL enables doc_value_mode.
    3. When hits.hits is missing from the ES response and total > 0, we know there are data rows but the corresponding fields do not exist. ScrollParser uses "_doc_value_mode" and _total to construct _total rows whose fields are all set to NULL.
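
Step 3 of the design can be sketched in isolation as follows. This is a simplified illustration and not the code in the patch: `NullableRow` and `NullFillingCursor` are hypothetical stand-ins for the Doris tuple and for `ScrollParser`, and the sketch covers only the case where hits.hits is absent from the response.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a tuple: one null flag per slot.
struct NullableRow {
    std::vector<bool> is_null;
};

// Emits `total` all-NULL rows when running in doc_value_mode and the
// response carried a row count but no hits.hits array.
class NullFillingCursor {
public:
    NullFillingCursor(bool doc_value_mode, int total)
        : _doc_value_mode(doc_value_mode), _total(total), _line_index(0) {}

    // Returns true and fills `row` with NULLs while rows remain,
    // returns false once the end of the batch is reached.
    bool next(std::size_t slot_count, NullableRow* row) {
        if (!_doc_value_mode) {
            return false;  // _source mode: no rows to synthesize
        }
        if (_total <= 0 || _line_index >= _total) {
            return false;  // every counted row has been produced
        }
        ++_line_index;
        row->is_null.assign(slot_count, true);  // mark every slot NULL
        return true;
    }

private:
    bool _doc_value_mode;
    int _total;
    int _line_index;
};
```

For a response with total = 2 and one selected column, a cursor created with `doc_value_mode = true` yields two rows whose single slot is NULL and then reports the end of the batch; with `doc_value_mode = false` it yields nothing.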
---
 be/src/exec/es/es_scan_reader.cpp    |  7 +++++--
 be/src/exec/es/es_scan_reader.h      |  4 +++-
 be/src/exec/es/es_scroll_parser.cpp  | 40 ++++++++++++++++++++++++++++++++++--
 be/src/exec/es/es_scroll_parser.h    |  8 +++++++-
 be/src/exec/es/es_scroll_query.cpp   |  6 +++++-
 be/src/exec/es/es_scroll_query.h     |  3 ++-
 be/src/exec/es_http_scan_node.cpp    |  6 ++++--
 be/src/exec/es_http_scanner.cpp      |  6 ++++--
 be/src/exec/es_http_scanner.h        |  5 ++++-
 be/test/exec/es_scan_reader_test.cpp |  5 +++--
 10 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/be/src/exec/es/es_scan_reader.cpp 
b/be/src/exec/es/es_scan_reader.cpp
index 33a7a3b..4d66738 100644
--- a/be/src/exec/es/es_scan_reader.cpp
+++ b/be/src/exec/es/es_scan_reader.cpp
@@ -35,7 +35,10 @@ const std::string REQUEST_SEPARATOR = "/";
 
 const std::string REQUEST_SEARCH_FILTER_PATH = 
"filter_path=hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";
 
-ESScanReader::ESScanReader(const std::string& target, const 
std::map<std::string, std::string>& props) : 
_scroll_keep_alive(config::es_scroll_keepalive), 
_http_timeout_ms(config::es_http_timeout_ms) {
+ESScanReader::ESScanReader(const std::string& target, const 
std::map<std::string, std::string>& props, bool doc_value_mode) : 
+        _scroll_keep_alive(config::es_scroll_keepalive), 
+        _http_timeout_ms(config::es_http_timeout_ms), 
+        _doc_value_mode(doc_value_mode) {
     _target = target;
     _index = props.at(KEY_INDEX);
     _type = props.at(KEY_TYPE);
@@ -142,7 +145,7 @@ Status ESScanReader::get_next(bool* scan_eos, 
std::unique_ptr<ScrollParser>& scr
         }
     }
 
-    scroll_parser.reset(new ScrollParser());
+    scroll_parser.reset(new ScrollParser(_doc_value_mode));
     VLOG(1) << "get_next request ES, returned response: " << response;
     Status status = scroll_parser->parse(response, _exactly_once);
     if (!status.ok()){
diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h
index 60137c1..3bd64cf 100644
--- a/be/src/exec/es/es_scan_reader.h
+++ b/be/src/exec/es/es_scan_reader.h
@@ -40,7 +40,7 @@ public:
     static constexpr const char* KEY_QUERY = "query";
     static constexpr const char* KEY_BATCH_SIZE = "batch_size";
     static constexpr const char* KEY_TERMINATE_AFTER = "limit";
-    ESScanReader(const std::string& target, const std::map<std::string, 
std::string>& props);
+    ESScanReader(const std::string& target, const std::map<std::string, 
std::string>& props, bool doc_value_mode);
     ~ESScanReader();
 
     // launch the first scroll request, this method will cache the first 
scroll response, and return the this cached response when invoke get_next
@@ -94,6 +94,8 @@ private:
     int _http_timeout_ms;
 
     bool _exactly_once;
+    
+    bool _doc_value_mode;
 };
 }
 
diff --git a/be/src/exec/es/es_scroll_parser.cpp 
b/be/src/exec/es/es_scroll_parser.cpp
index fbbcfb7..1fa9a03 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -198,11 +198,12 @@ static Status get_float_value(const rapidjson::Value 
&col, PrimitiveType type, v
     return Status::OK();
 }
 
-ScrollParser::ScrollParser() :
+ScrollParser::ScrollParser(bool doc_value_mode) :
     _scroll_id(""),
     _total(0),
     _size(0),
-    _line_index(0) {
+    _line_index(0),
+    _doc_value_mode(doc_value_mode) {
 }
 
 ScrollParser::~ScrollParser() {
@@ -247,6 +248,16 @@ Status ScrollParser::parse(const std::string& 
scroll_result, bool exactly_once)
     }
 
     VLOG(1) << "es_scan_reader parse scroll result: " << scroll_result;
+    if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) {
+        // this is caused by query some columns which are not exit, e.g.
+        // A Index has fields: k1,k2,k3. and we put some rows into this Index 
(some fields dose NOT contain any data)
+        // e.g. 
+        // put index/_doc/1 {"k2":"123"}
+        // put index/_doc/2 {"k3":"123}
+        // then we use sql `select k1 from table`
+        // what ES return is like this: {hits: {total:2}
+        return Status::OK();
+    }
     const rapidjson::Value &inner_hits_node = 
outer_hits_node[FIELD_INNER_HITS];
     if (!inner_hits_node.IsArray()) {
         LOG(WARNING) << "exception maybe happend on es cluster, reponse:" << 
scroll_result;
@@ -275,7 +286,32 @@ int ScrollParser::get_total() {
 Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc, 
             Tuple* tuple, MemPool* tuple_pool, bool* line_eof, const 
std::map<std::string, std::string>& docvalue_context) {
     *line_eof = true;
+
     if (_size <= 0 || _line_index >= _size) {
+        // _source is fetched from ES
+        if (!_doc_value_mode) {
+            return Status::OK();
+        }
+        
+        // _fields(doc_value) is fetched from ES
+        if (_total <= 0 || _line_index >= _total) {
+            return Status::OK();
+        }
+       
+       
+        // here is operations for `enable_doc_value_scan`.
+        // This indicates that the fields does not exist(e.g. never assign 
values to these fields), but other fields have values.
+        // so, number of rows is >= 0, we need fill `NULL` to these fields 
that does not exist.
+        _line_index++;
+        tuple->init(tuple_desc->byte_size());
+        for (int i = 0; i < tuple_desc->slots().size(); ++i) {
+            const SlotDescriptor* slot_desc = tuple_desc->slots()[i];
+            if (slot_desc->is_materialized()) {
+                tuple->set_null(slot_desc->null_indicator_offset());
+            }
+        }
+
+        *line_eof = false;
         return Status::OK();
     }
 
diff --git a/be/src/exec/es/es_scroll_parser.h 
b/be/src/exec/es/es_scroll_parser.h
index 85ac92b..b97e6bc 100644
--- a/be/src/exec/es/es_scroll_parser.h
+++ b/be/src/exec/es/es_scroll_parser.h
@@ -30,7 +30,7 @@ class Status;
 class ScrollParser {
 
 public:
-    ScrollParser();
+    ScrollParser(bool doc_value_mode);
     ~ScrollParser();
 
     Status parse(const std::string& scroll_result, bool exactly_once = false);
@@ -50,5 +50,11 @@ private:
 
     rapidjson::Document _document_node;
     rapidjson::Value _inner_hits_node;
+    
+    // todo(milimin): ScrollParser should be divided into two classes: 
SourceParser and DocValueParser, 
+    // including remove some variables in the current implementation, e.g. 
pure_doc_value. 
+    // All above will be done in the DOE refactoring projects.
+    // Current bug fixes minimize the scope of changes to avoid introducing 
other new bugs.
+    bool _doc_value_mode;
 };
 }
diff --git a/be/src/exec/es/es_scroll_query.cpp 
b/be/src/exec/es/es_scroll_query.cpp
index e9b88ee..0c4f581 100644
--- a/be/src/exec/es/es_scroll_query.cpp
+++ b/be/src/exec/es/es_scroll_query.cpp
@@ -63,7 +63,8 @@ std::string 
ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scr
 
 std::string ESScrollQueryBuilder::build(const std::map<std::string, 
std::string>& properties,
                 const std::vector<std::string>& fields,
-                std::vector<EsPredicate*>& predicates, const 
std::map<std::string, std::string>& docvalue_context) {
+                std::vector<EsPredicate*>& predicates, const 
std::map<std::string, std::string>& docvalue_context,
+                bool* doc_value_mode) {
     rapidjson::Document es_query_dsl;
     rapidjson::Document::AllocatorType &allocator = 
es_query_dsl.GetAllocator();
     es_query_dsl.SetObject();
@@ -86,6 +87,9 @@ std::string ESScrollQueryBuilder::build(const 
std::map<std::string, std::string>
             }
         }
     }
+
+    *doc_value_mode = pure_docvalue;
+
     rapidjson::Value source_node(rapidjson::kArrayType);
     if (pure_docvalue) {
         for (auto& select_field : fields) {
diff --git a/be/src/exec/es/es_scroll_query.h b/be/src/exec/es/es_scroll_query.h
index 959ec45..ed5e7e0 100644
--- a/be/src/exec/es/es_scroll_query.h
+++ b/be/src/exec/es/es_scroll_query.h
@@ -35,6 +35,7 @@ public:
     // @note: predicates should processed before pass it to this method, 
     // tie breaker for predicate wheather can push down es can reference the 
push-down filters
     static std::string build(const std::map<std::string, std::string>& 
properties,
-                const std::vector<std::string>& fields, 
std::vector<EsPredicate*>& predicates, const std::map<std::string, 
std::string>& docvalue_context);
+                const std::vector<std::string>& fields, 
std::vector<EsPredicate*>& predicates, const std::map<std::string, 
std::string>& docvalue_context,
+                bool* doc_value_mode);
 };
 }
diff --git a/be/src/exec/es_http_scan_node.cpp 
b/be/src/exec/es_http_scan_node.cpp
index cb5eeb2..fa7b895 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -443,14 +443,16 @@ void EsHttpScanNode::scanner_worker(int start_idx, int 
length, std::promise<Stat
     if (limit() != -1 && limit() <= _runtime_state->batch_size()) {
         properties[ESScanReader::KEY_TERMINATE_AFTER] = 
std::to_string(limit());
     }
+
+    bool doc_value_mode = false;
     properties[ESScanReader::KEY_QUERY] 
-        = ESScrollQueryBuilder::build(properties, _column_names, _predicates, 
_docvalue_context);
+        = ESScrollQueryBuilder::build(properties, _column_names, _predicates, 
_docvalue_context, &doc_value_mode);
 
 
     // start scanner to scan
     std::unique_ptr<EsHttpScanner> scanner(new EsHttpScanner(
                     _runtime_state, runtime_profile(), _tuple_id,
-                    properties, scanner_expr_ctxs, &counter));
+                    properties, scanner_expr_ctxs, &counter, doc_value_mode));
     status = scanner_scan(std::move(scanner), scanner_expr_ctxs, &counter);
     if (!status.ok()) {
         LOG(WARNING) << "Scanner[" << start_idx << "] process failed. status="
diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp
index 4eecaa0..f044080 100644
--- a/be/src/exec/es_http_scanner.cpp
+++ b/be/src/exec/es_http_scanner.cpp
@@ -36,7 +36,8 @@ EsHttpScanner::EsHttpScanner(
             TupleId tuple_id,
             const std::map<std::string, std::string>& properties,
             const std::vector<ExprContext*>& conjunct_ctxs,
-            EsScanCounter* counter) :
+            EsScanCounter* counter,
+            bool doc_value_mode) :
         _state(state),
         _profile(profile),
         _tuple_id(tuple_id),
@@ -56,6 +57,7 @@ EsHttpScanner::EsHttpScanner(
         _counter(counter),
         _es_reader(nullptr),
         _es_scroll_parser(nullptr),
+        _doc_value_mode(doc_value_mode),
         _rows_read_counter(nullptr),
         _read_timer(nullptr),
         _materialize_timer(nullptr) {
@@ -74,7 +76,7 @@ Status EsHttpScanner::open() {
     }
 
     const std::string& host = _properties.at(ESScanReader::KEY_HOST_PORT);
-    _es_reader.reset(new ESScanReader(host, _properties));
+    _es_reader.reset(new ESScanReader(host, _properties, _doc_value_mode));
     if (_es_reader == nullptr) {
         return Status::InternalError("Es reader construct failed.");
     }
diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h
index 23ebe7c..f94eded 100644
--- a/be/src/exec/es_http_scanner.h
+++ b/be/src/exec/es_http_scanner.h
@@ -62,7 +62,8 @@ public:
         TupleId tuple_id,
         const std::map<std::string, std::string>& properties,
         const std::vector<ExprContext*>& conjunct_ctxs,
-        EsScanCounter* counter);
+        EsScanCounter* counter,
+        bool doc_value_mode);
     ~EsHttpScanner();
 
     Status open();
@@ -94,6 +95,8 @@ private:
     std::unique_ptr<ESScanReader> _es_reader;
     std::unique_ptr<ScrollParser> _es_scroll_parser;
 
+    bool _doc_value_mode;
+
     // Profile
     RuntimeProfile::Counter* _rows_read_counter;
     RuntimeProfile::Counter* _read_timer;
diff --git a/be/test/exec/es_scan_reader_test.cpp 
b/be/test/exec/es_scan_reader_test.cpp
index cba7735..28976b2 100644
--- a/be/test/exec/es_scan_reader_test.cpp
+++ b/be/test/exec/es_scan_reader_test.cpp
@@ -226,8 +226,9 @@ TEST_F(MockESServerTest, workflow) {
     props[ESScanReader::KEY_BATCH_SIZE] = "1";
     std::vector<EsPredicate*> predicates;
     std::map<std::string, std::string> docvalue_context;
-    props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, 
fields, predicates, docvalue_context);
-    ESScanReader reader(target, props);
+    bool doc_value_mode = false;
+    props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, 
fields, predicates, docvalue_context, &doc_value_mode);
+    ESScanReader reader(target, props, doc_value_mode);
     auto st = reader.open();
     ASSERT_TRUE(st.ok());
     bool eos = false;

