This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 69524ccf98f [improve](group commit) Group commit support commit by data size (#29428)
69524ccf98f is described below
commit 69524ccf98f57877440d8e09d878fb76f3e011ad
Author: meiyi <[email protected]>
AuthorDate: Tue Jan 2 23:20:23 2024 +0800
[improve](group commit) Group commit support commit by data size (#29428)
---
be/src/runtime/group_commit_mgr.cpp | 41 ++++++++------
be/src/runtime/group_commit_mgr.h | 64 +++++++++++++---------
.../apache/doris/analysis/NativeInsertStmt.java | 2 -
.../apache/doris/service/FrontendServiceImpl.java | 2 +
gensrc/thrift/FrontendService.thrift | 1 +
5 files changed, 66 insertions(+), 44 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 0c3589edbe0..2971138d5b6 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -36,11 +36,11 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
while (!runtime_state->is_cancelled() && status.ok() &&
_all_block_queues_bytes->load(std::memory_order_relaxed) >
config::group_commit_queue_mem_limit) {
- _put_cond.wait_for(
- l,
std::chrono::milliseconds(LoadBlockQueue::MAX_BLOCK_QUEUE_ADD_WAIT_TIME));
+ _put_cond.wait_for(l,
+
std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
- if (duration.count() > LoadBlockQueue::WAL_MEM_BACK_PRESSURE_TIME_OUT)
{
+ if (duration.count() > LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIMEOUT)
{
return Status::TimedOut(
"Wal memory back pressure wait too much time! Load block
queue txn id: {}, "
"label: {}, instance id: {}",
@@ -60,8 +60,14 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
return st;
}
}
+ _data_bytes += block->bytes();
_all_block_queues_bytes->fetch_add(block->bytes(),
std::memory_order_relaxed);
}
+ if (_data_bytes >= _group_commit_data_bytes) {
+ VLOG_DEBUG << "group commit meets commit condition for data size,
label=" << label
+ << ", instance_id=" << load_instance_id << ", data_bytes="
<< _data_bytes;
+ _need_commit = true;
+ }
_get_cond.notify_all();
return Status::OK();
}
@@ -71,25 +77,25 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
- if (!need_commit) {
+ if (!_need_commit) {
auto left_milliseconds =
_group_commit_interval_ms -
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
if (left_milliseconds <= 0) {
- need_commit = true;
+ _need_commit = true;
}
}
while (!runtime_state->is_cancelled() && status.ok() &&
_block_queue.empty() &&
- (!need_commit || (need_commit && !_load_ids.empty()))) {
+ (!_need_commit || (_need_commit && !_load_ids.empty()))) {
auto left_milliseconds = _group_commit_interval_ms;
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
- if (!need_commit) {
+ if (!_need_commit) {
left_milliseconds = _group_commit_interval_ms - duration;
if (left_milliseconds <= 0) {
- need_commit = true;
+ _need_commit = true;
break;
}
} else {
@@ -120,7 +126,7 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
_block_queue.pop_front();
_all_block_queues_bytes->fetch_sub(block->bytes(),
std::memory_order_relaxed);
}
- if (_block_queue.empty() && need_commit && _load_ids.empty()) {
+ if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
*eos = true;
} else {
*eos = false;
@@ -139,7 +145,7 @@ void LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
Status LoadBlockQueue::add_load_id(const UniqueId& load_id) {
std::unique_lock l(mutex);
- if (need_commit) {
+ if (_need_commit) {
return Status::InternalError("block queue is set need commit, id=" +
load_instance_id.to_string());
}
@@ -175,7 +181,7 @@ Status GroupCommitTable::get_first_block_load_queue(
for (int i = 0; i < 3; i++) {
bool is_schema_version_match = true;
for (auto it = _load_block_queues.begin(); it !=
_load_block_queues.end(); ++it) {
- if (!it->second->need_commit) {
+ if (!it->second->need_commit()) {
if (base_schema_version == it->second->schema_version) {
if (it->second->add_load_id(load_id).ok()) {
load_block_queue = it->second;
@@ -282,7 +288,8 @@ Status GroupCommitTable::_create_group_commit_load(
{
load_block_queue = std::make_shared<LoadBlockQueue>(
instance_id, label, txn_id, schema_version,
_all_block_queues_bytes,
- result.wait_internal_group_commit_finish,
result.group_commit_interval_ms);
+ result.wait_internal_group_commit_finish,
result.group_commit_interval_ms,
+ result.group_commit_data_bytes);
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
@@ -377,7 +384,7 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
if (status.ok() && st.ok() &&
(result_status.ok() ||
result_status.is<ErrorCode::PUBLISH_TIMEOUT>())) {
RETURN_IF_ERROR(_exec_env->wal_mgr()->delete_wal(
- txn_id, load_block_queue->block_queue_pre_allocated.load()));
+ txn_id, load_block_queue->block_queue_pre_allocated()));
RETURN_IF_ERROR(_exec_env->wal_mgr()->erase_wal_status_queue(table_id,
txn_id));
} else {
std::string wal_path;
@@ -485,7 +492,7 @@ Status LoadBlockQueue::create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id,
const std::string& import_label, WalManager*
wal_manager,
std::vector<TSlotDescriptor>& slot_desc, int
be_exe_version) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->wal_mgr()->add_wal_path(db_id,
tb_id, wal_id,
-
import_label, wal_base_path));
+
import_label, _wal_base_path));
_v_wal_writer = std::make_shared<vectorized::VWalWriter>(
tb_id, wal_id, import_label, wal_manager, slot_desc,
be_exe_version);
return _v_wal_writer->init();
@@ -502,17 +509,17 @@ bool LoadBlockQueue::has_enough_wal_disk_space(size_t pre_allocated) {
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
size_t available_bytes = 0;
{
- Status st = wal_mgr->get_wal_dir_available_size(wal_base_path,
&available_bytes);
+ Status st = wal_mgr->get_wal_dir_available_size(_wal_base_path,
&available_bytes);
if (!st.ok()) {
LOG(WARNING) << "get wal disk available size filed!";
}
}
if (pre_allocated < available_bytes) {
- Status st = wal_mgr->update_wal_dir_pre_allocated(wal_base_path,
pre_allocated, true);
+ Status st = wal_mgr->update_wal_dir_pre_allocated(_wal_base_path,
pre_allocated, true);
if (!st.ok()) {
LOG(WARNING) << "update wal dir pre_allocated failed, reason: " <<
st.to_string();
}
- block_queue_pre_allocated.fetch_add(pre_allocated);
+ _block_queue_pre_allocated.fetch_add(pre_allocated);
return true;
} else {
return false;
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 125256535fe..49bdd5f87a7 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -45,15 +45,17 @@ public:
LoadBlockQueue(const UniqueId& load_instance_id, std::string& label,
int64_t txn_id,
int64_t schema_version,
std::shared_ptr<std::atomic_size_t> all_block_queues_bytes,
- bool wait_internal_group_commit_finish, int64_t
group_commit_interval_ms)
+ bool wait_internal_group_commit_finish, int64_t
group_commit_interval_ms,
+ int64_t group_commit_data_bytes)
: load_instance_id(load_instance_id),
label(label),
txn_id(txn_id),
schema_version(schema_version),
wait_internal_group_commit_finish(wait_internal_group_commit_finish),
+ _group_commit_interval_ms(group_commit_interval_ms),
_start_time(std::chrono::steady_clock::now()),
- _all_block_queues_bytes(all_block_queues_bytes),
- _group_commit_interval_ms(group_commit_interval_ms) {};
+ _group_commit_data_bytes(group_commit_data_bytes),
+ _all_block_queues_bytes(all_block_queues_bytes) {};
Status add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block,
bool write_wal);
@@ -62,44 +64,54 @@ public:
Status add_load_id(const UniqueId& load_id);
void remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
+ bool need_commit() { return _need_commit; }
+
Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const
std::string& import_label,
WalManager* wal_manager, std::vector<TSlotDescriptor>&
slot_desc,
int be_exe_version);
Status close_wal();
bool has_enough_wal_disk_space(size_t pre_allocated);
+ size_t block_queue_pre_allocated() { return
_block_queue_pre_allocated.load(); }
- // 1s
- static constexpr size_t MAX_BLOCK_QUEUE_ADD_WAIT_TIME = 1000;
- // 120s
- static constexpr size_t WAL_MEM_BACK_PRESSURE_TIME_OUT = 120000;
UniqueId load_instance_id;
std::string label;
int64_t txn_id;
int64_t schema_version;
- bool need_commit = false;
bool wait_internal_group_commit_finish = false;
+
+ // the execute status of this internal group commit
std::mutex mutex;
- bool process_finish = false;
std::condition_variable internal_group_commit_finish_cv;
+ bool process_finish = false;
Status status = Status::OK();
- std::string wal_base_path;
- std::atomic_size_t block_queue_pre_allocated = 0;
private:
void _cancel_without_lock(const Status& st);
- std::chrono::steady_clock::time_point _start_time;
- std::condition_variable _put_cond;
- std::condition_variable _get_cond;
// the set of load ids of all blocks in this queue
std::set<UniqueId> _load_ids;
std::list<std::shared_ptr<vectorized::Block>> _block_queue;
- // memory consumption of all tables' load block queues, used for back
pressure.
- std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
- // group commit interval in ms, can be changed by 'ALTER TABLE my_table
SET ("group_commit_interval_ms"="1000");'
- int64_t _group_commit_interval_ms;
+ // wal
+ std::string _wal_base_path;
std::shared_ptr<vectorized::VWalWriter> _v_wal_writer;
+ std::atomic_size_t _block_queue_pre_allocated = 0;
+
+ // commit
+ bool _need_commit = false;
+ // commit by time interval, can be changed by 'ALTER TABLE my_table SET
("group_commit_interval_ms"="1000");'
+ int64_t _group_commit_interval_ms;
+ std::chrono::steady_clock::time_point _start_time;
+ // commit by data size
+ int64_t _group_commit_data_bytes;
+ int64_t _data_bytes = 0;
+
+ // memory back pressure, memory consumption of all tables' load block
queues
+ std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+ std::condition_variable _put_cond;
+ std::condition_variable _get_cond;
+ static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIME = 1000; // 1s
+ static constexpr size_t MEM_BACK_PRESSURE_WAIT_TIMEOUT = 120000; // 120s
};
class GroupCommitTable {
@@ -108,9 +120,9 @@ public:
int64_t table_id, std::shared_ptr<std::atomic_size_t>
all_block_queue_bytes)
: _exec_env(exec_env),
_thread_pool(thread_pool),
+ _all_block_queues_bytes(all_block_queue_bytes),
_db_id(db_id),
- _table_id(table_id),
- _all_block_queues_bytes(all_block_queue_bytes) {};
+ _table_id(table_id) {};
Status get_first_block_load_queue(int64_t table_id, int64_t
base_schema_version,
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
@@ -131,15 +143,17 @@ private:
ExecEnv* _exec_env = nullptr;
ThreadPool* _thread_pool = nullptr;
+ // memory consumption of all tables' load block queues, used for memory
back pressure.
+ std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
+
int64_t _db_id;
int64_t _table_id;
+
std::mutex _lock;
std::condition_variable _cv;
// fragment_instance_id to load_block_queue
std::unordered_map<UniqueId, std::shared_ptr<LoadBlockQueue>>
_load_block_queues;
bool _need_plan_fragment = false;
- // memory consumption of all tables' load block queues, used for back
pressure.
- std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
};
class GroupCommitMgr {
@@ -159,13 +173,13 @@ public:
private:
ExecEnv* _exec_env = nullptr;
+ std::unique_ptr<doris::ThreadPool> _thread_pool;
+ // memory consumption of all tables' load block queues, used for memory
back pressure.
+ std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
std::mutex _lock;
// TODO remove table when unused
std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
- std::unique_ptr<doris::ThreadPool> _thread_pool;
- // memory consumption of all tables' load block queues, used for back
pressure.
- std::shared_ptr<std::atomic_size_t> _all_block_queues_bytes;
};
} // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 576ebba9b0e..176833b865c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -69,7 +69,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -150,7 +149,6 @@ public class NativeInsertStmt extends InsertStmt {
private boolean isGroupCommit = false;
private int baseSchemaVersion = -1;
private TUniqueId loadId = null;
- private ByteString execPlanFragmentParamsBytes = null;
private long tableId = -1;
public boolean isGroupCommitStreamLoadSql = false;
private GroupCommitPlanner groupCommitPlanner;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 136084f5f1c..cf42cb921f6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1983,6 +1983,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
result.setTableId(parsedStmt.getTargetTable().getId());
result.setBaseSchemaVersion(((OlapTable)
parsedStmt.getTargetTable()).getBaseSchemaVersion());
result.setGroupCommitIntervalMs(((OlapTable)
parsedStmt.getTargetTable()).getGroupCommitIntervalMs());
+ // TODO get from table property
+ result.setGroupCommitDataBytes(134217728L);
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
} catch (UserException e) {
LOG.warn("exec sql error", e);
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 209947c8b78..75e67722113 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -670,6 +670,7 @@ struct TStreamLoadPutResult {
6: optional i64 table_id
7: optional bool wait_internal_group_commit_finish = false
8: optional i64 group_commit_interval_ms
+ 9: optional i64 group_commit_data_bytes
}
struct TStreamLoadMultiTablePutResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]