morningman commented on code in PR #22509:
URL: https://github.com/apache/doris/pull/22509#discussion_r1285234852
##########
be/src/http/action/stream_load_with_sql.cpp:
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_with_sql.h"
+
+#include <cstddef>
+#include <deque>
+#include <future>
+#include <shared_mutex>
+#include <sstream>
+
+// use string iequal
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+#include <event2/http.h>
+#include <rapidjson/prettywriter.h>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "common/config.h"
+#include "common/consts.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/utils.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "http/http_channel.h"
+#include "http/http_common.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_response.h"
+#include "http/utils.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/storage_engine.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/load_path_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/stream_load/stream_load_recorder.h"
+#include "util/byte_buffer.h"
+#include "util/debug_util.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
+#include "util/string_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing,
+                                   MetricUnit::REQUESTS);
+
+void StreamLoadWithSqlAction::_parse_format(const std::string& format_str,
+                                            const std::string& compress_type_str,
+                                            TFileFormatType::type* format_type,
+                                            TFileCompressType::type* compress_type) {
+    if (format_str.empty()) {
+        _parse_format("CSV", compress_type_str, format_type, compress_type);
+        return;
+    }
+    *compress_type = TFileCompressType::PLAIN;
+    *format_type = TFileFormatType::FORMAT_UNKNOWN;
+    if (iequal(format_str, "CSV")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+        } else if (iequal(compress_type_str, "GZ")) {
+            *format_type = TFileFormatType::FORMAT_CSV_GZ;
+            *compress_type = TFileCompressType::GZ;
+        } else if (iequal(compress_type_str, "LZO")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZO;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "BZ2")) {
+            *format_type = TFileFormatType::FORMAT_CSV_BZ2;
+            *compress_type = TFileCompressType::BZ2;
+        } else if (iequal(compress_type_str, "LZ4")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
+            *compress_type = TFileCompressType::LZ4FRAME;
+        } else if (iequal(compress_type_str, "LZOP")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZOP;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "DEFLATE")) {
+            *format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
+            *compress_type = TFileCompressType::DEFLATE;
+        }
+    } else if (iequal(format_str, "JSON")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+        }
+    } else if (iequal(format_str, "PARQUET")) {
+        *format_type = TFileFormatType::FORMAT_PARQUET;
+    } else if (iequal(format_str, "ORC")) {
+        *format_type = TFileFormatType::FORMAT_ORC;
+    }
+}
+
+bool StreamLoadWithSqlAction::_is_format_support_streaming(TFileFormatType::type format) {
+    switch (format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZO:
+    case TFileFormatType::FORMAT_CSV_LZOP:
+    case TFileFormatType::FORMAT_JSON:
+        return true;
+    default:
+        return false;
+    }
+}
+
+StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) {
+    _stream_load_with_sql_entity =
+            DorisMetrics::instance()->metric_registry()->register_entity("stream_load_with_sql");
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity,
+                                streaming_load_with_sql_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms);
+    INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity,
+                              streaming_load_with_sql_current_processing);
+}
+
+StreamLoadWithSqlAction::~StreamLoadWithSqlAction() {
+    DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_with_sql_entity);
+}
+
+void StreamLoadWithSqlAction::handle(HttpRequest* req) {
+    std::shared_ptr<StreamLoadContext> ctx =
+            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
+    if (ctx == nullptr) {
+        return;
+    }
+
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _handle(req, ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    if (!ctx->status.ok()) {
+        auto str = std::string(ctx->to_json());
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        return;
+    }
+
+    // query stream load status
+    // put request
+    TStreamLoadWithLoadStatusRequest request;
+    TStreamLoadWithLoadStatusResult result;
+    request.__set_loadId(ctx->id.to_thrift());
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    while (true) {
+        ThriftRpcHelper::rpc<FrontendServiceClient>(
+                master_addr.hostname, master_addr.port,
+                [&request, &result](FrontendServiceConnection& client) {
+                    client->streamLoadWithLoadStatus(result, request);
+                });
+        // Status stream_load_status(result.status);

Review Comment:
   Remove unused code
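For readers of the comment above: a minimal sketch of what this polling loop could look like with the commented-out status handling re-enabled. The retry bound and sleep interval are illustrative additions and are not part of the PR; the field names come from the commented-out code itself.

```cpp
// Sketch only: re-enable the commented-out status handling and bound the polling.
// Needs <chrono> and <thread>; the retry limit and 1s interval are assumptions.
int retry = 0;
while (true) {
    ThriftRpcHelper::rpc<FrontendServiceClient>(
            master_addr.hostname, master_addr.port,
            [&request, &result](FrontendServiceConnection& client) {
                client->streamLoadWithLoadStatus(result, request);
            });
    Status stream_load_status(result.status);
    if (stream_load_status.ok()) {
        ctx->txn_id = result.txn_id;
        ctx->number_total_rows = result.total_rows;
        ctx->number_loaded_rows = result.loaded_rows;
        ctx->number_filtered_rows = result.filtered_rows;
        ctx->number_unselected_rows = result.unselected_rows;
        break; // final status received from FE
    }
    if (++retry > 10) { // assumed bound; the PR has no limit here
        ctx->status = Status::InternalError("load status not ready after retries");
        break;
    }
    std::this_thread::sleep_for(std::chrono::seconds(1));
}
```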
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1812,6 +1844,106 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ
         return result;
     }
+    public class ReportStreamLoadWorker implements Runnable {

Review Comment:
   Add a comment to explain what this class is for. And is it suitable to make it an inner class of `FrontendServiceImpl`? It will make `FrontendServiceImpl` too large.

##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -621,6 +625,20 @@ struct TStreamLoadMultiTablePutResult {
     3: optional list<PaloInternalService.TPipelineFragmentParams> pipeline_params
 }
+// StreamLoadWith request status
+struct TStreamLoadWithLoadStatusRequest {
+    1: required Types.TUniqueId loadId

Review Comment:
   Do not use `required`, use `optional`. Same for the other fields below.

##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -187,6 +187,10 @@ public class Coordinator {
     // Once this is set to true, errors from remote fragments are ignored.
     private boolean returnedAllResults;
+    private RuntimeProfile queryProfile;

Review Comment:
   Add a comment to explain why a new query profile is needed here.

##########
be/src/common/config.cpp:
##########
@@ -484,6 +484,15 @@ DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
+// use memory in stream load default
+DEFINE_Int64(stream_load_exec_mem_limit, "214748364"); // 2G
+// The buffer size to store stream table function schema info
+DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
+// The exec timeout of stream load default
+DEFINE_Int32(stream_load_timeout_second, "10");
+// The timeout of BE wait FE report status
+DEFINE_Int32(stream_load_report_timeout_second, "3");

Review Comment:
   What's this for? I think we are adding too many configs for this feature. The above `stream_load_exec_mem_limit` is also unnecessary.
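On the `stream_load_exec_mem_limit` point: a hedged sketch of the alternative, reading the memory limit from the per-request stream load header rather than introducing a new BE config. `HTTP_EXEC_MEM_LIMIT` is assumed to be the same header constant parsed by the regular stream load action, and the fallback value is illustrative.

```cpp
// Sketch only (not in the PR): take the exec memory limit from the request
// header instead of a BE config. HTTP_EXEC_MEM_LIMIT and the 2 GB fallback
// are assumptions for illustration.
int64_t exec_mem_limit = 2LL * 1024 * 1024 * 1024;
if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
    try {
        exec_mem_limit = std::stoll(http_req->header(HTTP_EXEC_MEM_LIMIT));
    } catch (const std::invalid_argument&) {
        return Status::InvalidArgument("Invalid mem limit format");
    }
}
// exec_mem_limit would then travel with the plan/put request, so
// config::stream_load_exec_mem_limit would not be needed.
```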
+ +#include "http/action/stream_load_with_sql.h" + +#include <cstddef> +#include <deque> +#include <future> +#include <shared_mutex> +#include <sstream> + +// use string iequal +#include <event2/buffer.h> +#include <event2/bufferevent.h> +#include <event2/http.h> +#include <rapidjson/prettywriter.h> +#include <thrift/protocol/TDebugProtocol.h> + +#include "common/config.h" +#include "common/consts.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/utils.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "http/http_channel.h" +#include "http/http_common.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_response.h" +#include "http/utils.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/storage_engine.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "runtime/load_path_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "runtime/stream_load/stream_load_recorder.h" +#include "util/byte_buffer.h" +#include "util/debug_util.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" +#include "util/string_util.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" +#include "util/uid_util.h" + +namespace doris { +using namespace ErrorCode; + +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing, + MetricUnit::REQUESTS); + +void StreamLoadWithSqlAction::_parse_format(const std::string& format_str, + const std::string& compress_type_str, + TFileFormatType::type* format_type, + TFileCompressType::type* compress_type) { + if (format_str.empty()) { + _parse_format("CSV", compress_type_str, format_type, compress_type); + return; + } + *compress_type = TFileCompressType::PLAIN; + *format_type = TFileFormatType::FORMAT_UNKNOWN; + if (iequal(format_str, "CSV")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_CSV_PLAIN; + } else if (iequal(compress_type_str, "GZ")) { + *format_type = TFileFormatType::FORMAT_CSV_GZ; + *compress_type = TFileCompressType::GZ; + } else if (iequal(compress_type_str, "LZO")) { + *format_type = TFileFormatType::FORMAT_CSV_LZO; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "BZ2")) { + *format_type = TFileFormatType::FORMAT_CSV_BZ2; + *compress_type = TFileCompressType::BZ2; + } else if (iequal(compress_type_str, "LZ4")) { + *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; + *compress_type = TFileCompressType::LZ4FRAME; + } else if (iequal(compress_type_str, "LZOP")) { + *format_type = TFileFormatType::FORMAT_CSV_LZOP; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "DEFLATE")) { + *format_type = TFileFormatType::FORMAT_CSV_DEFLATE; + *compress_type = TFileCompressType::DEFLATE; + } + } else if (iequal(format_str, "JSON")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_JSON; + } + } else if (iequal(format_str, "PARQUET")) { + *format_type = TFileFormatType::FORMAT_PARQUET; + } else if (iequal(format_str, "ORC")) { + 
*format_type = TFileFormatType::FORMAT_ORC; + } +} + +bool StreamLoadWithSqlAction::_is_format_support_streaming(TFileFormatType::type format) { + switch (format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_JSON: + return true; + default: + return false; + } +} + +StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) { + _stream_load_with_sql_entity = + DorisMetrics::instance()->metric_registry()->register_entity("stream_load_with_sql"); + INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, + streaming_load_with_sql_requests_total); + INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms); + INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity, + streaming_load_with_sql_current_processing); +} + +StreamLoadWithSqlAction::~StreamLoadWithSqlAction() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_with_sql_entity); +} + +void StreamLoadWithSqlAction::handle(HttpRequest* req) { + std::shared_ptr<StreamLoadContext> ctx = + std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); + if (ctx == nullptr) { + return; + } + + // status already set to fail + if (ctx->status.ok()) { + ctx->status = _handle(req, ctx); + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + } + } + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; + + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + } + + if (!ctx->status.ok()) { + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + return; + } + + // query stream load status + // put request + TStreamLoadWithLoadStatusRequest request; + TStreamLoadWithLoadStatusResult result; + request.__set_loadId(ctx->id.to_thrift()); + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + while (true) { + ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->streamLoadWithLoadStatus(result, request); + }); + // Status stream_load_status(result.status); + // if (stream_load_status.ok()) { + // ctx->txn_id = result.txn_id; + // ctx->number_total_rows = result.total_rows; + // ctx->number_loaded_rows = result.loaded_rows; + // ctx->number_filtered_rows = result.filtered_rows; + // ctx->number_unselected_rows = result.unselected_rows; + // break; + // } + } + + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } + // update statistics + streaming_load_with_sql_requests_total->increment(1); + streaming_load_with_sql_duration_ms->increment(ctx->load_cost_millis); + streaming_load_with_sql_current_processing->increment(-1); +} + +Status StreamLoadWithSqlAction::_handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != 
+        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
+        return Status::InternalError("receive body don't equal with body bytes");
+    }
+    if (!ctx->use_streaming) {

Review Comment:
   I think we can simply not support non-streaming formats such as Parquet and ORC, to keep the code simple.

##########
be/src/common/config.cpp:
##########
@@ -484,6 +484,15 @@ DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
+// use memory in stream load default
+DEFINE_Int64(stream_load_exec_mem_limit, "214748364"); // 2G
+// The buffer size to store stream table function schema info
+DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
+// The exec timeout of stream load default
+DEFINE_Int32(stream_load_timeout_second, "10");

Review Comment:
   Why do we need to set the stream load timeout in a BE config?

##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -252,6 +256,7 @@ public class Coordinator {
     public List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
     // Runtime filter ID to the builder instance number
     public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
+    ConnectContext context;

Review Comment:
   ```suggestion
       private ConnectContext context;
   ```
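Relating to the earlier comment on `be/src/http/action/stream_load_with_sql.cpp` about dropping the non-streaming path: a minimal sketch of how `_handle` could reject non-streaming formats outright. `Status::NotSupported` and the message wording are assumptions for illustration.

```cpp
// Sketch only: if Parquet/ORC are not supported by stream load with SQL,
// the whole non-streaming branch can become an early rejection.
if (!ctx->use_streaming) {
    return Status::NotSupported(
            "stream load with sql only supports streaming formats such as CSV and JSON");
}
RETURN_IF_ERROR(ctx->body_sink->finish());
```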
+ +#include "http/action/stream_load_with_sql.h" + +#include <cstddef> +#include <deque> +#include <future> +#include <shared_mutex> +#include <sstream> + +// use string iequal +#include <event2/buffer.h> +#include <event2/bufferevent.h> +#include <event2/http.h> +#include <rapidjson/prettywriter.h> +#include <thrift/protocol/TDebugProtocol.h> + +#include "common/config.h" +#include "common/consts.h" +#include "common/logging.h" +#include "common/status.h" +#include "common/utils.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/FrontendService_types.h" +#include "gen_cpp/HeartbeatService_types.h" +#include "http/http_channel.h" +#include "http/http_common.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_response.h" +#include "http/utils.h" +#include "io/fs/stream_load_pipe.h" +#include "olap/storage_engine.h" +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "runtime/load_path_mgr.h" +#include "runtime/plan_fragment_executor.h" +#include "runtime/stream_load/new_load_stream_mgr.h" +#include "runtime/stream_load/stream_load_context.h" +#include "runtime/stream_load/stream_load_executor.h" +#include "runtime/stream_load/stream_load_recorder.h" +#include "util/byte_buffer.h" +#include "util/debug_util.h" +#include "util/doris_metrics.h" +#include "util/metrics.h" +#include "util/string_util.h" +#include "util/thrift_rpc_helper.h" +#include "util/time.h" +#include "util/uid_util.h" + +namespace doris { +using namespace ErrorCode; + +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS); +DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing, + MetricUnit::REQUESTS); + +void StreamLoadWithSqlAction::_parse_format(const std::string& format_str, + const std::string& compress_type_str, + TFileFormatType::type* format_type, + TFileCompressType::type* compress_type) { + if (format_str.empty()) { + _parse_format("CSV", compress_type_str, format_type, compress_type); + return; + } + *compress_type = TFileCompressType::PLAIN; + *format_type = TFileFormatType::FORMAT_UNKNOWN; + if (iequal(format_str, "CSV")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_CSV_PLAIN; + } else if (iequal(compress_type_str, "GZ")) { + *format_type = TFileFormatType::FORMAT_CSV_GZ; + *compress_type = TFileCompressType::GZ; + } else if (iequal(compress_type_str, "LZO")) { + *format_type = TFileFormatType::FORMAT_CSV_LZO; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "BZ2")) { + *format_type = TFileFormatType::FORMAT_CSV_BZ2; + *compress_type = TFileCompressType::BZ2; + } else if (iequal(compress_type_str, "LZ4")) { + *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME; + *compress_type = TFileCompressType::LZ4FRAME; + } else if (iequal(compress_type_str, "LZOP")) { + *format_type = TFileFormatType::FORMAT_CSV_LZOP; + *compress_type = TFileCompressType::LZO; + } else if (iequal(compress_type_str, "DEFLATE")) { + *format_type = TFileFormatType::FORMAT_CSV_DEFLATE; + *compress_type = TFileCompressType::DEFLATE; + } + } else if (iequal(format_str, "JSON")) { + if (compress_type_str.empty()) { + *format_type = TFileFormatType::FORMAT_JSON; + } + } else if (iequal(format_str, "PARQUET")) { + *format_type = TFileFormatType::FORMAT_PARQUET; + } else if (iequal(format_str, "ORC")) { + 
*format_type = TFileFormatType::FORMAT_ORC; + } +} + +bool StreamLoadWithSqlAction::_is_format_support_streaming(TFileFormatType::type format) { + switch (format) { + case TFileFormatType::FORMAT_CSV_PLAIN: + case TFileFormatType::FORMAT_CSV_BZ2: + case TFileFormatType::FORMAT_CSV_DEFLATE: + case TFileFormatType::FORMAT_CSV_GZ: + case TFileFormatType::FORMAT_CSV_LZ4FRAME: + case TFileFormatType::FORMAT_CSV_LZO: + case TFileFormatType::FORMAT_CSV_LZOP: + case TFileFormatType::FORMAT_JSON: + return true; + default: + return false; + } +} + +StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) { + _stream_load_with_sql_entity = + DorisMetrics::instance()->metric_registry()->register_entity("stream_load_with_sql"); + INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, + streaming_load_with_sql_requests_total); + INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms); + INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity, + streaming_load_with_sql_current_processing); +} + +StreamLoadWithSqlAction::~StreamLoadWithSqlAction() { + DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_with_sql_entity); +} + +void StreamLoadWithSqlAction::handle(HttpRequest* req) { + std::shared_ptr<StreamLoadContext> ctx = + std::static_pointer_cast<StreamLoadContext>(req->handler_ctx()); + if (ctx == nullptr) { + return; + } + + // status already set to fail + if (ctx->status.ok()) { + ctx->status = _handle(req, ctx); + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + } + } + ctx->load_cost_millis = UnixMillis() - ctx->start_millis; + + if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) { + if (ctx->body_sink != nullptr) { + ctx->body_sink->cancel(ctx->status.to_string()); + } + } + + if (!ctx->status.ok()) { + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + return; + } + + // query stream load status + // put request + TStreamLoadWithLoadStatusRequest request; + TStreamLoadWithLoadStatusResult result; + request.__set_loadId(ctx->id.to_thrift()); + TNetworkAddress master_addr = _exec_env->master_info()->network_address; + while (true) { + ThriftRpcHelper::rpc<FrontendServiceClient>( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->streamLoadWithLoadStatus(result, request); + }); + // Status stream_load_status(result.status); + // if (stream_load_status.ok()) { + // ctx->txn_id = result.txn_id; + // ctx->number_total_rows = result.total_rows; + // ctx->number_loaded_rows = result.loaded_rows; + // ctx->number_filtered_rows = result.filtered_rows; + // ctx->number_unselected_rows = result.unselected_rows; + // break; + // } + } + + auto str = std::string(ctx->to_json()); + // add new line at end + str = str + '\n'; + HttpChannel::send_reply(req, str); + if (config::enable_stream_load_record) { + str = ctx->prepare_stream_load_record(str); + _save_stream_load_record(ctx, str); + } + // update statistics + streaming_load_with_sql_requests_total->increment(1); + streaming_load_with_sql_duration_ms->increment(ctx->load_cost_millis); + streaming_load_with_sql_current_processing->increment(-1); +} + +Status StreamLoadWithSqlAction::_handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) { + if (ctx->body_bytes > 0 && ctx->receive_bytes != 
##########
be/src/http/action/stream_load_with_sql.cpp:
##########
@@ -0,0 +1,544 @@
+    if (!ctx->use_streaming) {
+        // if we use non-streaming, we need to close file first,
+        // then execute_plan_fragment here
+        // this will close file
+        ctx->body_sink.reset();
+        _process_put(req, ctx);
+    } else {
+        RETURN_IF_ERROR(ctx->body_sink->finish());
+    }
+    std::future_status future_status =

Review Comment:
   What is this waiting for?

##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/StreamTableValuedFunction.java:
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * The Implement of table valued function
+ * stream("FORMAT" = "csv").
+ */
+public class StreamTableValuedFunction extends ExternalFileTableValuedFunction {
+    private static final Logger LOG = LogManager.getLogger(StreamTableValuedFunction.class);
+    public static final String NAME = "stream";
+
+    public StreamTableValuedFunction(Map<String, String> params) throws AnalysisException {
+        Map<String, String> validParams = new CaseInsensitiveMap();
+        for (String key : params.keySet()) {
+            if (!FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) {
+                throw new AnalysisException(key + " is invalid property");
+            }
+            validParams.put(key, params.get(key));
+        }
+        parseProperties(validParams);
+        parseFile();

Review Comment:
   Do we need to call `parseFile`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
