This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 12dddfc26c9 [branch-2.1] Pick "[Fix](autoinc) try fix concurrent load
problem with auto inc column #36421" (#37027)
12dddfc26c9 is described below
commit 12dddfc26c9f36c1ad01051947f7e5830533a5cb
Author: bobhan1 <[email protected]>
AuthorDate: Sun Jun 30 13:10:03 2024 +0800
[branch-2.1] Pick "[Fix](autoinc) try fix concurrent load problem with auto
inc column #36421" (#37027)
## Proposed changes
Cherry-pick of https://github.com/apache/doris/pull/36421 onto branch-2.1.
---
be/src/vec/sink/autoinc_buffer.cpp | 150 +++++++++++++--------
be/src/vec/sink/autoinc_buffer.h | 35 +++--
.../doris/catalog/AutoIncrementGenerator.java | 5 +-
.../apache/doris/service/FrontendServiceImpl.java | 10 ++
gensrc/thrift/FrontendService.thrift | 1 +
.../unique/test_unique_auto_inc_concurrent.out | 10 ++
.../unique/test_unique_auto_inc_concurrent.groovy | 59 ++++++++
7 files changed, 203 insertions(+), 67 deletions(-)
diff --git a/be/src/vec/sink/autoinc_buffer.cpp
b/be/src/vec/sink/autoinc_buffer.cpp
index c7c096ec6e8..f83dbcb55b8 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -19,14 +19,15 @@
#include <gen_cpp/HeartbeatService_types.h>
-#include <string>
+#include <chrono>
+#include <mutex>
+#include "common/logging.h"
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/runtime_profile.h"
#include "util/thrift_rpc_helper.h"
-#include "vec/sink/vtablet_block_convertor.h"
namespace doris::vectorized {
@@ -42,54 +43,11 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t
batch_size) {
}
}
-void AutoIncIDBuffer::_wait_for_prefetching() {
- if (_is_fetching) {
- _rpc_token->wait();
- }
-}
-
-Status AutoIncIDBuffer::sync_request_ids(size_t length,
- std::vector<std::pair<int64_t,
size_t>>* result) {
- std::unique_lock<std::mutex> lock(_mutex);
- RETURN_IF_ERROR(_prefetch_ids(_prefetch_size()));
- if (_front_buffer.second > 0) {
- auto min_length = std::min(_front_buffer.second, length);
- length -= min_length;
- result->emplace_back(_front_buffer.first, min_length);
- _front_buffer.first += min_length;
- _front_buffer.second -= min_length;
- }
- if (length > 0) {
- _wait_for_prefetching();
- if (!_rpc_status.ok()) {
- return _rpc_status;
- }
-
- {
- std::lock_guard<std::mutex> lock(_backend_buffer_latch);
- std::swap(_front_buffer, _backend_buffer);
- }
-
- DCHECK_LE(length, _front_buffer.second);
- if (length > _front_buffer.second) {
- return Status::RpcError("auto inc sync result length > front
buffer. " +
- std::to_string(length) + " vs " +
- std::to_string(_front_buffer.second));
- }
- result->emplace_back(_front_buffer.first, length);
- _front_buffer.first += length;
- _front_buffer.second -= length;
- }
- return Status::OK();
-}
-
-Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
- if (_front_buffer.second > _low_water_level_mark() || _is_fetching) {
- return Status::OK();
- }
+Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
+ constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
+ _rpc_status = Status::OK();
TNetworkAddress master_addr =
ExecEnv::GetInstance()->master_info()->network_address;
- _is_fetching = true;
- RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
+ for (uint32_t retry_times = 0; retry_times <
FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
request.__set_db_id(_db_id);
@@ -97,7 +55,7 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
request.__set_column_id(_column_id);
request.__set_length(length);
- int64_t get_auto_inc_range_rpc_ns;
+ int64_t get_auto_inc_range_rpc_ns = 0;
{
SCOPED_RAW_TIMER(&get_auto_inc_range_rpc_ns);
_rpc_status = ThriftRpcHelper::rpc<FrontendServiceClient>(
@@ -109,15 +67,95 @@ Status AutoIncIDBuffer::_prefetch_ids(size_t length) {
LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length="
<< result.length
<< "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << "
ms]";
- if (!_rpc_status.ok() || result.length <= 0) {
- LOG(WARNING) << "Failed to fetch auto-incremnt range, encounter
rpc failure."
- << "errmsg=" << _rpc_status.to_string();
- return;
+ if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
+ LOG_WARNING(
+ "Failed to fetch auto-incremnt range, request to
non-master FE, discard all "
+ "auto_increment ranges in _buffers. retry_time={}",
+ retry_times);
+ master_addr = result.master_address;
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ continue;
}
+ if (!_rpc_status.ok()) {
+ LOG(WARNING)
+ << "Failed to fetch auto-incremnt range, encounter rpc
failure. retry_time="
+ << retry_times << ", errmsg=" << _rpc_status.to_string();
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ continue;
+ }
+ if (result.length != length) [[unlikely]] {
+ auto msg = fmt::format(
+ "Failed to fetch auto-incremnt range, request length={},
but get "
+ "result.length={}, retry_time={}",
+ length, result.length, retry_times);
+ LOG(WARNING) << msg;
+ _rpc_status = Status::RpcError<true>(msg);
+ std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ continue;
+ }
+
+ return result.start;
+ }
+ CHECK(!_rpc_status.ok());
+ return _rpc_status;
+}
+
+void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
+ size_t& request_length, std::vector<std::pair<int64_t, size_t>>*
result) {
+ std::lock_guard<std::mutex> lock {_latch};
+ while (request_length > 0 && !_buffers.empty()) {
+ auto& autoinc_range = _buffers.front();
+ CHECK_GT(autoinc_range.length, 0);
+ auto min_length = std::min(request_length, autoinc_range.length);
+ result->emplace_back(autoinc_range.start, min_length);
+ autoinc_range.consume(min_length);
+ _current_volume -= min_length;
+ request_length -= min_length;
+ if (autoinc_range.empty()) {
+ _buffers.pop_front();
+ }
+ }
+}
+
+Status AutoIncIDBuffer::sync_request_ids(size_t request_length,
+ std::vector<std::pair<int64_t,
size_t>>* result) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ while (request_length > 0) {
+ _get_autoinc_ranges_from_buffers(request_length, result);
+ if (request_length == 0) {
+ break;
+ }
+ if (!_is_fetching) {
+ RETURN_IF_ERROR(
+ _launch_async_fetch_task(std::max<size_t>(request_length,
_prefetch_size())));
+ }
+ _rpc_token->wait();
+ CHECK(!_is_fetching);
+ if (!_rpc_status.ok()) {
+ return _rpc_status;
+ }
+ }
+ CHECK_EQ(request_length, 0);
+ if (!_is_fetching && _current_volume < _low_water_level_mark()) {
+ RETURN_IF_ERROR(_launch_async_fetch_task(_prefetch_size()));
+ }
+ return Status::OK();
+}
+
+Status AutoIncIDBuffer::_launch_async_fetch_task(size_t length) {
+ _is_fetching = true;
+ RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
+ auto&& res = _fetch_ids_from_fe(length);
+ if (!res.has_value()) [[unlikely]] {
+ _is_fetching = false;
+ return;
+ }
+ int64_t start = res.value();
{
- std::lock_guard<std::mutex> lock(_backend_buffer_latch);
- _backend_buffer = {result.start, result.length};
+ std::lock_guard<std::mutex> lock {_latch};
+ _buffers.emplace_back(start, length);
+ _current_volume += length;
}
_is_fetching = false;
}));
diff --git a/be/src/vec/sink/autoinc_buffer.h b/be/src/vec/sink/autoinc_buffer.h
index 3ec009b0960..032ac18981f 100644
--- a/be/src/vec/sink/autoinc_buffer.h
+++ b/be/src/vec/sink/autoinc_buffer.h
@@ -61,17 +61,35 @@ public:
// all public functions are thread safe
AutoIncIDBuffer(int64_t _db_id, int64_t _table_id, int64_t column_id);
void set_batch_size_at_least(size_t batch_size);
- Status sync_request_ids(size_t length, std::vector<std::pair<int64_t,
size_t>>* result);
+ Status sync_request_ids(size_t request_length,
std::vector<std::pair<int64_t, size_t>>* result);
+
+ struct AutoIncRange {
+ int64_t start;
+ size_t length;
+
+ bool empty() const { return length == 0; }
+
+ void consume(size_t l) {
+ start += l;
+ length -= l;
+ }
+ };
private:
- Status _prefetch_ids(size_t length);
[[nodiscard]] size_t _prefetch_size() const {
return _batch_size * config::auto_inc_prefetch_size_ratio;
}
+
[[nodiscard]] size_t _low_water_level_mark() const {
return _batch_size * config::auto_inc_low_water_level_mark_size_ratio;
};
- void _wait_for_prefetching();
+
+ void _get_autoinc_ranges_from_buffers(size_t& request_length,
+ std::vector<std::pair<int64_t,
size_t>>* result);
+
+ Status _launch_async_fetch_task(size_t length);
+
+ Result<int64_t> _fetch_ids_from_fe(size_t length);
std::atomic<size_t> _batch_size {MIN_BATCH_SIZE};
@@ -81,12 +99,14 @@ private:
std::unique_ptr<ThreadPoolToken> _rpc_token;
Status _rpc_status {Status::OK()};
+
std::atomic<bool> _is_fetching {false};
- std::pair<int64_t, size_t> _front_buffer {0, 0};
- std::pair<int64_t, size_t> _backend_buffer {0, 0};
- std::mutex _backend_buffer_latch; // for _backend_buffer
std::mutex _mutex;
+
+ mutable std::mutex _latch;
+ std::list<AutoIncRange> _buffers;
+ size_t _current_volume {0};
};
class GlobalAutoIncBuffers {
@@ -115,8 +135,7 @@ public:
auto key = std::make_tuple(db_id, table_id, column_id);
auto it = _buffers.find(key);
if (it == _buffers.end()) {
- _buffers.emplace(std::make_pair(
- key, AutoIncIDBuffer::create_shared(db_id, table_id,
column_id)));
+ _buffers.emplace(key, AutoIncIDBuffer::create_shared(db_id,
table_id, column_id));
}
return _buffers[{db_id, table_id, column_id}];
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
index be110360850..e4c8cf5de01 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
@@ -40,7 +40,7 @@ public class AutoIncrementGenerator implements Writable,
GsonPostProcessable {
public static final long NEXT_ID_INIT_VALUE = 1;
// _MIN_BATCH_SIZE = 4064 in load task
- private static final long BATCH_ID_INTERVAL = 50000;
+ private static final long BATCH_ID_INTERVAL = 500000;
@SerializedName(value = "dbId")
private Long dbId;
@@ -48,7 +48,6 @@ public class AutoIncrementGenerator implements Writable,
GsonPostProcessable {
private Long tableId;
@SerializedName(value = "columnId")
private Long columnId;
- @SerializedName(value = "nextId")
private long nextId;
@SerializedName(value = "batchEndId")
private long batchEndId;
@@ -86,10 +85,10 @@ public class AutoIncrementGenerator implements Writable,
GsonPostProcessable {
long endId = startId + length;
nextId = startId + length;
if (endId > batchEndId) {
- batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
Preconditions.checkState(editLog != null);
AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId,
tableId, columnId, batchEndId);
editLog.logUpdateAutoIncrementId(info);
+ batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
}
LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
return Pair.of(startId, length);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index d99a4a316ac..7bee62880e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2769,6 +2769,16 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
+
+ if (!Env.getCurrentEnv().isMaster()) {
+ status.setStatusCode(TStatusCode.NOT_MASTER);
+ status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
+ result.setMasterAddress(getMasterAddress());
+ LOG.error("failed to getAutoIncrementRange:{}, request:{},
backend:{}",
+ NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
+ return result;
+ }
+
try {
Env env = Env.getCurrentEnv();
Database db =
env.getInternalCatalog().getDbOrMetaException(request.getDbId());
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 1ab23526962..d0942337f17 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -1253,6 +1253,7 @@ struct TAutoIncrementRangeResult {
1: optional Status.TStatus status
2: optional i64 start
3: optional i64 length
+ 4: optional Types.TNetworkAddress master_address
}
struct TCreatePartitionRequest {
diff --git
a/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out
b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out
new file mode 100644
index 00000000000..03819c9a717
--- /dev/null
+++
b/regression-test/data/data_model_p0/unique/test_unique_auto_inc_concurrent.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1500000 1500000
+
+-- !sql --
+3000000 3000000
+
+-- !sql --
+4500000 4500000
+
diff --git
a/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy
b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy
new file mode 100644
index 00000000000..bf6d584b2af
--- /dev/null
+++
b/regression-test/suites/data_model_p0/unique/test_unique_auto_inc_concurrent.groovy
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_unique_table_auto_inc_concurrent") {
+
+ def table1 = "test_unique_table_auto_inc_concurrent"
+ sql "drop table if exists ${table1}"
+ sql """
+ CREATE TABLE IF NOT EXISTS `${table1}` (
+ `id` BIGINT NOT NULL AUTO_INCREMENT,
+ `value` int(11) NOT NULL
+ ) ENGINE=OLAP
+ UNIQUE KEY(`id`)
+ COMMENT "OLAP"
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1",
+ "in_memory" = "false",
+ "storage_format" = "V2",
+ "enable_unique_key_merge_on_write" = "true"
+ )
+ """
+
+ def run_test = {thread_num, rows, iters ->
+ def threads = []
+ (1..thread_num).each { id1 ->
+ threads.add(Thread.start {
+ (1..iters).each { id2 ->
+ sql """insert into ${table1}(value) select number from
numbers("number" = "${rows}");"""
+ }
+ })
+ }
+
+ threads.each { thread -> thread.join() }
+
+ qt_sql "select count(id), count(distinct id) from ${table1};"
+ }
+
+ run_test(15, 10000, 10)
+ run_test(15, 100000, 1)
+ run_test(5, 30000, 10)
+
+ sql "drop table if exists ${table1};"
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]