This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 51ccd44 [Load Parallel][3/3] Support parallel delta writer (#5369)
51ccd44 is described below
commit 51ccd44865662be352bb9f7bd9194d42564be06f
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Feb 7 22:42:18 2021 +0800
[Load Parallel][3/3] Support parallel delta writer (#5369)
In the previous broker load, multiple OlapTableSinks would send data to the
same LoadChannel,
and because of the lock granularity problem, LoadChannel could only process
these requests serially,
which made it impossible to make full use of cluster resources.
This CL modifies the related locks so that LoadChannel can process these
requests in parallel.
In the test, with a size of 20G, the load speed of 334 million rows of data
in 3 nodes has been
increased from 9min to 5min, and after enabling 2 concurrency, it can be
increased to 3min.
Also modify the profile of load job.
---
be/src/exec/tablet_sink.cpp | 37 ++++++++---
be/src/exec/tablet_sink.h | 26 ++++++--
be/src/olap/delta_writer.cpp | 60 ++++++++++++++++--
be/src/olap/delta_writer.h | 11 +++-
be/src/olap/olap_define.h | 2 +
be/src/olap/rowset/unique_rowset_id_generator.cpp | 1 +
be/src/olap/tablet_manager.cpp | 2 +
be/src/runtime/load_channel_mgr.cpp | 3 +-
be/src/runtime/load_channel_mgr.h | 3 +-
be/src/runtime/tablets_channel.cpp | 74 +++++++++++-----------
be/src/service/internal_service.cpp | 4 +-
be/test/runtime/load_channel_mgr_test.cpp | 20 +++---
.../main/java/org/apache/doris/qe/Coordinator.java | 4 +-
13 files changed, 173 insertions(+), 74 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index a8f84b5..949e099 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -562,12 +562,18 @@ Status OlapTableSink::prepare(RuntimeState* state) {
_output_rows_counter = ADD_COUNTER(_profile, "RowsReturned", TUnit::UNIT);
_filtered_rows_counter = ADD_COUNTER(_profile, "RowsFiltered",
TUnit::UNIT);
_send_data_timer = ADD_TIMER(_profile, "SendDataTime");
+ _wait_mem_limit_timer = ADD_CHILD_TIMER(_profile, "WaitMemLimitTime",
"SendDataTime");
_convert_batch_timer = ADD_TIMER(_profile, "ConvertBatchTime");
_validate_data_timer = ADD_TIMER(_profile, "ValidateDataTime");
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseWaitTime");
_non_blocking_send_timer = ADD_TIMER(_profile, "NonBlockingSendTime");
- _serialize_batch_timer = ADD_TIMER(_profile, "SerializeBatchTime");
+ _non_blocking_send_work_timer = ADD_CHILD_TIMER(_profile,
"NonBlockingSendWorkTime", "NonBlockingSendTime");
+ _serialize_batch_timer = ADD_CHILD_TIMER(_profile, "SerializeBatchTime",
"NonBlockingSendWorkTime");
+ _total_add_batch_exec_timer = ADD_TIMER(_profile, "TotalAddBatchExecTime");
+ _max_add_batch_exec_timer = ADD_TIMER(_profile, "MaxAddBatchExecTime");
+ _add_batch_number = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);
+ _num_node_channels = ADD_COUNTER(_profile, "NumberNodeChannels",
TUnit::UNIT);
_load_mem_limit = state->get_load_mem_limit();
// open all channels
@@ -697,18 +703,23 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
// BE id -> add_batch method counter
std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
int64_t serialize_batch_ns = 0, mem_exceeded_block_ns = 0,
queue_push_lock_ns = 0,
- actual_consume_ns = 0;
+ actual_consume_ns = 0, total_add_batch_exec_time_ns = 0,
+ max_add_batch_exec_time_ns = 0,
+ total_add_batch_num = 0, num_node_channels = 0;
{
SCOPED_TIMER(_close_timer);
for (auto index_channel : _channels) {
index_channel->for_each_node_channel([](NodeChannel* ch) {
ch->mark_close(); });
+ num_node_channels += index_channel->num_node_channels();
}
for (auto index_channel : _channels) {
+ int64_t add_batch_exec_time = 0;
index_channel->for_each_node_channel([&status, &state,
&node_add_batch_counter_map,
&serialize_batch_ns,
&mem_exceeded_block_ns,
- &queue_push_lock_ns,
-
&actual_consume_ns](NodeChannel* ch) {
+ &queue_push_lock_ns,
&actual_consume_ns,
+
&total_add_batch_exec_time_ns, &add_batch_exec_time,
+
&total_add_batch_num](NodeChannel* ch) {
auto s = ch->close_wait(state);
if (!s.ok()) {
// 'status' will store the last non-ok status of all
channels
@@ -719,8 +730,13 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
}
ch->time_report(&node_add_batch_counter_map,
&serialize_batch_ns,
&mem_exceeded_block_ns,
&queue_push_lock_ns,
- &actual_consume_ns);
+ &actual_consume_ns,
&total_add_batch_exec_time_ns,
+ &add_batch_exec_time,
&total_add_batch_num);
});
+
+ if (add_batch_exec_time > max_add_batch_exec_time_ns) {
+ max_add_batch_exec_time_ns = add_batch_exec_time;
+ }
}
}
// TODO need to be improved
@@ -732,9 +748,15 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
COUNTER_SET(_output_rows_counter, _number_output_rows);
COUNTER_SET(_filtered_rows_counter, _number_filtered_rows);
COUNTER_SET(_send_data_timer, _send_data_ns);
+ COUNTER_SET(_wait_mem_limit_timer, mem_exceeded_block_ns);
COUNTER_SET(_convert_batch_timer, _convert_batch_ns);
COUNTER_SET(_validate_data_timer, _validate_data_ns);
COUNTER_SET(_serialize_batch_timer, serialize_batch_ns);
+ COUNTER_SET(_non_blocking_send_work_timer, actual_consume_ns);
+ COUNTER_SET(_total_add_batch_exec_timer, total_add_batch_exec_time_ns);
+ COUNTER_SET(_max_add_batch_exec_timer, max_add_batch_exec_time_ns);
+ COUNTER_SET(_add_batch_number, total_add_batch_num);
+ COUNTER_SET(_num_node_channels, num_node_channels);
// _number_input_rows don't contain num_rows_load_filtered and
num_rows_load_unselected in scan node
int64_t num_rows_load_total = _number_input_rows +
state->num_rows_load_filtered() +
state->num_rows_load_unselected();
@@ -744,11 +766,10 @@ Status OlapTableSink::close(RuntimeState* state, Status
close_status) {
// print log of add batch time of all node, for tracing load
performance easily
std::stringstream ss;
ss << "finished to close olap table sink. load_id=" <<
print_id(_load_id)
- << ", txn_id=" << _txn_id << ", node add batch time(ms)/wait lock
time(ms)/num: ";
+ << ", txn_id=" << _txn_id << ", node add batch time(ms)/num: ";
for (auto const& pair : node_add_batch_counter_map) {
ss << "{" << pair.first << ":(" <<
(pair.second.add_batch_execution_time_us / 1000)
- << ")(" << (pair.second.add_batch_wait_lock_time_us / 1000) <<
")("
- << pair.second.add_batch_num << ")} ";
+ << ")(" << pair.second.add_batch_num << ")} ";
}
LOG(INFO) << ss.str();
} else {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8d894c6..4a42202 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -80,7 +80,8 @@ struct AddBatchCounter {
template <typename T>
class ReusableClosure : public google::protobuf::Closure {
public:
- ReusableClosure() : cid(INVALID_BTHREAD_ID) {}
+ ReusableClosure() : cid(INVALID_BTHREAD_ID) {
+ }
~ReusableClosure() {
// shouldn't delete when Run() is calling or going to be called, wait
for current Run() done.
join();
@@ -173,12 +174,17 @@ public:
void time_report(std::unordered_map<int64_t, AddBatchCounter>*
add_batch_counter_map,
int64_t* serialize_batch_ns, int64_t*
mem_exceeded_block_ns,
- int64_t* queue_push_lock_ns, int64_t* actual_consume_ns) {
+ int64_t* queue_push_lock_ns, int64_t* actual_consume_ns,
+ int64_t* total_add_batch_exec_time_ns, int64_t*
add_batch_exec_time_ns,
+ int64_t* total_add_batch_num) {
(*add_batch_counter_map)[_node_id] += _add_batch_counter;
*serialize_batch_ns += _serialize_batch_ns;
*mem_exceeded_block_ns += _mem_exceeded_block_ns;
*queue_push_lock_ns += _queue_push_lock_ns;
*actual_consume_ns += _actual_consume_ns;
+ *add_batch_exec_time_ns =
(_add_batch_counter.add_batch_execution_time_us * 1000);
+ *total_add_batch_exec_time_ns += *add_batch_exec_time_ns;
+ *total_add_batch_num += _add_batch_counter.add_batch_num;
}
int64_t node_id() const { return _node_id; }
@@ -237,10 +243,10 @@ private:
std::vector<TTabletCommitInfo> _tablet_commit_infos;
AddBatchCounter _add_batch_counter;
- std::atomic<int64_t> _serialize_batch_ns;
- std::atomic<int64_t> _mem_exceeded_block_ns;
- std::atomic<int64_t> _queue_push_lock_ns;
- std::atomic<int64_t> _actual_consume_ns;
+ std::atomic<int64_t> _serialize_batch_ns{0};
+ std::atomic<int64_t> _mem_exceeded_block_ns{0};
+ std::atomic<int64_t> _queue_push_lock_ns{0};
+ std::atomic<int64_t> _actual_consume_ns{0};
};
class IndexChannel {
@@ -262,6 +268,8 @@ public:
void mark_as_failed(const NodeChannel* ch) {
_failed_channels.insert(ch->node_id()); }
bool has_intolerable_failure();
+ size_t num_node_channels() const { return _node_channels.size(); }
+
private:
OlapTableSink* _parent;
int64_t _index_id;
@@ -382,12 +390,18 @@ private:
RuntimeProfile::Counter* _output_rows_counter = nullptr;
RuntimeProfile::Counter* _filtered_rows_counter = nullptr;
RuntimeProfile::Counter* _send_data_timer = nullptr;
+ RuntimeProfile::Counter* _wait_mem_limit_timer = nullptr;
RuntimeProfile::Counter* _convert_batch_timer = nullptr;
RuntimeProfile::Counter* _validate_data_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _non_blocking_send_timer = nullptr;
+ RuntimeProfile::Counter* _non_blocking_send_work_timer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
+ RuntimeProfile::Counter* _total_add_batch_exec_timer = nullptr;
+ RuntimeProfile::Counter* _max_add_batch_exec_timer = nullptr;
+ RuntimeProfile::Counter* _add_batch_number = nullptr;
+ RuntimeProfile::Counter* _num_node_channels = nullptr;
// load mem limit is for remote load channel
int64_t _load_mem_limit = -1;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 1ed44fb..ee80db1 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -176,10 +176,17 @@ OLAPStatus DeltaWriter::init() {
}
OLAPStatus DeltaWriter::write(Tuple* tuple) {
- if (!_is_init) {
+ std::lock_guard<SpinLock> l(_lock);
+ if (!_is_init && !_is_cancelled) {
RETURN_NOT_OK(init());
}
+ if (_is_cancelled) {
+ // The writer may be cancelled at any time by other thread.
+ // just return ERROR if writer is cancelled.
+ return OLAP_ERR_ALREADY_CANCELLED;
+ }
+
_mem_table->insert(tuple);
// if memtable is full, push it to the flush executor,
@@ -196,7 +203,20 @@ OLAPStatus DeltaWriter::_flush_memtable_async() {
return _flush_token->submit(_mem_table);
}
-OLAPStatus DeltaWriter::flush_memtable_and_wait() {
+OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
+ std::lock_guard<SpinLock> l(_lock);
+ if (!_is_init) {
+ // This writer is not initialized before flushing. Do nothing
+ // But we return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED,
+ // Because this method maybe called when trying to reduce mem
consumption,
+ // and at that time, the writer may not be initialized yet and that is
a normal case.
+ return OLAP_SUCCESS;
+ }
+
+ if (_is_cancelled) {
+ return OLAP_ERR_ALREADY_CANCELLED;
+ }
+
if (mem_consumption() == _mem_table->memory_usage()) {
// equal means there is no memtable in flush queue, just flush this
memtable
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable
size: "
@@ -208,7 +228,24 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
DCHECK(mem_consumption() > _mem_table->memory_usage());
// this means there should be at least one memtable in flush queue.
}
- // wait all memtables in flush queue to be flushed.
+
+ if (need_wait) {
+ // wait all memtables in flush queue to be flushed.
+ RETURN_NOT_OK(_flush_token->wait());
+ }
+ return OLAP_SUCCESS;
+}
+
+OLAPStatus DeltaWriter::wait_flush() {
+ std::lock_guard<SpinLock> l(_lock);
+ if (!_is_init) {
+ // return OLAP_SUCCESS instead of OLAP_ERR_ALREADY_CANCELLED for same
reason
+ // as described in flush_memtable_and_wait()
+ return OLAP_SUCCESS;
+ }
+ if (_is_cancelled) {
+ return OLAP_ERR_ALREADY_CANCELLED;
+ }
RETURN_NOT_OK(_flush_token->wait());
return OLAP_SUCCESS;
}
@@ -220,7 +257,8 @@ void DeltaWriter::_reset_mem_table() {
}
OLAPStatus DeltaWriter::close() {
- if (!_is_init) {
+ std::lock_guard<SpinLock> l(_lock);
+ if (!_is_init && !_is_cancelled) {
// if this delta writer is not initialized, but close() is called.
// which means this tablet has no data loaded, but at least one tablet
// in same partition has data loaded.
@@ -229,14 +267,24 @@ OLAPStatus DeltaWriter::close() {
RETURN_NOT_OK(init());
}
+ if (_is_cancelled) {
+ return OLAP_ERR_ALREADY_CANCELLED;
+ }
+
RETURN_NOT_OK(_flush_memtable_async());
_mem_table.reset();
return OLAP_SUCCESS;
}
OLAPStatus
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec) {
+ std::lock_guard<SpinLock> l(_lock);
DCHECK(_is_init)
<< "delta writer is supposed be to initialized before close_wait()
being called";
+
+ if (_is_cancelled) {
+ return OLAP_ERR_ALREADY_CANCELLED;
+ }
+
// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());
DCHECK_EQ(_mem_tracker->consumption(), 0);
@@ -295,7 +343,8 @@ OLAPStatus
DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
}
OLAPStatus DeltaWriter::cancel() {
- if (!_is_init) {
+ std::lock_guard<SpinLock> l(_lock);
+ if (!_is_init || _is_cancelled) {
return OLAP_SUCCESS;
}
_mem_table.reset();
@@ -304,6 +353,7 @@ OLAPStatus DeltaWriter::cancel() {
_flush_token->cancel();
}
DCHECK_EQ(_mem_tracker->consumption(), 0);
+ _is_cancelled = true;
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index caa0483..9cf59eb 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -21,6 +21,7 @@
#include "gen_cpp/internal_service.pb.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/tablet.h"
+#include "util/spinlock.h"
namespace doris {
@@ -73,12 +74,17 @@ public:
// submit current memtable to flush queue, and wait all memtables in flush
queue
// to be flushed.
// This is currently for reducing mem consumption of this delta writer.
- OLAPStatus flush_memtable_and_wait();
+ // If need_wait is true, it will wait for all memtable in flush queue to
be flushed.
+ // Otherwise, it will just put memtables to the flush queue and return.
+ OLAPStatus flush_memtable_and_wait(bool need_wait);
int64_t partition_id() const;
int64_t mem_consumption() const;
+ // Wait all memtable in flush queue to be flushed
+ OLAPStatus wait_flush();
+
private:
DeltaWriter(WriteRequest* req, const std::shared_ptr<MemTracker>& parent,
StorageEngine* storage_engine);
@@ -92,6 +98,7 @@ private:
private:
bool _is_init = false;
+ bool _is_cancelled = false;
WriteRequest _req;
TabletSharedPtr _tablet;
RowsetSharedPtr _cur_rowset;
@@ -106,6 +113,8 @@ private:
StorageEngine* _storage_engine;
std::unique_ptr<FlushToken> _flush_token;
std::shared_ptr<MemTracker> _mem_tracker;
+
+ SpinLock _lock;
};
} // namespace doris
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 75ad835..ef1ec2e 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -165,6 +165,8 @@ enum OLAPStatus {
OLAP_ERR_TOO_MANY_TRANSACTIONS = -233,
OLAP_ERR_INVALID_SNAPSHOT_VERSION = -234,
OLAP_ERR_TOO_MANY_VERSION = -235,
+ OLAP_ERR_NOT_INITIALIZED = -236,
+ OLAP_ERR_ALREADY_CANCELLED = -237,
// CommandExecutor
// [-300, -400)
diff --git a/be/src/olap/rowset/unique_rowset_id_generator.cpp
b/be/src/olap/rowset/unique_rowset_id_generator.cpp
index 71352ca..c21b8ca 100644
--- a/be/src/olap/rowset/unique_rowset_id_generator.cpp
+++ b/be/src/olap/rowset/unique_rowset_id_generator.cpp
@@ -19,6 +19,7 @@
#include "util/doris_metrics.h"
#include "util/spinlock.h"
+#include "util/stack_util.h"
#include "util/uid_util.h"
namespace doris {
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 78baf0f..9f19cf4 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -632,6 +632,7 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId
tablet_id, SchemaHash schema
bool TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path,
TTabletId*
tablet_id,
TSchemaHash*
schema_hash) {
+ // the path like: /data/14/10080/964828783/
static re2::RE2 normal_re("/data/\\d+/(\\d+)/(\\d+)($|/)");
// match tablet schema hash data path, for example, the path is
/data/1/16791/29998
// 1 is shard id , 16791 is tablet id, 29998 is schema hash
@@ -651,6 +652,7 @@ bool
TabletManager::get_tablet_id_and_schema_hash_from_path(const string& path,
}
bool TabletManager::get_rowset_id_from_path(const string& path, RowsetId*
rowset_id) {
+ // the path like:
/data/14/10080/964828783/02000000000000969144d8725cb62765f9af6cd3125d5a91_0.dat
static re2::RE2 re("/data/\\d+/\\d+/\\d+/([A-Fa-f0-9]+)_.*");
string id_str;
bool ret = RE2::PartialMatch(path, re, &id_str);
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index c2feebb..2f320d0 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -117,8 +117,7 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest&
params) {
static void dummy_deleter(const CacheKey& key, void* value) {}
Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
-
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec,
- int64_t* wait_lock_time_ns) {
+
google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
UniqueId load_id(request.id());
// 1. get load channel
std::shared_ptr<LoadChannel> channel;
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index f0ec6fa..450c8bf 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -51,8 +51,7 @@ public:
Status open(const PTabletWriterOpenRequest& request);
Status add_batch(const PTabletWriterAddBatchRequest& request,
- google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec,
- int64_t* wait_lock_time_ns);
+ google::protobuf::RepeatedPtrField<PTabletInfo>*
tablet_vec);
// cancel all tablet stream for 'load_id' load
Status cancel(const PTabletWriterCancelRequest& request);
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 76c7166..b2a32c0 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -77,23 +77,26 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest&
params) {
Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
DCHECK(params.tablet_ids_size() == params.row_batch().num_rows());
- std::lock_guard<std::mutex> l(_lock);
- if (_state != kOpened) {
- return _state == kFinished
- ? _close_status
- :
Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1",
-
_key.to_string(), _state));
- }
- auto next_seq = _next_seqs[params.sender_id()];
- // check packet
- if (params.packet_seq() < next_seq) {
- LOG(INFO) << "packet has already recept before, expect_seq=" <<
next_seq
- << ", recept_seq=" << params.packet_seq();
- return Status::OK();
- } else if (params.packet_seq() > next_seq) {
- LOG(WARNING) << "lost data packet, expect_seq=" << next_seq
- << ", recept_seq=" << params.packet_seq();
- return Status::InternalError("lost data packet");
+ int64_t cur_seq;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ if (_state != kOpened) {
+ return _state == kFinished
+ ? _close_status
+ : Status::InternalError(strings::Substitute("TabletsChannel $0
state: $1",
+ _key.to_string(), _state));
+ }
+ cur_seq = _next_seqs[params.sender_id()];
+ // check packet
+ if (params.packet_seq() < cur_seq) {
+ LOG(INFO) << "packet has already recept before, expect_seq=" <<
cur_seq
+ << ", recept_seq=" << params.packet_seq();
+ return Status::OK();
+ } else if (params.packet_seq() > cur_seq) {
+ LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
+ << ", recept_seq=" << params.packet_seq();
+ return Status::InternalError("lost data packet");
+ }
}
RowBatch row_batch(*_row_desc, params.row_batch(), _mem_tracker.get());
@@ -115,7 +118,11 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBatchRequest& params) {
return Status::InternalError(err_msg);
}
}
- _next_seqs[params.sender_id()]++;
+
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ _next_seqs[params.sender_id()] = cur_seq + 1;
+ }
return Status::OK();
}
@@ -183,29 +190,20 @@ Status TabletsChannel::reduce_mem_usage() {
// therefore it's possible for reduce_mem_usage() to be called right
after close()
return _close_status;
}
- // find tablet writer with largest mem consumption
- int64_t max_consume = 0L;
- DeltaWriter* writer = nullptr;
- for (auto& it : _tablet_writers) {
- if (it.second->mem_consumption() > max_consume) {
- max_consume = it.second->mem_consumption();
- writer = it.second;
- }
- }
- if (writer == nullptr || max_consume == 0) {
- // barely not happend, just return OK
- return Status::OK();
+ // Flush all memtables
+ for (auto& it : _tablet_writers) {
+ it.second->flush_memtable_and_wait(false);
}
- VLOG_NOTICE << "pick the delte writer to flush, with mem consumption: " <<
max_consume
- << ", channel key: " << _key;
- OLAPStatus st = writer->flush_memtable_and_wait();
- if (st != OLAP_SUCCESS) {
- // flush failed, return error
- std::stringstream ss;
- ss << "failed to reduce mem consumption by flushing memtable. err: "
<< st;
- return Status::InternalError(ss.str());
+ for (auto& it : _tablet_writers) {
+ OLAPStatus st = it.second->wait_flush();
+ if (st != OLAP_SUCCESS) {
+ // flush failed, return error
+ std::stringstream ss;
+ ss << "failed to reduce mem consumption by flushing memtable. err:
" << st;
+ return Status::InternalError(ss.str());
+ }
}
return Status::OK();
}
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 7c85654..51480fb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -97,11 +97,10 @@ void
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
_tablet_worker_pool.offer([request, response, done, this]() {
brpc::ClosureGuard closure_guard(done);
int64_t execution_time_ns = 0;
- int64_t wait_lock_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
auto st = _exec_env->load_channel_mgr()->add_batch(
- *request, response->mutable_tablet_vec(),
&wait_lock_time_ns);
+ *request, response->mutable_tablet_vec());
if (!st.ok()) {
LOG(WARNING) << "tablet writer add batch failed, message=" <<
st.get_error_msg()
<< ", id=" << request->id() << ", index_id=" <<
request->index_id()
@@ -110,7 +109,6 @@ void
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
st.to_protobuf(response->mutable_status());
}
response->set_execution_time_us(execution_time_ns / 1000);
- response->set_wait_lock_time_us(wait_lock_time_ns / 1000);
});
}
diff --git a/be/test/runtime/load_channel_mgr_test.cpp
b/be/test/runtime/load_channel_mgr_test.cpp
index 54b0d3f..80e03ad 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -85,7 +85,11 @@ OLAPStatus DeltaWriter::cancel() {
return OLAP_SUCCESS;
}
-OLAPStatus DeltaWriter::flush_memtable_and_wait() {
+OLAPStatus DeltaWriter::flush_memtable_and_wait(bool need_wait) {
+ return OLAP_SUCCESS;
+}
+
+OLAPStatus DeltaWriter::wait_flush() {
return OLAP_SUCCESS;
}
@@ -246,7 +250,7 @@ TEST_F(LoadChannelMgrTest, normal) {
}
row_batch.serialize(request.mutable_row_batch());
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
+ auto st = mgr.add_batch(request, &tablet_vec);
request.release_id();
ASSERT_TRUE(st.ok());
}
@@ -413,7 +417,7 @@ TEST_F(LoadChannelMgrTest, add_failed) {
row_batch.serialize(request.mutable_row_batch());
add_status = OLAP_ERR_TABLE_NOT_FOUND;
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
+ auto st = mgr.add_batch(request, &tablet_vec);
request.release_id();
ASSERT_FALSE(st.ok());
}
@@ -503,7 +507,7 @@ TEST_F(LoadChannelMgrTest, close_failed) {
row_batch.serialize(request.mutable_row_batch());
close_status = OLAP_ERR_TABLE_NOT_FOUND;
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
+ auto st = mgr.add_batch(request, &tablet_vec);
request.release_id();
// even if delta close failed, the return status is still ok, but
tablet_vec is empty
ASSERT_TRUE(st.ok());
@@ -591,7 +595,7 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) {
}
row_batch.serialize(request.mutable_row_batch());
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
+ auto st = mgr.add_batch(request, &tablet_vec);
request.release_id();
ASSERT_FALSE(st.ok());
}
@@ -677,10 +681,10 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
}
row_batch.serialize(request.mutable_row_batch());
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec1;
- auto st = mgr.add_batch(request, &tablet_vec1, &wait_lock_time_ns);
+ auto st = mgr.add_batch(request, &tablet_vec1);
ASSERT_TRUE(st.ok());
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec2;
- st = mgr.add_batch(request, &tablet_vec2, &wait_lock_time_ns);
+ st = mgr.add_batch(request, &tablet_vec2);
request.release_id();
ASSERT_TRUE(st.ok());
}
@@ -693,7 +697,7 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
request.set_eos(true);
request.set_packet_seq(0);
google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
- auto st = mgr.add_batch(request, &tablet_vec, &wait_lock_time_ns);
+ auto st = mgr.add_batch(request, &tablet_vec);
request.release_id();
ASSERT_TRUE(st.ok());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index c0fba82..64295d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1069,6 +1069,8 @@ public class Coordinator {
List<List<TScanRangeParams>> perInstanceScanRanges =
ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
+ LOG.debug("scan range number per instance is: {}",
perInstanceScanRanges.size());
+
for (List<TScanRangeParams> scanRangeParams :
perInstanceScanRanges) {
FInstanceExecParam instanceParam = new
FInstanceExecParam(null, key, 0, params);
instanceParam.perNodeScanRanges.put(planNodeId,
scanRangeParams);
@@ -1085,7 +1087,7 @@ public class Coordinator {
throw new UserException("there is no scanNode Backend");
}
this.addressToBackendID.put(execHostport,
backendIdRef.getRef());
- FInstanceExecParam instanceParam = new
FInstanceExecParam(null, execHostport,
+ FInstanceExecParam instanceParam = new
FInstanceExecParam(null, execHostport,
0, params);
params.instanceExecParams.add(instanceParam);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]