lide-reed commented on a change in pull request #925: Doris on ES by HTTP way
URL: https://github.com/apache/incubator-doris/pull/925#discussion_r278386373
########## File path: be/src/exec/es/es_scan_reader.cpp ########## @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/es/es_scan_reader.h" + +#include <map> +#include <string> +#include <sstream> + +#include "common/logging.h" +#include "common/status.h" +#include "exec/es/es_scroll_query.h" + +namespace doris { +const std::string REUQEST_SCROLL_FILTER_PATH = "filter_path=_scroll_id,hits.hits._source,hits.total,_id,hits.hits._source.fields"; +const std::string REQUEST_SCROLL_PATH = "_scroll"; +const std::string REQUEST_PREFERENCE_PREFIX = "&preference=_shards:"; +const std::string REQUEST_SEARCH_SCROLL_PATH = "/_search/scroll"; +const std::string REQUEST_SEPARATOR = "/"; +const std::string REQUEST_SCROLL_TIME = "5m"; + +ESScanReader::ESScanReader(const std::string& target, const std::map<std::string, std::string>& props) { + _target = target; + _index = props.at(KEY_INDEX); + _type = props.at(KEY_TYPE); + if (props.find(KEY_USER_NAME) != props.end()) { + _user_name = props.at(KEY_USER_NAME); + } + if (props.find(KEY_PASS_WORD) != props.end()){ + _passwd = props.at(KEY_PASS_WORD); + } + if (props.find(KEY_SHARD) != props.end()) { + _shards = props.at(KEY_SHARD); + } + if 
(props.find(KEY_QUERY) != props.end()) { + _query = props.at(KEY_QUERY); + } + std::string batch_size_str = props.at(KEY_BATCH_SIZE); + _batch_size = atoi(batch_size_str.c_str()); + _init_scroll_url = _target + REQUEST_SEPARATOR + _index + REQUEST_SEPARATOR + _type + "/_search?scroll=" + REQUEST_SCROLL_TIME + REQUEST_PREFERENCE_PREFIX + _shards + "&" + REUQEST_SCROLL_FILTER_PATH; + _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + REUQEST_SCROLL_FILTER_PATH; + _eos = false; +} + +ESScanReader::~ESScanReader() { +} + +Status ESScanReader::open() { + _is_first = true; + RETURN_IF_ERROR(_network_client.init(_init_scroll_url)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_content_type("application/json"); + // phase open, we cached the first response for `get_next` phase + Status status = _network_client.execute_post_request(_query, &_cached_response); + if (!status.ok() || _network_client.get_http_status() != 200) { + std::stringstream ss; + ss << "Failed to connect to ES server, errmsg is: " << status.get_error_msg(); + LOG(WARNING) << ss.str(); + return Status(ss.str()); + } + VLOG(1) << "open _cached response: " << _cached_response; + return Status::OK; +} + +Status ESScanReader::get_next(bool* scan_eos, ScrollParser** parser) { + std::string response; + ScrollParser* scroll_parser = nullptr; + // if is first scroll request, should return the cached response + *parser = nullptr; + *scan_eos = true; + if (_eos) { + return Status::OK; + } + + if (_is_first) { + response = _cached_response; + _is_first = false; + } else { + RETURN_IF_ERROR(_network_client.init(_next_scroll_url)); + _network_client.set_basic_auth(_user_name, _passwd); + _network_client.set_content_type("application/json"); + _network_client.set_timeout_ms(5 * 1000); + RETURN_IF_ERROR(_network_client.execute_post_request( + ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, REQUEST_SCROLL_TIME), &response)); + long status = 
_network_client.get_http_status(); + if (status == 404) { + LOG(WARNING) << "request scroll search failure 404[" + << ", response: " << (response.empty() ? "empty response" : response); + return Status("No search context found for " + _scroll_id); + } + if (status != 200) { + LOG(WARNING) << "request scroll search failure[" + << "http status" << status + << ", response: " << (response.empty() ? "empty response" : response); + if (status == 404) { + return Status("No search context found for " + _scroll_id); + } + return Status("request scroll search failure: " + (response.empty() ? "empty response" : response)); + } + } + + scroll_parser = new ScrollParser(); Review comment: Done ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
