morningman commented on code in PR #22509:
URL: https://github.com/apache/doris/pull/22509#discussion_r1285234852


##########
be/src/http/action/stream_load_with_sql.cpp:
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_with_sql.h"
+
+#include <cstddef>
+#include <deque>
+#include <future>
+#include <shared_mutex>
+#include <sstream>
+
+// use string iequal
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+#include <event2/http.h>
+#include <rapidjson/prettywriter.h>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "common/config.h"
+#include "common/consts.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/utils.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "http/http_channel.h"
+#include "http/http_common.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_response.h"
+#include "http/utils.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/storage_engine.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/load_path_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/stream_load/stream_load_recorder.h"
+#include "util/byte_buffer.h"
+#include "util/debug_util.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
+#include "util/string_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing,
+                                   MetricUnit::REQUESTS);
+
+void StreamLoadWithSqlAction::_parse_format(const std::string& format_str,
+                                            const std::string& compress_type_str,
+                                            TFileFormatType::type* format_type,
+                                            TFileCompressType::type* compress_type) {
+    if (format_str.empty()) {
+        _parse_format("CSV", compress_type_str, format_type, compress_type);
+        return;
+    }
+    *compress_type = TFileCompressType::PLAIN;
+    *format_type = TFileFormatType::FORMAT_UNKNOWN;
+    if (iequal(format_str, "CSV")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+        } else if (iequal(compress_type_str, "GZ")) {
+            *format_type = TFileFormatType::FORMAT_CSV_GZ;
+            *compress_type = TFileCompressType::GZ;
+        } else if (iequal(compress_type_str, "LZO")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZO;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "BZ2")) {
+            *format_type = TFileFormatType::FORMAT_CSV_BZ2;
+            *compress_type = TFileCompressType::BZ2;
+        } else if (iequal(compress_type_str, "LZ4")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
+            *compress_type = TFileCompressType::LZ4FRAME;
+        } else if (iequal(compress_type_str, "LZOP")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZOP;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "DEFLATE")) {
+            *format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
+            *compress_type = TFileCompressType::DEFLATE;
+        }
+    } else if (iequal(format_str, "JSON")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+        }
+    } else if (iequal(format_str, "PARQUET")) {
+        *format_type = TFileFormatType::FORMAT_PARQUET;
+    } else if (iequal(format_str, "ORC")) {
+        *format_type = TFileFormatType::FORMAT_ORC;
+    }
+}
+
+bool StreamLoadWithSqlAction::_is_format_support_streaming(TFileFormatType::type format) {
+    switch (format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZO:
+    case TFileFormatType::FORMAT_CSV_LZOP:
+    case TFileFormatType::FORMAT_JSON:
+        return true;
+    default:
+        return false;
+    }
+}
+
+StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) {
+    _stream_load_with_sql_entity =
+            DorisMetrics::instance()->metric_registry()->register_entity("stream_load_with_sql");
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity,
+                                streaming_load_with_sql_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms);
+    INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity,
+                              streaming_load_with_sql_current_processing);
+}
+
+StreamLoadWithSqlAction::~StreamLoadWithSqlAction() {
+    DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_with_sql_entity);
+}
+
+void StreamLoadWithSqlAction::handle(HttpRequest* req) {
+    std::shared_ptr<StreamLoadContext> ctx =
+            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
+    if (ctx == nullptr) {
+        return;
+    }
+
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _handle(req, ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    if (!ctx->status.ok()) {
+        auto str = std::string(ctx->to_json());
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        return;
+    }
+
+    // query stream load status
+    // put request
+    TStreamLoadWithLoadStatusRequest request;
+    TStreamLoadWithLoadStatusResult result;
+    request.__set_loadId(ctx->id.to_thrift());
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    while (true) {
+        ThriftRpcHelper::rpc<FrontendServiceClient>(
+                master_addr.hostname, master_addr.port,
+                [&request, &result](FrontendServiceConnection& client) {
+                    client->streamLoadWithLoadStatus(result, request);
+                });
+        //        Status stream_load_status(result.status);

Review Comment:
   Remove unused code
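   If the intent is to keep polling the FE until the load status is available, here is a minimal sketch of what the loop could look like once the commented-out handling is restored instead of deleted (field names are taken from the commented lines; illustrative only, not the final implementation):
   ```cpp
   // Sketch only: consume `result` and break out of the loop on success,
   // instead of leaving the handling commented out.
   while (true) {
       ThriftRpcHelper::rpc<FrontendServiceClient>(
               master_addr.hostname, master_addr.port,
               [&request, &result](FrontendServiceConnection& client) {
                   client->streamLoadWithLoadStatus(result, request);
               });
       Status stream_load_status(result.status);
       if (stream_load_status.ok()) {
           ctx->txn_id = result.txn_id;
           ctx->number_total_rows = result.total_rows;
           ctx->number_loaded_rows = result.loaded_rows;
           ctx->number_filtered_rows = result.filtered_rows;
           ctx->number_unselected_rows = result.unselected_rows;
           break;
       }
   }
   ```
   Either way, a tight `while (true)` with no sleep/backoff between RPC retries should probably be avoided.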



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -1812,6 +1844,106 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ
         return result;
     }
 
+    public class ReportStreamLoadWorker implements Runnable {

Review Comment:
   Add a comment explaining what this class is for.
   Also, is it suitable as an inner class of `FrontendServiceImpl`?
   It will make `FrontendServiceImpl` too large.



##########
gensrc/thrift/FrontendService.thrift:
##########
@@ -621,6 +625,20 @@ struct TStreamLoadMultiTablePutResult {
    3: optional list<PaloInternalService.TPipelineFragmentParams> pipeline_params
 }
 
+// StreamLoadWith request status
+struct TStreamLoadWithLoadStatusRequest {
+    1: required Types.TUniqueId loadId

Review Comment:
   Do not use `required`, use `optional`.
   Same for other fields below.



##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -187,6 +187,10 @@ public class Coordinator {
     // Once this is set to true, errors from remote fragments are ignored.
     private boolean returnedAllResults;
 
+    private RuntimeProfile queryProfile;

Review Comment:
   Add a comment explaining why a new query profile is needed here.



##########
be/src/common/config.cpp:
##########
@@ -484,6 +484,15 @@ DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
 
+// use memory in stream load default
+DEFINE_Int64(stream_load_exec_mem_limit, "214748364"); // 2G
+// The buffer size to store stream table function schema info
+DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
+// The exec timeout of stream load default
+DEFINE_Int32(stream_load_timeout_second, "10");
+// The timeout of BE wait FE report status
+DEFINE_Int32(stream_load_report_timeout_second, "3");

Review Comment:
   What is this for? I think we are adding too many configs for this feature.
   The `stream_load_exec_mem_limit` above is also unnecessary.



##########
be/src/http/action/stream_load_with_sql.cpp:
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_with_sql.h"
+
+#include <cstddef>
+#include <deque>
+#include <future>
+#include <shared_mutex>
+#include <sstream>
+
+// use string iequal
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+#include <event2/http.h>
+#include <rapidjson/prettywriter.h>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "common/config.h"
+#include "common/consts.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/utils.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "http/http_channel.h"
+#include "http/http_common.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_response.h"
+#include "http/utils.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/storage_engine.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/load_path_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/stream_load/stream_load_recorder.h"
+#include "util/byte_buffer.h"
+#include "util/debug_util.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
+#include "util/string_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing,
+                                   MetricUnit::REQUESTS);
+
+void StreamLoadWithSqlAction::_parse_format(const std::string& format_str,
+                                            const std::string& compress_type_str,
+                                            TFileFormatType::type* format_type,
+                                            TFileCompressType::type* compress_type) {
+    if (format_str.empty()) {
+        _parse_format("CSV", compress_type_str, format_type, compress_type);
+        return;
+    }
+    *compress_type = TFileCompressType::PLAIN;
+    *format_type = TFileFormatType::FORMAT_UNKNOWN;
+    if (iequal(format_str, "CSV")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+        } else if (iequal(compress_type_str, "GZ")) {
+            *format_type = TFileFormatType::FORMAT_CSV_GZ;
+            *compress_type = TFileCompressType::GZ;
+        } else if (iequal(compress_type_str, "LZO")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZO;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "BZ2")) {
+            *format_type = TFileFormatType::FORMAT_CSV_BZ2;
+            *compress_type = TFileCompressType::BZ2;
+        } else if (iequal(compress_type_str, "LZ4")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
+            *compress_type = TFileCompressType::LZ4FRAME;
+        } else if (iequal(compress_type_str, "LZOP")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZOP;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "DEFLATE")) {
+            *format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
+            *compress_type = TFileCompressType::DEFLATE;
+        }
+    } else if (iequal(format_str, "JSON")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+        }
+    } else if (iequal(format_str, "PARQUET")) {
+        *format_type = TFileFormatType::FORMAT_PARQUET;
+    } else if (iequal(format_str, "ORC")) {
+        *format_type = TFileFormatType::FORMAT_ORC;
+    }
+}
+
+bool StreamLoadWithSqlAction::_is_format_support_streaming(TFileFormatType::type format) {
+    switch (format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZO:
+    case TFileFormatType::FORMAT_CSV_LZOP:
+    case TFileFormatType::FORMAT_JSON:
+        return true;
+    default:
+        return false;
+    }
+}
+
+StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) {
+    _stream_load_with_sql_entity =
+            DorisMetrics::instance()->metric_registry()->register_entity("stream_load_with_sql");
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity,
+                                streaming_load_with_sql_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms);
+    INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity,
+                              streaming_load_with_sql_current_processing);
+}
+
+StreamLoadWithSqlAction::~StreamLoadWithSqlAction() {
+    DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_with_sql_entity);
+}
+
+void StreamLoadWithSqlAction::handle(HttpRequest* req) {
+    std::shared_ptr<StreamLoadContext> ctx =
+            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
+    if (ctx == nullptr) {
+        return;
+    }
+
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _handle(req, ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    if (!ctx->status.ok()) {
+        auto str = std::string(ctx->to_json());
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        return;
+    }
+
+    // query stream load status
+    // put request
+    TStreamLoadWithLoadStatusRequest request;
+    TStreamLoadWithLoadStatusResult result;
+    request.__set_loadId(ctx->id.to_thrift());
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    while (true) {
+        ThriftRpcHelper::rpc<FrontendServiceClient>(
+                master_addr.hostname, master_addr.port,
+                [&request, &result](FrontendServiceConnection& client) {
+                    client->streamLoadWithLoadStatus(result, request);
+                });
+        //        Status stream_load_status(result.status);
+        //        if (stream_load_status.ok()) {
+        //            ctx->txn_id = result.txn_id;
+        //            ctx->number_total_rows = result.total_rows;
+        //            ctx->number_loaded_rows = result.loaded_rows;
+        //            ctx->number_filtered_rows = result.filtered_rows;
+        //            ctx->number_unselected_rows = result.unselected_rows;
+        //            break;
+        //        }
+    }
+
+    auto str = std::string(ctx->to_json());
+    // add new line at end
+    str = str + '\n';
+    HttpChannel::send_reply(req, str);
+    if (config::enable_stream_load_record) {
+        str = ctx->prepare_stream_load_record(str);
+        _save_stream_load_record(ctx, str);
+    }
+    // update statistics
+    streaming_load_with_sql_requests_total->increment(1);
+    streaming_load_with_sql_duration_ms->increment(ctx->load_cost_millis);
+    streaming_load_with_sql_current_processing->increment(-1);
+}
+
+Status StreamLoadWithSqlAction::_handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
+        return Status::InternalError("receive body don't equal with body bytes");
+    }
+    if (!ctx->use_streaming) {

Review Comment:
   I think we can simply not support non-streaming formats such as Parquet and ORC,
   to keep the code simple.
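   A minimal sketch of that simplification, assuming the existing `_is_format_support_streaming()` helper and the `format` field on `StreamLoadContext` (illustrative only):
   ```cpp
   // Sketch only: reject non-streaming formats (Parquet/ORC) up front so the
   // non-streaming branch below can be dropped entirely.
   if (!_is_format_support_streaming(ctx->format)) {
       return Status::InternalError("non-streaming format is not supported by stream load with sql");
   }
   ```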



##########
be/src/common/config.cpp:
##########
@@ -484,6 +484,15 @@ DEFINE_Int32(stream_load_record_expire_time_secs, "28800");
 // time interval to clean expired stream load records
 DEFINE_mInt64(clean_stream_load_record_interval_secs, "1800");
 
+// use memory in stream load default
+DEFINE_Int64(stream_load_exec_mem_limit, "214748364"); // 2G
+// The buffer size to store stream table function schema info
+DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB
+// The exec timeout of stream load default
+DEFINE_Int32(stream_load_timeout_second, "10");

Review Comment:
   Why do we need to set the stream load timeout in the BE config?



##########
fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java:
##########
@@ -252,6 +256,7 @@ public class Coordinator {
     public List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
     // Runtime filter ID to the builder instance number
     public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();
+    ConnectContext context;

Review Comment:
   ```suggestion
       private ConnectContext context;
   ```



##########
be/src/http/action/stream_load_with_sql.cpp:
##########
@@ -0,0 +1,544 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/stream_load_with_sql.h"
+
+#include <cstddef>
+#include <deque>
+#include <future>
+#include <shared_mutex>
+#include <sstream>
+
+// use string iequal
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+#include <event2/http.h>
+#include <rapidjson/prettywriter.h>
+#include <thrift/protocol/TDebugProtocol.h>
+
+#include "common/config.h"
+#include "common/consts.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "common/utils.h"
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/FrontendService_types.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "http/http_channel.h"
+#include "http/http_common.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_response.h"
+#include "http/utils.h"
+#include "io/fs/stream_load_pipe.h"
+#include "olap/storage_engine.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/load_path_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
+#include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/stream_load/stream_load_recorder.h"
+#include "util/byte_buffer.h"
+#include "util/debug_util.h"
+#include "util/doris_metrics.h"
+#include "util/metrics.h"
+#include "util/string_util.h"
+#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_requests_total, MetricUnit::REQUESTS);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_duration_ms, MetricUnit::MILLISECONDS);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(streaming_load_with_sql_current_processing,
+                                   MetricUnit::REQUESTS);
+
+void StreamLoadWithSqlAction::_parse_format(const std::string& format_str,
+                                            const std::string& compress_type_str,
+                                            TFileFormatType::type* format_type,
+                                            TFileCompressType::type* compress_type) {
+    if (format_str.empty()) {
+        _parse_format("CSV", compress_type_str, format_type, compress_type);
+        return;
+    }
+    *compress_type = TFileCompressType::PLAIN;
+    *format_type = TFileFormatType::FORMAT_UNKNOWN;
+    if (iequal(format_str, "CSV")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_CSV_PLAIN;
+        } else if (iequal(compress_type_str, "GZ")) {
+            *format_type = TFileFormatType::FORMAT_CSV_GZ;
+            *compress_type = TFileCompressType::GZ;
+        } else if (iequal(compress_type_str, "LZO")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZO;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "BZ2")) {
+            *format_type = TFileFormatType::FORMAT_CSV_BZ2;
+            *compress_type = TFileCompressType::BZ2;
+        } else if (iequal(compress_type_str, "LZ4")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
+            *compress_type = TFileCompressType::LZ4FRAME;
+        } else if (iequal(compress_type_str, "LZOP")) {
+            *format_type = TFileFormatType::FORMAT_CSV_LZOP;
+            *compress_type = TFileCompressType::LZO;
+        } else if (iequal(compress_type_str, "DEFLATE")) {
+            *format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
+            *compress_type = TFileCompressType::DEFLATE;
+        }
+    } else if (iequal(format_str, "JSON")) {
+        if (compress_type_str.empty()) {
+            *format_type = TFileFormatType::FORMAT_JSON;
+        }
+    } else if (iequal(format_str, "PARQUET")) {
+        *format_type = TFileFormatType::FORMAT_PARQUET;
+    } else if (iequal(format_str, "ORC")) {
+        *format_type = TFileFormatType::FORMAT_ORC;
+    }
+}
+
+bool StreamLoadWithSqlAction::_is_format_support_streaming(TFileFormatType::type format) {
+    switch (format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_DEFLATE:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZO:
+    case TFileFormatType::FORMAT_CSV_LZOP:
+    case TFileFormatType::FORMAT_JSON:
+        return true;
+    default:
+        return false;
+    }
+}
+
+StreamLoadWithSqlAction::StreamLoadWithSqlAction(ExecEnv* exec_env) : _exec_env(exec_env) {
+    _stream_load_with_sql_entity =
+            DorisMetrics::instance()->metric_registry()->register_entity("stream_load_with_sql");
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity,
+                                streaming_load_with_sql_requests_total);
+    INT_COUNTER_METRIC_REGISTER(_stream_load_with_sql_entity, streaming_load_with_sql_duration_ms);
+    INT_GAUGE_METRIC_REGISTER(_stream_load_with_sql_entity,
+                              streaming_load_with_sql_current_processing);
+}
+
+StreamLoadWithSqlAction::~StreamLoadWithSqlAction() {
+    DorisMetrics::instance()->metric_registry()->deregister_entity(_stream_load_with_sql_entity);
+}
+
+void StreamLoadWithSqlAction::handle(HttpRequest* req) {
+    std::shared_ptr<StreamLoadContext> ctx =
+            std::static_pointer_cast<StreamLoadContext>(req->handler_ctx());
+    if (ctx == nullptr) {
+        return;
+    }
+
+    // status already set to fail
+    if (ctx->status.ok()) {
+        ctx->status = _handle(req, ctx);
+        if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+        }
+    }
+    ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
+
+    if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        if (ctx->body_sink != nullptr) {
+            ctx->body_sink->cancel(ctx->status.to_string());
+        }
+    }
+
+    if (!ctx->status.ok()) {
+        auto str = std::string(ctx->to_json());
+        // add new line at end
+        str = str + '\n';
+        HttpChannel::send_reply(req, str);
+        return;
+    }
+
+    // query stream load status
+    // put request
+    TStreamLoadWithLoadStatusRequest request;
+    TStreamLoadWithLoadStatusResult result;
+    request.__set_loadId(ctx->id.to_thrift());
+    TNetworkAddress master_addr = _exec_env->master_info()->network_address;
+    while (true) {
+        ThriftRpcHelper::rpc<FrontendServiceClient>(
+                master_addr.hostname, master_addr.port,
+                [&request, &result](FrontendServiceConnection& client) {
+                    client->streamLoadWithLoadStatus(result, request);
+                });
+        //        Status stream_load_status(result.status);
+        //        if (stream_load_status.ok()) {
+        //            ctx->txn_id = result.txn_id;
+        //            ctx->number_total_rows = result.total_rows;
+        //            ctx->number_loaded_rows = result.loaded_rows;
+        //            ctx->number_filtered_rows = result.filtered_rows;
+        //            ctx->number_unselected_rows = result.unselected_rows;
+        //            break;
+        //        }
+    }
+
+    auto str = std::string(ctx->to_json());
+    // add new line at end
+    str = str + '\n';
+    HttpChannel::send_reply(req, str);
+    if (config::enable_stream_load_record) {
+        str = ctx->prepare_stream_load_record(str);
+        _save_stream_load_record(ctx, str);
+    }
+    // update statistics
+    streaming_load_with_sql_requests_total->increment(1);
+    streaming_load_with_sql_duration_ms->increment(ctx->load_cost_millis);
+    streaming_load_with_sql_current_processing->increment(-1);
+}
+
+Status StreamLoadWithSqlAction::_handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
+        return Status::InternalError("receive body don't equal with body bytes");
+    }
+    if (!ctx->use_streaming) {
+        // if we use non-streaming, we need to close file first,
+        // then execute_plan_fragment here
+        // this will close file
+        ctx->body_sink.reset();
+        _process_put(req, ctx);
+    } else {
+        RETURN_IF_ERROR(ctx->body_sink->finish());
+    }
+    std::future_status future_status =

Review Comment:
   What is this waiting for?
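   If it is waiting for the fragment execution / FE report to finish, a bounded wait might look like the sketch below. The `ctx->future` member and the use of `stream_load_report_timeout_second` are assumptions for illustration, not the actual implementation:
   ```cpp
   // Sketch only: wait with a timeout instead of blocking indefinitely
   // (needs <chrono> and <future>).
   auto future_status = ctx->future.wait_for(
           std::chrono::seconds(config::stream_load_report_timeout_second));
   if (future_status == std::future_status::timeout) {
       return Status::InternalError("timed out waiting for stream load report");
   }
   RETURN_IF_ERROR(ctx->future.get());
   ```
   Whatever it is waiting on, a comment here explaining the condition would help.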



##########
fe/fe-core/src/main/java/org/apache/doris/tablefunction/StreamTableValuedFunction.java:
##########
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.tablefunction;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.thrift.TFileType;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+/**
+ * The Implement of table valued function
+ * stream("FORMAT" = "csv").
+ */
+public class StreamTableValuedFunction extends ExternalFileTableValuedFunction {
+    private static final Logger LOG = LogManager.getLogger(StreamTableValuedFunction.class);
+    public static final String NAME = "stream";
+
+    public StreamTableValuedFunction(Map<String, String> params) throws AnalysisException {
+        Map<String, String> validParams = new CaseInsensitiveMap();
+        for (String key : params.keySet()) {
+            if (!FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) {
+                throw new AnalysisException(key + " is invalid property");
+            }
+            validParams.put(key, params.get(key));
+        }
+        parseProperties(validParams);
+        parseFile();

Review Comment:
   Do we need to call `parseFile`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
