This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5a57ecc [Doris On ES]fix bug of query failed in doc_value_mode when
fields have none value (#3513)
5a57ecc is described below
commit 5a57ecca15176fce28e3163321de5c75162050c9
Author: 令狐少侠 <[email protected]>
AuthorDate: Mon May 11 15:34:12 2020 +0800
[Doris On ES]fix bug of query failed in doc_value_mode when fields have
none value (#3513)
#3479
Here I try to explain the cause of the problem and how to fix it.
**The Cause of the Problem**
Take the case in issue #3479 as an example. A plain search (without `filter_path`) returns the following:
```
GET table/_doc/_search
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
{
"took": 6,
"timed_out": false,
"_shards": {
……
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "table",
"_score": null,
"sort": [
0
]
},
{
"_index": "table",
"_score": null,
"fields": {
"k1": [
"kkk1"
]
},
"sort": [
0
]
},
{
"_index": "table",
"_score": null,
"sort": [
0
]
}
]
}
}
```
In Doris on ES, however, the BE fetches data from all shards in parallel and uses
`filter_path` to reduce network cost, so the requests and responses look like this:
```
GET table/_doc/_search?preference=_shards:1&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
{
"hits": {
"total": 0
}
}
GET table/_doc/_search?preference=_shards:2&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
{
"hits": {
"total": 1
}
}
GET table/_doc/_search?preference=_shards:3&filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields
{"query":{"match_all":{}},"stored_fields":"_none_","docvalue_fields":["k1"],"sort":["_doc"],"size":100}
{
"hits": {
"total": 1,
"hits": [
{
"fields": {
"k1": [
"kkk1"
]
}
}
]
}
}
```
*The BE scan worker that processes the response from shard 2 fails.*
**The reasons are as follows:**
1. `filter_path` drops the `hits.hits` object entirely when none of the hits on a shard has a value for the requested fields.
2. The current implementation assumes that whenever there are matching rows (`total > 0`),
`hits.hits` exists and is an array.
**How to Fix It**
There are two options:
1. Modify `filter_path` so that `hits.hits` is always returned.
   - Pros: the fix is very simple.
   - Cons: higher network cost.
2. Handle the case where the requested fields are missing from a batch.
   - Pros: no loss of performance.
   - Cons: the code is more complex.
Since performance comes first, this commit uses method 2.
**Design**
1. Add a member `_doc_value_mode` to class `ScrollParser` to indicate whether the data
processed by this parser was fetched in doc_value mode.
2. `ESScrollQueryBuilder::build()`, which decides whether the generated DSL uses doc_value
mode, reports the mode through a `bool*` out-parameter; `EsHttpScanNode` then passes it
along `EsHttpScanner` -> `ESScanReader` -> `ScrollParser`.
3. When an ES response has `total > 0` but no `hits.hits`, there are matching rows whose
requested fields do not exist. In that case `ScrollParser` uses `_doc_value_mode` and
`_total` to emit `_total` rows in which every field is `NULL`.
---
be/src/exec/es/es_scan_reader.cpp | 7 +++++--
be/src/exec/es/es_scan_reader.h | 4 +++-
be/src/exec/es/es_scroll_parser.cpp | 40 ++++++++++++++++++++++++++++++++++--
be/src/exec/es/es_scroll_parser.h | 8 +++++++-
be/src/exec/es/es_scroll_query.cpp | 6 +++++-
be/src/exec/es/es_scroll_query.h | 3 ++-
be/src/exec/es_http_scan_node.cpp | 6 ++++--
be/src/exec/es_http_scanner.cpp | 6 ++++--
be/src/exec/es_http_scanner.h | 5 ++++-
be/test/exec/es_scan_reader_test.cpp | 5 +++--
10 files changed, 75 insertions(+), 15 deletions(-)
diff --git a/be/src/exec/es/es_scan_reader.cpp b/be/src/exec/es/es_scan_reader.cpp
index 33a7a3b..4d66738 100644
--- a/be/src/exec/es/es_scan_reader.cpp
+++ b/be/src/exec/es/es_scan_reader.cpp
@@ -35,7 +35,10 @@ const std::string REQUEST_SEPARATOR = "/";
const std::string REQUEST_SEARCH_FILTER_PATH =
"filter_path=hits.hits._source,hits.total,_id,hits.hits._source.fields,hits.hits.fields";
-ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) : _scroll_keep_alive(config::es_scroll_keepalive), _http_timeout_ms(config::es_http_timeout_ms) {
+ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode) :
+ _scroll_keep_alive(config::es_scroll_keepalive),
+ _http_timeout_ms(config::es_http_timeout_ms),
+ _doc_value_mode(doc_value_mode) {
_target = target;
_index = props.at(KEY_INDEX);
_type = props.at(KEY_TYPE);
@@ -142,7 +145,7 @@ Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scr
}
}
- scroll_parser.reset(new ScrollParser());
+ scroll_parser.reset(new ScrollParser(_doc_value_mode));
VLOG(1) << "get_next request ES, returned response: " << response;
Status status = scroll_parser->parse(response, _exactly_once);
if (!status.ok()){
diff --git a/be/src/exec/es/es_scan_reader.h b/be/src/exec/es/es_scan_reader.h
index 60137c1..3bd64cf 100644
--- a/be/src/exec/es/es_scan_reader.h
+++ b/be/src/exec/es/es_scan_reader.h
@@ -40,7 +40,7 @@ public:
static constexpr const char* KEY_QUERY = "query";
static constexpr const char* KEY_BATCH_SIZE = "batch_size";
static constexpr const char* KEY_TERMINATE_AFTER = "limit";
- ESScanReader(const std::string& target, const std::map<std::string, std::string>& props);
+ ESScanReader(const std::string& target, const std::map<std::string, std::string>& props, bool doc_value_mode);
~ESScanReader();
// launch the first scroll request, this method will cache the first scroll response, and return the this cached response when invoke get_next
@@ -94,6 +94,8 @@ private:
int _http_timeout_ms;
bool _exactly_once;
+
+ bool _doc_value_mode;
};
}
diff --git a/be/src/exec/es/es_scroll_parser.cpp b/be/src/exec/es/es_scroll_parser.cpp
index fbbcfb7..1fa9a03 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -198,11 +198,12 @@ static Status get_float_value(const rapidjson::Value &col, PrimitiveType type, v
return Status::OK();
}
-ScrollParser::ScrollParser() :
+ScrollParser::ScrollParser(bool doc_value_mode) :
_scroll_id(""),
_total(0),
_size(0),
- _line_index(0) {
+ _line_index(0),
+ _doc_value_mode(doc_value_mode) {
}
ScrollParser::~ScrollParser() {
@@ -247,6 +248,16 @@ Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once)
}
VLOG(1) << "es_scan_reader parse scroll result: " << scroll_result;
+ if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) {
+ // this is caused by query some columns which are not exit, e.g.
+ // A Index has fields: k1,k2,k3. and we put some rows into this Index (some fields dose NOT contain any data)
+ // e.g.
+ // put index/_doc/1 {"k2":"123"}
+ // put index/_doc/2 {"k3":"123}
+ // then we use sql `select k1 from table`
+ // what ES return is like this: {hits: {total:2}
+ return Status::OK();
+ }
const rapidjson::Value &inner_hits_node = outer_hits_node[FIELD_INNER_HITS];
if (!inner_hits_node.IsArray()) {
LOG(WARNING) << "exception maybe happend on es cluster, reponse:" << scroll_result;
@@ -275,7 +286,32 @@ int ScrollParser::get_total() {
Status ScrollParser::fill_tuple(const TupleDescriptor* tuple_desc,
Tuple* tuple, MemPool* tuple_pool, bool* line_eof, const std::map<std::string, std::string>& docvalue_context) {
*line_eof = true;
+
if (_size <= 0 || _line_index >= _size) {
+ // _source is fetched from ES
+ if (!_doc_value_mode) {
+ return Status::OK();
+ }
+
+ // _fields(doc_value) is fetched from ES
+ if (_total <= 0 || _line_index >= _total) {
+ return Status::OK();
+ }
+
+
+ // here is operations for `enable_doc_value_scan`.
+ // This indicates that the fields does not exist(e.g. never assign values to these fields), but other fields have values.
+ // so, number of rows is >= 0, we need fill `NULL` to these fields that does not exist.
+ _line_index++;
+ tuple->init(tuple_desc->byte_size());
+ for (int i = 0; i < tuple_desc->slots().size(); ++i) {
+ const SlotDescriptor* slot_desc = tuple_desc->slots()[i];
+ if (slot_desc->is_materialized()) {
+ tuple->set_null(slot_desc->null_indicator_offset());
+ }
+ }
+
+ *line_eof = false;
return Status::OK();
}
diff --git a/be/src/exec/es/es_scroll_parser.h b/be/src/exec/es/es_scroll_parser.h
index 85ac92b..b97e6bc 100644
--- a/be/src/exec/es/es_scroll_parser.h
+++ b/be/src/exec/es/es_scroll_parser.h
@@ -30,7 +30,7 @@ class Status;
class ScrollParser {
public:
- ScrollParser();
+ ScrollParser(bool doc_value_mode);
~ScrollParser();
Status parse(const std::string& scroll_result, bool exactly_once = false);
@@ -50,5 +50,11 @@ private:
rapidjson::Document _document_node;
rapidjson::Value _inner_hits_node;
+
+ // todo(milimin): ScrollParser should be divided into two classes: SourceParser and DocValueParser,
+ // including remove some variables in the current implementation, e.g. pure_doc_value.
+ // All above will be done in the DOE refactoring projects.
+ // Current bug fixes minimize the scope of changes to avoid introducing other new bugs.
+ bool _doc_value_mode;
};
}
diff --git a/be/src/exec/es/es_scroll_query.cpp b/be/src/exec/es/es_scroll_query.cpp
index e9b88ee..0c4f581 100644
--- a/be/src/exec/es/es_scroll_query.cpp
+++ b/be/src/exec/es/es_scroll_query.cpp
@@ -63,7 +63,8 @@ std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scr
std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields,
- std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context) {
+ std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context,
+ bool* doc_value_mode) {
rapidjson::Document es_query_dsl;
rapidjson::Document::AllocatorType &allocator = es_query_dsl.GetAllocator();
es_query_dsl.SetObject();
@@ -86,6 +87,9 @@ std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>
}
}
}
+
+ *doc_value_mode = pure_docvalue;
+
rapidjson::Value source_node(rapidjson::kArrayType);
if (pure_docvalue) {
for (auto& select_field : fields) {
diff --git a/be/src/exec/es/es_scroll_query.h b/be/src/exec/es/es_scroll_query.h
index 959ec45..ed5e7e0 100644
--- a/be/src/exec/es/es_scroll_query.h
+++ b/be/src/exec/es/es_scroll_query.h
@@ -35,6 +35,7 @@ public:
// @note: predicates should processed before pass it to this method,
// tie breaker for predicate wheather can push down es can reference the push-down filters
static std::string build(const std::map<std::string, std::string>& properties,
- const std::vector<std::string>& fields, std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context);
+ const std::vector<std::string>& fields, std::vector<EsPredicate*>& predicates, const std::map<std::string, std::string>& docvalue_context,
+ bool* doc_value_mode);
};
}
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index cb5eeb2..fa7b895 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -443,14 +443,16 @@ void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Stat
if (limit() != -1 && limit() <= _runtime_state->batch_size()) {
properties[ESScanReader::KEY_TERMINATE_AFTER] = std::to_string(limit());
}
+
+ bool doc_value_mode = false;
properties[ESScanReader::KEY_QUERY]
- = ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context);
+ = ESScrollQueryBuilder::build(properties, _column_names, _predicates, _docvalue_context, &doc_value_mode);
// start scanner to scan
std::unique_ptr<EsHttpScanner> scanner(new EsHttpScanner(
_runtime_state, runtime_profile(), _tuple_id,
- properties, scanner_expr_ctxs, &counter));
+ properties, scanner_expr_ctxs, &counter, doc_value_mode));
status = scanner_scan(std::move(scanner), scanner_expr_ctxs, &counter);
if (!status.ok()) {
LOG(WARNING) << "Scanner[" << start_idx << "] process failed. status="
diff --git a/be/src/exec/es_http_scanner.cpp b/be/src/exec/es_http_scanner.cpp
index 4eecaa0..f044080 100644
--- a/be/src/exec/es_http_scanner.cpp
+++ b/be/src/exec/es_http_scanner.cpp
@@ -36,7 +36,8 @@ EsHttpScanner::EsHttpScanner(
TupleId tuple_id,
const std::map<std::string, std::string>& properties,
const std::vector<ExprContext*>& conjunct_ctxs,
- EsScanCounter* counter) :
+ EsScanCounter* counter,
+ bool doc_value_mode) :
_state(state),
_profile(profile),
_tuple_id(tuple_id),
@@ -56,6 +57,7 @@ EsHttpScanner::EsHttpScanner(
_counter(counter),
_es_reader(nullptr),
_es_scroll_parser(nullptr),
+ _doc_value_mode(doc_value_mode),
_rows_read_counter(nullptr),
_read_timer(nullptr),
_materialize_timer(nullptr) {
@@ -74,7 +76,7 @@ Status EsHttpScanner::open() {
}
const std::string& host = _properties.at(ESScanReader::KEY_HOST_PORT);
- _es_reader.reset(new ESScanReader(host, _properties));
+ _es_reader.reset(new ESScanReader(host, _properties, _doc_value_mode));
if (_es_reader == nullptr) {
return Status::InternalError("Es reader construct failed.");
}
diff --git a/be/src/exec/es_http_scanner.h b/be/src/exec/es_http_scanner.h
index 23ebe7c..f94eded 100644
--- a/be/src/exec/es_http_scanner.h
+++ b/be/src/exec/es_http_scanner.h
@@ -62,7 +62,8 @@ public:
TupleId tuple_id,
const std::map<std::string, std::string>& properties,
const std::vector<ExprContext*>& conjunct_ctxs,
- EsScanCounter* counter);
+ EsScanCounter* counter,
+ bool doc_value_mode);
~EsHttpScanner();
Status open();
@@ -94,6 +95,8 @@ private:
std::unique_ptr<ESScanReader> _es_reader;
std::unique_ptr<ScrollParser> _es_scroll_parser;
+ bool _doc_value_mode;
+
// Profile
RuntimeProfile::Counter* _rows_read_counter;
RuntimeProfile::Counter* _read_timer;
diff --git a/be/test/exec/es_scan_reader_test.cpp b/be/test/exec/es_scan_reader_test.cpp
index cba7735..28976b2 100644
--- a/be/test/exec/es_scan_reader_test.cpp
+++ b/be/test/exec/es_scan_reader_test.cpp
@@ -226,8 +226,9 @@ TEST_F(MockESServerTest, workflow) {
props[ESScanReader::KEY_BATCH_SIZE] = "1";
std::vector<EsPredicate*> predicates;
std::map<std::string, std::string> docvalue_context;
- props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates, docvalue_context);
- ESScanReader reader(target, props);
+ bool doc_value_mode = false;
+ props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates, docvalue_context, &doc_value_mode);
+ ESScanReader reader(target, props, doc_value_mode);
auto st = reader.open();
ASSERT_TRUE(st.ok());
bool eos = false;