kangpinghuang commented on a change in pull request #1211: Change FileDownloader to http client
URL: https://github.com/apache/incubator-doris/pull/1211#discussion_r288370414
##########
File path: be/src/olap/task/engine_batch_load_task.cpp
##########
@@ -218,96 +172,94 @@ void EngineBatchLoadTask::_get_file_name_from_path(const
string& file_path, stri
AgentStatus EngineBatchLoadTask::_process() {
AgentStatus status = DORIS_SUCCESS;
-
- if (!_is_init) {
- LOG(WARNING) << "has not init yet. tablet_id: "
- << _push_req.tablet_id;
- return DORIS_ERROR;
+ if (!_is_init) {
+ LOG(WARNING) << "has not init yet. tablet_id: "
+ << _push_req.tablet_id;
+ return DORIS_ERROR;
}
-
// Remote file not empty, need to download
if (_push_req.__isset.http_file_path) {
- // Get file length
+ // Get file length and timeout
uint64_t file_size = 0;
uint64_t estimate_time_out = DEFAULT_DOWNLOAD_TIMEOUT;
if (_push_req.__isset.http_file_size) {
file_size = _push_req.http_file_size;
- estimate_time_out =
- file_size / config::download_low_speed_limit_kbps / 1024;
+ estimate_time_out = file_size /
config::download_low_speed_limit_kbps / 1024;
}
if (estimate_time_out < config::download_low_speed_time) {
estimate_time_out = config::download_low_speed_time;
}
-
- // Download file from hdfs
- for (uint32_t i = 0; i < MAX_RETRY; ++i) {
+ bool is_timeout = false;
+ auto download_cb = [this, estimate_time_out, file_size, &is_timeout]
(HttpClient* client) {
// Check timeout and set timeout
- time_t now = time(nullptr);
-
- if (_push_req.timeout > 0) {
+ time_t now = time(NULL);
+ if (_push_req.timeout > 0 && _push_req.timeout < now) {
+ // return status to break this callback
VLOG(3) << "check time out. time_out:" << _push_req.timeout
- << ", now:" << now;
- if (_push_req.timeout < now) {
- LOG(WARNING) << "push time out";
- status = DORIS_PUSH_TIME_OUT;
- break;
- }
+ << ", now:" << now;
+ is_timeout = true;
+ return Status::OK;
}
- _downloader_param.curl_opt_timeout = estimate_time_out;
+ RETURN_IF_ERROR(client->init(_remote_file_path));
+ // sent timeout
uint64_t timeout = _push_req.timeout > 0 ? _push_req.timeout - now
: 0;
if (timeout > 0 && timeout < estimate_time_out) {
- _downloader_param.curl_opt_timeout = timeout;
+ client->set_timeout_ms(timeout * 1000);
+ } else {
+ client->set_timeout_ms(estimate_time_out * 1000);
}
- VLOG(3) << "estimate_time_out: " << estimate_time_out
- << ", download_timeout: " << estimate_time_out
- << ", curl_opt_timeout: " <<
_downloader_param.curl_opt_timeout
- << ", download file, retry time:" << i;
-#ifndef BE_TEST
- _file_downloader = new FileDownloader(_downloader_param);
- _download_status = _download_file();
- if (_file_downloader != nullptr) {
- delete _file_downloader;
- _file_downloader = nullptr;
- }
-#endif
+ // download remote file
+ RETURN_IF_ERROR(client->download(_local_file_path));
- status = _download_status;
- if (_push_req.__isset.http_file_size && status == DORIS_SUCCESS) {
+ // check file size
+ if (_push_req.__isset.http_file_size) {
// Check file size
- boost::filesystem::path
local_file_path(_downloader_param.local_file_path);
- uint64_t local_file_size =
boost::filesystem::file_size(local_file_path);
- VLOG(3) << "file_size: " << file_size
- << ", local_file_size: " << local_file_size;
-
+ uint64_t local_file_size =
boost::filesystem::file_size(_local_file_path);
if (file_size != local_file_size) {
- OLAP_LOG_WARNING(
- "download_file size error. file_size: %d,
local_file_size: %d",
- file_size, local_file_size);
- status = DORIS_FILE_DOWNLOAD_FAILED;
+ LOG(WARNING) << "download_file size error. file_size=" <<
file_size
+ << ", local_file_size=" << local_file_size;
+ return Status("downloaded file's size isn't right");
}
}
-
- if (status == DORIS_SUCCESS) {
- _push_req.http_file_path = _downloader_param.local_file_path;
- break;
+ // NOTE: change http_file_path is not good design
+ _push_req.http_file_path = _local_file_path;
+ return Status::OK;
+ };
+
+ MonotonicStopWatch stopwatch;
+ stopwatch.start();
+ auto st = HttpClient::execute_with_retry(MAX_RETRY, 1, download_cb);
+ auto cost = stopwatch.elapsed_time();
+ if (cost <= 0) {
+ cost = 1;
+ }
+ if (st.ok() && !is_timeout) {
+ double rate = -1.0;
+ if (_push_req.__isset.http_file_size) {
+ rate = (double) _push_req.http_file_size / (cost / 1000 / 1000
/ 1000) / 1024;
}
-#ifndef BE_TEST
- sleep(config::sleep_one_second);
-#endif
+ LOG(INFO) << "down load file success. local_file=" <<
_local_file_path
+ << ", remote_file=" << _remote_file_path
+ << ", tablet_id" << _push_req.tablet_id
+ << ", cost=" << cost / 1000 << "us, file_size" <<
_push_req.http_file_size
+ << ", download rage:" << rate << "KB/s";
+ } else {
+ LOG(WARNING) << "down load file failed. remote_file=" <<
_remote_file_path
+ << ", tablet=" << _push_req.tablet_id
+ << ", cost=" << cost / 1000
+ << "us, errmsg=" << st.get_error_msg() << ", is_timeout=" <<
is_timeout;
+ status = DORIS_ERROR;
}
}
if (status == DORIS_SUCCESS) {
// Load delta file
- int64_t duration_ns = 0;
- OLAPStatus push_status = OLAP_SUCCESS;
- {
- SCOPED_RAW_TIMER(&duration_ns);
- push_status = _push(_push_req, _tablet_infos);
- }
- LOG(INFO) << "Push finish, cost time: " << duration_ns/1000 << " us";
+ time_t push_begin = time(NULL);
+ OLAPStatus push_status = _push(_push_req, _tablet_infos);
+ time_t push_finish = time(NULL);
+ LOG(INFO) << "Push finish, cost time: " << (push_finish - push_begin);
Review comment:
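Add a time unit to this log message: `push_finish - push_begin` is computed from `time(NULL)` values, so it is in seconds and the output should say so (e.g. append "s").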
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]