This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 58bf79f79e6 [fix](move-memtable) pass load stream num to backends (#26198)
58bf79f79e6 is described below
commit 58bf79f79e6bccea1942d0234855f667fce8546e
Author: Kaijie Chen <[email protected]>
AuthorDate: Wed Nov 8 16:16:33 2023 +0800
[fix](move-memtable) pass load stream num to backends (#26198)
---
be/src/common/config.cpp | 2 -
be/src/common/config.h | 2 -
be/src/pipeline/pipeline_fragment_context.cpp | 2 +
.../pipeline_x/pipeline_x_fragment_context.cpp | 2 +
be/src/runtime/load_stream.cpp | 97 ++++++++--------
be/src/runtime/load_stream.h | 26 +++--
be/src/runtime/plan_fragment_executor.cpp | 2 +
be/src/runtime/runtime_state.h | 14 +++
be/src/service/internal_service.cpp | 1 -
be/src/vec/sink/load_stream_stub.cpp | 5 +-
be/src/vec/sink/load_stream_stub.h | 3 +-
be/src/vec/sink/load_stream_stub_pool.cpp | 4 +-
be/src/vec/sink/load_stream_stub_pool.h | 3 +-
be/src/vec/sink/vtablet_sink_v2.cpp | 16 ++-
be/src/vec/sink/vtablet_sink_v2.h | 2 +
be/test/runtime/load_stream_test.cpp | 127 +++++++++++++++++++--
be/test/vec/exec/load_stream_stub_pool_test.cpp | 6 +-
.../apache/doris/planner/StreamLoadPlanner.java | 4 +
.../main/java/org/apache/doris/qe/Coordinator.java | 35 ++++++
.../java/org/apache/doris/qe/SessionVariable.java | 13 +++
.../java/org/apache/doris/task/LoadTaskInfo.java | 4 +
.../java/org/apache/doris/task/StreamLoadTask.java | 13 +++
gensrc/proto/internal_service.proto | 1 +
gensrc/thrift/FrontendService.thrift | 1 +
gensrc/thrift/PaloInternalService.thrift | 8 ++
25 files changed, 312 insertions(+), 81 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 17f69c3d360..0ccb48f116e 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -742,8 +742,6 @@ DEFINE_mDouble(tablet_version_graph_orphan_vertex_ratio, "0.1");
// share delta writers when memtable_on_sink_node = true
DEFINE_Bool(share_delta_writers, "true");
-// number of brpc stream per load
-DEFINE_Int32(num_streams_per_load, "5");
// timeout for open load stream rpc in ms
DEFINE_Int64(open_load_stream_timeout_ms, "500");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index cfcd09c1984..f0fdf58558c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -799,8 +799,6 @@ DECLARE_mDouble(tablet_version_graph_orphan_vertex_ratio);
// share delta writers when memtable_on_sink_node = true
DECLARE_Bool(share_delta_writers);
-// number of brpc stream per load
-DECLARE_Int32(num_streams_per_load);
// timeout for open load stream rpc in ms
DECLARE_Int64(open_load_stream_timeout_ms);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 67d0e6045b8..62bc3ca8dec 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -318,6 +318,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
+ _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+ _runtime_state->set_total_load_streams(request.total_load_streams);
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 95cdf47bec5..fee49621f0e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -211,6 +211,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
}
_runtime_state->set_desc_tbl(_desc_tbl);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
+ _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+ _runtime_state->set_total_load_streams(request.total_load_streams);
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index b9cc5891b48..6f66e7239d6 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -30,6 +30,7 @@
#include "runtime/load_channel.h"
#include "runtime/load_stream_mgr.h"
#include "runtime/load_stream_writer.h"
+#include "util/runtime_profile.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
@@ -251,6 +252,8 @@ LoadStream::~LoadStream() {
Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
+ _total_streams = request->total_streams();
+ DCHECK(_total_streams > 0) << "total streams should be greater than 0";
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
@@ -265,41 +268,49 @@ Status LoadStream::init(const POpenLoadStreamRequest* request) {
Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids,
std::vector<int64_t>* failed_tablet_ids) {
- std::lock_guard lock_guard(_lock);
+ std::lock_guard<bthread::Mutex> lock_guard(_lock);
SCOPED_TIMER(_close_wait_timer);
// we do nothing until recv CLOSE_LOAD from all stream to ensure all data are handled before ack
_open_streams[src_id]--;
- LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
- << _open_streams[src_id] << " streams";
if (_open_streams[src_id] == 0) {
_open_streams.erase(src_id);
}
+ _close_load_cnt++;
+ LOG(INFO) << "received CLOSE_LOAD from sender " << src_id << ", remaining "
+ << _total_streams - _close_load_cnt << " senders";
+
+ _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(),
+ tablets_to_commit.end());
+
+ if (_close_load_cnt < _total_streams) {
+ // do not return commit info if there is remaining streams.
+ return Status::OK();
+ }
Status st = Status::OK();
- if (_open_streams.size() == 0) {
+ {
bthread::Mutex mutex;
std::unique_lock<bthread::Mutex> lock(mutex);
bthread::ConditionVariable cond;
- bool ret = _load_stream_mgr->heavy_work_pool()->try_offer([this, &success_tablet_ids,
- &failed_tablet_ids,
- &tablets_to_commit, &mutex,
- &cond, &st]() {
- signal::set_signal_task_id(_load_id);
- for (auto& it : _index_streams_map) {
- st = it.second->close(tablets_to_commit, success_tablet_ids, failed_tablet_ids);
- if (!st.ok()) {
+ bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
+ [this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, &st]() {
+ signal::set_signal_task_id(_load_id);
+ for (auto& it : _index_streams_map) {
+ st = it.second->close(_tablets_to_commit, success_tablet_ids,
+ failed_tablet_ids);
+ if (!st.ok()) {
+ std::unique_lock<bthread::Mutex> lock(mutex);
+ cond.notify_one();
+ return;
+ }
+ }
+ LOG(INFO) << "close load " << *this
+ << ", failed_tablet_num=" <<
failed_tablet_ids->size()
+ << ", success_tablet_num=" <<
success_tablet_ids->size();
std::unique_lock<bthread::Mutex> lock(mutex);
cond.notify_one();
- return;
- }
- }
- LOG(INFO) << "close load " << *this
- << ", failed_tablet_num=" << failed_tablet_ids->size()
- << ", success_tablet_num=" << success_tablet_ids->size();
- std::unique_lock<bthread::Mutex> lock(mutex);
- cond.notify_one();
- });
+ });
if (ret) {
cond.wait(lock);
} else {
@@ -307,24 +318,21 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
"there is not enough thread resource for close load");
}
}
-
- // do not return commit info for non-last one.
return st;
}
-void LoadStream::_report_result(StreamId stream, Status& st,
- std::vector<int64_t>* success_tablet_ids,
- std::vector<int64_t>* failed_tablet_ids) {
- LOG(INFO) << "report result, success tablet num " <<
success_tablet_ids->size()
- << ", failed tablet num " << failed_tablet_ids->size();
+void LoadStream::_report_result(StreamId stream, const Status& st,
+ const std::vector<int64_t>& success_tablet_ids,
+ const std::vector<int64_t>& failed_tablet_ids) {
+ LOG(INFO) << "report result, success tablet num " <<
success_tablet_ids.size()
+ << ", failed tablet num " << failed_tablet_ids.size();
butil::IOBuf buf;
PWriteStreamSinkResponse response;
st.to_protobuf(response.mutable_status());
- for (auto& id : *success_tablet_ids) {
+ for (auto& id : success_tablet_ids) {
response.add_success_tablet_ids(id);
}
-
- for (auto& id : *failed_tablet_ids) {
+ for (auto& id : failed_tablet_ids) {
response.add_failed_tablet_ids(id);
}
@@ -421,18 +429,20 @@ int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const messages[]
void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data) {
VLOG_DEBUG << PStreamHeader_Opcode_Name(hdr.opcode()) << " from " << hdr.src_id()
<< " with tablet " << hdr.tablet_id();
+ if (UniqueId(hdr.load_id()) != UniqueId(_load_id)) {
+ Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid load
id {}, expected {}",
+ UniqueId(hdr.load_id()).to_string(),
+ UniqueId(_load_id).to_string());
+ _report_failure(id, st, hdr);
+ return;
+ }
{
std::lock_guard lock_guard(_lock);
if (!_open_streams.contains(hdr.src_id())) {
- std::vector<int64_t> success_tablet_ids;
- std::vector<int64_t> failed_tablet_ids;
- if (hdr.has_tablet_id()) {
- failed_tablet_ids.push_back(hdr.tablet_id());
- }
Status st = Status::Error<ErrorCode::INVALID_ARGUMENT>("no open
stream from source {}",
hdr.src_id());
- _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+ _report_failure(id, st, hdr);
return;
}
}
@@ -442,10 +452,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
case PStreamHeader::APPEND_DATA: {
auto st = _append_data(hdr, data);
if (!st.ok()) {
- std::vector<int64_t> success_tablet_ids;
- std::vector<int64_t> failed_tablet_ids;
- failed_tablet_ids.push_back(hdr.tablet_id());
- _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+ _report_failure(id, st, hdr);
}
} break;
case PStreamHeader::CLOSE_LOAD: {
@@ -454,7 +461,7 @@ void LoadStream::_dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf*
std::vector<PTabletID> tablets_to_commit(hdr.tablets_to_commit().begin(),
hdr.tablets_to_commit().end());
auto st = close(hdr.src_id(), tablets_to_commit, &success_tablet_ids, &failed_tablet_ids);
- _report_result(id, st, &success_tablet_ids, &failed_tablet_ids);
+ _report_result(id, st, success_tablet_ids, failed_tablet_ids);
brpc::StreamClose(id);
} break;
default:
@@ -468,9 +475,9 @@ void LoadStream::on_idle_timeout(StreamId id) {
}
void LoadStream::on_closed(StreamId id) {
- auto remaining_rpc_stream = remove_rpc_stream();
- LOG(INFO) << "stream closed " << id << ", remaining_rpc_stream=" <<
remaining_rpc_stream;
- if (remaining_rpc_stream == 0) {
+ auto remaining_streams = _total_streams - _close_rpc_cnt.fetch_add(1) - 1;
+ LOG(INFO) << "stream " << id << " on_closed, remaining streams = " <<
remaining_streams;
+ if (remaining_streams == 0) {
_load_stream_mgr->clear_load(_load_id);
}
}
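
Note on the close() rewrite above: the commit trigger moves from "all rpc streams from this sender are closed" to "CLOSE_LOAD received from every one of total_streams senders"; each CLOSE_LOAD appends its tablets_to_commit and bumps _close_load_cnt, and only the last sender performs the aggregated close. Below is a minimal, self-contained sketch of that counting pattern (illustrative only, with simplified names and std::mutex instead of bthread::Mutex; not the committed code).

#include <cstdint>
#include <mutex>
#include <vector>

// Stand-in types for this sketch; the real code aggregates PTabletID.
using TabletId = int64_t;

class CloseLoadCounter {
public:
    explicit CloseLoadCounter(int total_streams) : _total_streams(total_streams) {}

    // Called once per CLOSE_LOAD. Returns true only for the last sender,
    // which receives the aggregated tablet list and should perform the commit.
    bool on_close_load(const std::vector<TabletId>& tablets_to_commit,
                       std::vector<TabletId>* all_tablets) {
        std::lock_guard<std::mutex> lock(_mutex);
        _tablets.insert(_tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end());
        if (++_close_load_cnt < _total_streams) {
            return false; // earlier senders get an ack without commit info
        }
        *all_tablets = _tablets; // last sender sees every tablet reported so far
        return true;
    }

private:
    std::mutex _mutex;
    const int _total_streams;
    int _close_load_cnt = 0;
    std::vector<TabletId> _tablets;
};
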
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index fe7d90d502e..1c16c086e29 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -113,9 +113,6 @@ public:
_open_streams[src_id]++;
}
- uint32_t add_rpc_stream() { return ++_num_rpc_streams; }
- uint32_t remove_rpc_stream() { return --_num_rpc_streams; }
-
Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
std::vector<int64_t>* success_tablet_ids,
std::vector<int64_t>* failed_tablet_ids);
@@ -130,16 +127,31 @@ private:
void _parse_header(butil::IOBuf* const message, PStreamHeader& hdr);
void _dispatch(StreamId id, const PStreamHeader& hdr, butil::IOBuf* data);
Status _append_data(const PStreamHeader& header, butil::IOBuf* data);
- void _report_result(StreamId stream, Status& st, std::vector<int64_t>* success_tablet_ids,
- std::vector<int64_t>* failed_tablet_ids);
+
+ void _report_result(StreamId stream, const Status& st,
+ const std::vector<int64_t>& success_tablet_ids,
+ const std::vector<int64_t>& failed_tablet_ids);
+
+ // report failure for one message
+ void _report_failure(StreamId stream, const Status& status, const PStreamHeader& header) {
+ std::vector<int64_t> success; // empty
+ std::vector<int64_t> failure;
+ if (header.has_tablet_id()) {
+ failure.push_back(header.tablet_id());
+ }
+ _report_result(stream, status, success, failure);
+ }
private:
PUniqueId _load_id;
std::unordered_map<int64_t, IndexStreamSharedPtr> _index_streams_map;
- std::atomic<uint32_t> _num_rpc_streams;
+ int32_t _total_streams = 0;
+ int32_t _close_load_cnt = 0;
+ std::atomic<int32_t> _close_rpc_cnt = 0;
+ std::vector<PTabletID> _tablets_to_commit;
bthread::Mutex _lock;
std::unordered_map<int64_t, int32_t> _open_streams;
- int64_t _txn_id;
+ int64_t _txn_id = 0;
std::shared_ptr<OlapTableSchemaParam> _schema;
bool _enable_profile = false;
std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 9344378e91e..39277c1a42f 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -224,6 +224,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
_runtime_state->set_per_fragment_instance_idx(params.sender_id);
_runtime_state->set_num_per_fragment_instances(params.num_senders);
+ _runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+ _runtime_state->set_total_load_streams(request.total_load_streams);
// set up sink, if required
if (request.fragment.__isset.output_sink) {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cba5142c154..e990540d2f4 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -309,6 +309,18 @@ public:
int num_per_fragment_instances() const { return _num_per_fragment_instances; }
+ void set_load_stream_per_node(int load_stream_per_node) {
+ _load_stream_per_node = load_stream_per_node;
+ }
+
+ int load_stream_per_node() const { return _load_stream_per_node; }
+
+ void set_total_load_streams(int total_load_streams) {
+ _total_load_streams = total_load_streams;
+ }
+
+ int total_load_streams() const { return _total_load_streams; }
+
bool disable_stream_preaggregations() const {
return _query_options.disable_stream_preaggregations;
}
@@ -545,6 +557,8 @@ private:
int _per_fragment_instance_idx;
int _num_per_fragment_instances = 0;
+ int _load_stream_per_node = 0;
+ int _total_load_streams = 0;
// The backend id on which this fragment instance runs
int64_t _backend_id = -1;
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 01151dcf340..d6846c07a16 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -401,7 +401,6 @@ void PInternalServiceImpl::open_load_stream(google::protobuf::RpcController* con
return;
}
- load_stream->add_rpc_stream();
VLOG_DEBUG << "get streamid =" << streamid;
st.to_protobuf(response->mutable_status());
});
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index fe9887b3a3e..c2f7f246f30 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -102,11 +102,11 @@ LoadStreamStub::~LoadStreamStub() {
}
// open_load_stream
-// tablets means
Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
const NodeInfo& node_info, int64_t txn_id,
const OlapTableSchemaParam& schema,
- const std::vector<PTabletID>& tablets_for_schema, bool enable_profile) {
+ const std::vector<PTabletID>& tablets_for_schema, int total_streams,
+ bool enable_profile) {
_num_open++;
std::unique_lock<bthread::Mutex> lock(_mutex);
if (_is_init.load()) {
@@ -130,6 +130,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
+ request.set_total_streams(total_streams);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
*request.add_tablets() = tablet;
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 2db9ffcd950..3650b2aeae2 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -148,7 +148,8 @@ public:
// open_load_stream
Status open(BrpcClientCache<PBackendService_Stub>* client_cache, const NodeInfo& node_info,
int64_t txn_id, const OlapTableSchemaParam& schema,
- const std::vector<PTabletID>& tablets_for_schema, bool enable_profile);
+ const std::vector<PTabletID>& tablets_for_schema, int total_streams,
+ bool enable_profile);
// for mock this class in UT
#ifdef BE_TEST
diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp
index 834c1523865..5848ca5a51a 100644
--- a/be/src/vec/sink/load_stream_stub_pool.cpp
+++ b/be/src/vec/sink/load_stream_stub_pool.cpp
@@ -28,14 +28,14 @@ LoadStreamStubPool::LoadStreamStubPool() = default;
LoadStreamStubPool::~LoadStreamStubPool() = default;
std::shared_ptr<Streams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id,
- int64_t dst_id) {
+ int64_t dst_id, int num_streams) {
auto key = std::make_pair(UniqueId(load_id), dst_id);
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _pool[key].lock();
if (streams) {
return streams;
}
- int32_t num_streams = std::max(1, config::num_streams_per_load);
+ DCHECK(num_streams > 0) << "stream num should be greater than 0";
auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id});
auto deleter = [this, key](Streams* s) {
std::lock_guard<std::mutex> lock(_mutex);
diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h
index ae550340d25..73b41fdd61a 100644
--- a/be/src/vec/sink/load_stream_stub_pool.h
+++ b/be/src/vec/sink/load_stream_stub_pool.h
@@ -78,7 +78,8 @@ public:
~LoadStreamStubPool();
- std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id);
+ std::shared_ptr<Streams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id,
+ int num_streams);
size_t size() {
std::lock_guard<std::mutex> lock(_mutex);
diff --git a/be/src/vec/sink/vtablet_sink_v2.cpp b/be/src/vec/sink/vtablet_sink_v2.cpp
index 69a372e0e38..6e610ee717a 100644
--- a/be/src/vec/sink/vtablet_sink_v2.cpp
+++ b/be/src/vec/sink/vtablet_sink_v2.cpp
@@ -152,6 +152,12 @@ Status VOlapTableSinkV2::prepare(RuntimeState* state) {
_sender_id = state->per_fragment_instance_idx();
_num_senders = state->num_per_fragment_instances();
+ _stream_per_node = state->load_stream_per_node();
+ _total_streams = state->total_load_streams();
+ DCHECK(_stream_per_node > 0) << "load stream per node should be greater than 0";
+ DCHECK(_total_streams > 0) << "total load streams should be greator than
0";
+ LOG(INFO) << "num senders: " << _num_senders << ", stream per node: " <<
_stream_per_node
+ << ", total_streams " << _total_streams;
_is_high_priority =
(state->execution_timeout() <= config::load_task_high_priority_threshold_second);
@@ -218,19 +224,19 @@ Status VOlapTableSinkV2::_open_streams(int64_t src_id) {
return Status::InternalError("Unknown node {} in tablet location",
dst_id);
}
std::shared_ptr<Streams> streams;
- streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(_load_id, src_id,
- dst_id);
+ streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create(
+ _load_id, src_id, dst_id, _stream_per_node);
// get tablet schema from each backend only in the 1st stream
for (auto& stream : *streams | std::ranges::views::take(1)) {
const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id];
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
*node_info, _txn_id, *_schema, tablets_for_schema,
- _state->enable_profile()));
+ _total_streams, _state->enable_profile()));
}
// for the rest streams, open without getting tablet schema
for (auto& stream : *streams | std::ranges::views::drop(1)) {
RETURN_IF_ERROR(stream->open(_state->exec_env()->brpc_internal_client_cache(),
- *node_info, _txn_id, *_schema, {},
+ *node_info, _txn_id, *_schema, {}, _total_streams,
_state->enable_profile()));
}
_streams_for_node[dst_id] = streams;
@@ -293,7 +299,7 @@ Status VOlapTableSinkV2::_select_streams(int64_t tablet_id, Streams& streams) {
for (auto& node_id : location->node_ids) {
streams.emplace_back(_streams_for_node[node_id]->at(_stream_index));
}
- _stream_index = (_stream_index + 1) % config::num_streams_per_load;
+ _stream_index = (_stream_index + 1) % _stream_per_node;
return Status::OK();
}
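
Note: with the num_streams_per_load BE config removed, the sink above now takes both counts from the plan (load_stream_per_node for its local round-robin, total_load_streams forwarded to each backend via LoadStreamStub::open). A rough sketch of the per-node round-robin selection follows, using hypothetical stand-in types rather than the real stub pool.

#include <cstddef>
#include <memory>
#include <vector>

// Illustrative only: StubStream and Streams stand in for LoadStreamStub and
// the per-node stream list returned by the stub pool.
struct StubStream {};
using Streams = std::vector<std::shared_ptr<StubStream>>;

class StreamSelector {
public:
    explicit StreamSelector(int stream_per_node) : _stream_per_node(stream_per_node) {}

    // Pick the current stream from each destination node's list (each list is
    // assumed to hold stream_per_node entries), then advance the cursor so the
    // next tablet is routed to the next stream of every node.
    std::vector<std::shared_ptr<StubStream>> select(const std::vector<const Streams*>& nodes) {
        std::vector<std::shared_ptr<StubStream>> picked;
        for (const Streams* node_streams : nodes) {
            picked.push_back(node_streams->at(_stream_index));
        }
        _stream_index = (_stream_index + 1) % _stream_per_node;
        return picked;
    }

private:
    const int _stream_per_node;
    size_t _stream_index = 0;
};
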
diff --git a/be/src/vec/sink/vtablet_sink_v2.h b/be/src/vec/sink/vtablet_sink_v2.h
index a67c4e65cd2..1a67ea581ec 100644
--- a/be/src/vec/sink/vtablet_sink_v2.h
+++ b/be/src/vec/sink/vtablet_sink_v2.h
@@ -155,6 +155,8 @@ private:
// To support multiple senders, we maintain a channel for each sender.
int _sender_id = -1;
int _num_senders = -1;
+ int _stream_per_node = 0;
+ int _total_streams = 0;
bool _is_high_priority = false;
bool _write_file_cache = false;
diff --git a/be/test/runtime/load_stream_test.cpp b/be/test/runtime/load_stream_test.cpp
index bdd0ace9a8b..05dfa2ee1a8 100644
--- a/be/test/runtime/load_stream_test.cpp
+++ b/be/test/runtime/load_stream_test.cpp
@@ -67,6 +67,7 @@ const uint32_t ABNORMAL_SENDER_ID = 10000;
const int64_t NORMAL_TXN_ID = 600001;
const UniqueId NORMAL_LOAD_ID(1, 1);
const UniqueId ABNORMAL_LOAD_ID(1, 0);
+std::string NORMAL_STRING("normal");
std::string ABNORMAL_STRING("abnormal");
void construct_schema(OlapTableSchemaParam* schema) {
@@ -374,6 +375,8 @@ public:
}
LoadStreamSharedPtr load_stream;
+ LOG(INFO) << "total streams: " << request->total_streams();
+ EXPECT_GT(request->total_streams(), 0);
auto st = _load_stream_mgr->open_load_stream(request, load_stream);
stream_options.handler = load_stream.get();
@@ -387,8 +390,6 @@ public:
return;
}
- load_stream->add_rpc_stream();
-
status->set_status_code(TStatusCode::OK);
response->set_allocated_status(status.get());
static_cast<void>(response->release_status());
@@ -417,7 +418,7 @@ public:
std::function<void()> _cb;
};
- Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID) {
+ Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID, int total_streams = 1) {
brpc::Channel channel;
std::cerr << "connect_stream" << std::endl;
// Initialize the channel, NULL means using default options.
@@ -450,6 +451,7 @@ public:
*request.mutable_load_id() = id;
request.set_txn_id(NORMAL_TXN_ID);
request.set_src_id(sender_id);
+ request.set_total_streams(total_streams);
auto ptablet = request.add_tablets();
ptablet->set_tablet_id(NORMAL_TABLET_ID);
ptablet->set_index_id(NORMAL_INDEX_ID);
@@ -491,7 +493,7 @@ public:
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}
- void close_load(MockSinkClient& client, uint32_t sender_id) {
+ void close_load(MockSinkClient& client, uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
PStreamHeader header;
header.mutable_load_id()->set_hi(1);
@@ -535,6 +537,11 @@ public:
static_cast<void>(client.send(&append_buf));
}
+ void write_normal(MockSinkClient& client) {
+ write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
+ NORMAL_TABLET_ID, 0, NORMAL_STRING, true);
+ }
+
void write_abnormal_load(MockSinkClient& client) {
write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, 0, ABNORMAL_STRING, true);
@@ -657,25 +664,52 @@ public:
// <client, index, bucket>
// one client
-TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
+TEST_F(LoadStreamMgrTest, one_client_normal) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
- write_abnormal_load(client);
- // TODO check abnormal load id
+ write_normal(client);
reset_response_stat();
- close_load(client, 1);
+ close_load(client, ABNORMAL_SENDER_ID);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
- close_load(client, 0);
+ close_load(client);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
+ EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
+
+ // server will close stream on CLOSE_LOAD
+ wait_for_close();
+ EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
+}
+
+TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
+ MockSinkClient client;
+ auto st = client.connect_stream();
+ EXPECT_TRUE(st.ok());
+
+ reset_response_stat();
+ write_abnormal_load(client);
+ wait_for_ack(1);
+ EXPECT_EQ(g_response_stat.num, 1);
+ EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
+ EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
+
+ close_load(client);
+ wait_for_ack(2);
+ EXPECT_EQ(g_response_stat.num, 2);
+ EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
@@ -1063,7 +1097,7 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
MockSinkClient clients[2];
for (int i = 0; i < 2; i++) {
- auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i);
+ auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i, 2);
EXPECT_TRUE(st.ok());
}
reset_response_stat();
@@ -1132,4 +1166,77 @@ TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
+TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
+ MockSinkClient clients[2];
+
+ EXPECT_TRUE(clients[0].connect_stream(NORMAL_SENDER_ID, 2).ok());
+
+ reset_response_stat();
+
+ std::vector<std::string> segment_data;
+ segment_data.resize(6);
+ for (int32_t segid = 2; segid >= 0; segid--) {
+ for (int i = 0; i < 2; i++) {
+ std::string data = "sender_id=" + std::to_string(i) + ",segid=" +
std::to_string(segid);
+ segment_data[i * 3 + segid] = data;
+ LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data;
+ }
+ }
+
+ for (int32_t segid = 2; segid >= 0; segid--) {
+ int i = 0;
+ write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
+ NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
+ }
+
+ EXPECT_EQ(g_response_stat.num, 0);
+ // CLOSE_LOAD
+ close_load(clients[0], 0);
+ wait_for_ack(1);
+ EXPECT_EQ(g_response_stat.num, 1);
+ EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
+
+ // sender 0 closed, before open sender 1, load stream should still be open
+ EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
+
+ EXPECT_TRUE(clients[1].connect_stream(NORMAL_SENDER_ID + 1, 2).ok());
+
+ for (int32_t segid = 2; segid >= 0; segid--) {
+ int i = 1;
+ write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
+ NORMAL_TABLET_ID, segid, segment_data[i * 3 + segid], true);
+ }
+
+ close_load(clients[1], 1);
+ wait_for_ack(2);
+ EXPECT_EQ(g_response_stat.num, 2);
+ EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
+ EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
+ EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
+
+ // server will close stream on CLOSE_LOAD
+ wait_for_close();
+ EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
+
+ auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
+ size_t sender_pos = written_data.find('=');
+ size_t sender_end = written_data.find(',');
+ EXPECT_NE(sender_pos, std::string::npos);
+ EXPECT_NE(sender_end, std::string::npos);
+ auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos);
+ LOG(INFO) << "sender_str " << sender_str;
+ uint32_t sender_id = std::stoi(sender_str);
+
+ for (int i = 0; i < 3; i++) {
+ auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i);
+ EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
+ }
+ sender_id = (sender_id + 1) % 2;
+ for (int i = 0; i < 3; i++) {
+ auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3);
+ EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
+ }
+}
+
} // namespace doris
diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp
index f1ccb70beeb..929b906aab7 100644
--- a/be/test/vec/exec/load_stream_stub_pool_test.cpp
+++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp
@@ -36,9 +36,9 @@ TEST_F(LoadStreamStubPoolTest, test) {
PUniqueId load_id;
load_id.set_hi(1);
load_id.set_hi(2);
- auto streams1 = pool.get_or_create(load_id, src_id, 101);
- auto streams2 = pool.get_or_create(load_id, src_id, 102);
- auto streams3 = pool.get_or_create(load_id, src_id, 101);
+ auto streams1 = pool.get_or_create(load_id, src_id, 101, 5);
+ auto streams2 = pool.get_or_create(load_id, src_id, 102, 5);
+ auto streams3 = pool.get_or_create(load_id, src_id, 101, 5);
EXPECT_EQ(2, pool.size());
EXPECT_EQ(1, pool.templates_size());
EXPECT_EQ(streams1, streams3);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 934bca7ac0c..f4131235da3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -298,6 +298,8 @@ public class StreamLoadPlanner {
perNodeScanRange.put(scanNode.getId().asInt(), scanRangeParams);
execParams.setPerNodeScanRanges(perNodeScanRange);
params.setParams(execParams);
+ params.setLoadStreamPerNode(taskInfo.getStreamPerNode());
+ params.setTotalLoadStreams(taskInfo.getStreamPerNode());
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setQueryType(TQueryType.LOAD);
queryOptions.setQueryTimeout(timeout);
@@ -499,6 +501,8 @@ public class StreamLoadPlanner {
pipParams.per_exch_num_senders = Maps.newHashMap();
pipParams.destinations = Lists.newArrayList();
pipParams.setNumSenders(1);
+ pipParams.setLoadStreamPerNode(taskInfo.getStreamPerNode());
+ pipParams.setTotalLoadStreams(taskInfo.getStreamPerNode());
TPipelineInstanceParams localParams = new TPipelineInstanceParams();
localParams.setFragmentInstanceId(new TUniqueId(loadId.hi, loadId.lo + fragmentInstanceIdIndex));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 9aaca2f8e2c..56ca362e69b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -78,6 +78,7 @@ import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TBrokerScanRange;
+import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDescriptorTable;
import org.apache.doris.thrift.TDetailedReportParams;
import org.apache.doris.thrift.TErrorTabletInfo;
@@ -688,6 +689,7 @@ public class Coordinator implements CoordInterface {
int backendIdx = 0;
int profileFragmentId = 0;
long memoryLimit = queryOptions.getMemLimit();
+ Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
beToExecStates.clear();
// If #fragments >=2, use twoPhaseExecution with exec_plan_fragments_prepare and exec_plan_fragments_start,
// else use exec_plan_fragments directly.
@@ -755,8 +757,23 @@ public class Coordinator implements CoordInterface {
beToExecStates.putIfAbsent(execState.backend.getId(), states);
}
states.addState(execState);
+ if (tParam.getFragment().getOutputSink() != null
+ && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
+ backendsWithOlapTableSink.add(execState.backend.getId());
+ }
++backendIdx;
}
+ int loadStreamPerNode = 1;
+ if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
+ loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
+ }
+ for (TExecPlanFragmentParams tParam : tParams) {
+ if (tParam.getFragment().getOutputSink() != null
+ && tParam.getFragment().getOutputSink().getType() == TDataSinkType.OLAP_TABLE_SINK) {
+ tParam.setLoadStreamPerNode(loadStreamPerNode);
+ tParam.setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode);
+ }
+ }
profileFragmentId += 1;
} // end for fragments
@@ -845,6 +862,7 @@ public class Coordinator implements CoordInterface {
}
}
+ Set<Long> backendsWithOlapTableSink = Sets.newHashSet();
// 3. group PipelineExecContext by BE.
// So that we can use one RPC to send all fragment instances of a BE.
for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
@@ -878,8 +896,25 @@ public class Coordinator implements CoordInterface {
}
ctxs.addContext(pipelineExecContext);
+ if (entry.getValue().getFragment().getOutputSink() != null
+ && entry.getValue().getFragment().getOutputSink().getType()
+ == TDataSinkType.OLAP_TABLE_SINK) {
+ backendsWithOlapTableSink.add(backendId);
+ }
++backendIdx;
}
+ int loadStreamPerNode = 1;
+ if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
+ loadStreamPerNode = ConnectContext.get().getSessionVariable().getLoadStreamPerNode();
+ }
+ for (Map.Entry<TNetworkAddress, TPipelineFragmentParams> entry : tParams.entrySet()) {
+ if (entry.getValue().getFragment().getOutputSink() != null
+ && entry.getValue().getFragment().getOutputSink().getType()
+ == TDataSinkType.OLAP_TABLE_SINK) {
+ entry.getValue().setLoadStreamPerNode(loadStreamPerNode);
+ entry.getValue().setTotalLoadStreams(backendsWithOlapTableSink.size() * loadStreamPerNode);
+ }
+ }
profileFragmentId += 1;
} // end for fragments
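
Note: on the FE side the two new fields are only set on fragments whose output sink is an OLAP table sink; load_stream_per_node comes from the session variable (falling back to 1 when there is no connect context) and total_load_streams is the number of distinct backends hosting such a sink multiplied by that value, e.g. 3 sink backends with load_stream_per_node = 2 gives 6. A tiny illustration of that arithmetic (hypothetical helper, not the Coordinator code):

#include <cstdint>
#include <unordered_set>

// Hypothetical helper mirroring the Coordinator arithmetic above:
// total_load_streams = (#backends hosting an OLAP table sink) * load_stream_per_node.
int compute_total_load_streams(const std::unordered_set<int64_t>& backends_with_olap_table_sink,
                               int load_stream_per_node) {
    return static_cast<int>(backends_with_olap_table_sink.size()) * load_stream_per_node;
}
// e.g. backend ids {10001, 10002, 10003} with load_stream_per_node = 2 -> 6 total streams
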
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 36980570039..955450a15c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -411,6 +411,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_MEMTABLE_ON_SINK_NODE = "enable_memtable_on_sink_node";
+ public static final String LOAD_STREAM_PER_NODE = "load_stream_per_node";
+
public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update";
public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold";
@@ -1236,6 +1238,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_MEMTABLE_ON_SINK_NODE, needForward = true)
public boolean enableMemtableOnSinkNode = false;
+ @VariableMgr.VarAttr(name = LOAD_STREAM_PER_NODE)
+ public int loadStreamPerNode = 20;
+
@VariableMgr.VarAttr(name = ENABLE_INSERT_GROUP_COMMIT)
public boolean enableInsertGroupCommit = false;
@@ -2405,6 +2410,14 @@ public class SessionVariable implements Serializable, Writable {
this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate;
}
+ public int getLoadStreamPerNode() {
+ return loadStreamPerNode;
+ }
+
+ public void setLoadStreamPerNode(int loadStreamPerNode) {
+ this.loadStreamPerNode = loadStreamPerNode;
+ }
+
/**
* Serialize to thrift object.
* Used for rest api.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
index 3174e4d5c6b..610e243cd92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java
@@ -125,6 +125,10 @@ public interface LoadTaskInfo {
return false;
}
+ default int getStreamPerNode() {
+ return 20;
+ }
+
class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
index 485a3599b38..53a47d385b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java
@@ -89,6 +89,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private boolean enableProfile = false;
private boolean memtableOnSinkNode = false;
+ private int streamPerNode = 20;
private byte enclose = 0;
@@ -309,6 +310,15 @@ public class StreamLoadTask implements LoadTaskInfo {
this.memtableOnSinkNode = memtableOnSinkNode;
}
+ @Override
+ public int getStreamPerNode() {
+ return streamPerNode;
+ }
+
+ public void setStreamPerNode(int streamPerNode) {
+ this.streamPerNode = streamPerNode;
+ }
+
public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType(),
@@ -447,6 +457,9 @@ public class StreamLoadTask implements LoadTaskInfo {
if (request.isSetMemtableOnSinkNode()) {
this.memtableOnSinkNode = request.isMemtableOnSinkNode();
}
+ if (request.isSetStreamPerNode()) {
+ this.streamPerNode = request.getStreamPerNode();
+ }
}
// used for stream load
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 9a36916317f..f9c2603cb98 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -749,6 +749,7 @@ message POpenLoadStreamRequest {
optional POlapTableSchemaParam schema = 4;
repeated PTabletID tablets = 5;
optional bool enable_profile = 6 [default = false];
+ optional int64 total_streams = 7;
}
message PTabletSchemaWithIndex {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 1e0ad2d4624..04eaa578241 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -641,6 +641,7 @@ struct TStreamLoadPutRequest {
52: optional i8 escape
53: optional bool memtable_on_sink_node;
54: optional bool group_commit
+ 55: optional i32 stream_per_node;
}
struct TStreamLoadPutResult {
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index cf38f51c054..e55ab89e328 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -455,6 +455,12 @@ struct TExecPlanFragmentParams {
24: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
25: optional i64 wal_id
+
+ // num load stream for each sink backend
+ 26: optional i32 load_stream_per_node
+
+ // total num of load streams the downstream backend will see
+ 27: optional i32 total_load_streams
}
struct TExecPlanFragmentParamsList {
@@ -670,6 +676,8 @@ struct TPipelineFragmentParams {
// scan node id -> scan range params, only for external file scan
29: optional map<Types.TPlanNodeId, PlanNodes.TFileScanRangeParams> file_scan_params
30: optional bool group_commit = false;
+ 31: optional i32 load_stream_per_node // num load stream for each sink backend
+ 32: optional i32 total_load_streams // total num of load streams the downstream backend will see
}
struct TPipelineFragmentParamsList {