This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new adfbe69415e branch-3.0: [Fix](auto-increment) Fix duplicate
auto-increment column value problem #43774 (#43983)
adfbe69415e is described below
commit adfbe69415ee8ae5bc6c53fcf82aee86479bd081
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 15 10:39:14 2024 +0800
branch-3.0: [Fix](auto-increment) Fix duplicate auto-increment column value problem #43774 (#43983)
Cherry-picked from #43774
Co-authored-by: bobhan1 <[email protected]>
---
be/src/vec/sink/autoinc_buffer.cpp | 33 +++++++++---
.../test_auto_inc_fetch_fail.out | 10 ++++
.../test_auto_inc_fetch_fail.groovy | 63 ++++++++++++++++++++++
3 files changed, 99 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/sink/autoinc_buffer.cpp b/be/src/vec/sink/autoinc_buffer.cpp
index 80ce9d494d5..8754c01f806 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -26,6 +26,7 @@
#include "common/status.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
+#include "util/debug_points.h"
#include "util/runtime_profile.h"
#include "util/thrift_rpc_helper.h"
@@ -44,10 +45,18 @@ void AutoIncIDBuffer::set_batch_size_at_least(size_t
batch_size) {
}
Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t length) {
+ LOG_INFO(
+ "[AutoIncIDBuffer::_fetch_ids_from_fe] begin to fetch
auto-increment values from fe, "
+ "db_id={}, table_id={}, column_id={}, length={}",
+ _db_id, _table_id, _column_id, length);
constexpr uint32_t FETCH_AUTOINC_MAX_RETRY_TIMES = 3;
_rpc_status = Status::OK();
TNetworkAddress master_addr =
ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
for (uint32_t retry_times = 0; retry_times <
FETCH_AUTOINC_MAX_RETRY_TIMES; retry_times++) {
+ DBUG_EXECUTE_IF("AutoIncIDBuffer::_fetch_ids_from_fe.failed", {
+ _rpc_status = Status::InternalError<false>("injected error");
+ break;
+ });
TAutoIncrementRangeRequest request;
TAutoIncrementRangeResult result;
request.__set_db_id(_db_id);
@@ -67,8 +76,9 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t
length) {
if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
- "Failed to fetch auto-incremnt range, requested to
non-master FE@{}:{}, change "
- "to request to FE@{}:{}. retry_time={}, db_id={},
table_id={}, column_id={}",
+ "Failed to fetch auto-increment range, requested to
non-master FE@{}:{}, "
+ "change to request to FE@{}:{}. retry_time={}, db_id={},
table_id={}, "
+ "column_id={}",
master_addr.hostname, master_addr.port,
result.master_address.hostname,
result.master_address.port, retry_times, _db_id,
_table_id, _column_id);
master_addr = result.master_address;
@@ -78,7 +88,7 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t
length) {
if (!_rpc_status.ok()) {
LOG_WARNING(
- "Failed to fetch auto-incremnt range, encounter rpc
failure. "
+ "Failed to fetch auto-increment range, encounter rpc
failure. "
"errmsg={}, retry_time={}, db_id={}, table_id={},
column_id={}",
_rpc_status.to_string(), retry_times, _db_id, _table_id,
_column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
@@ -86,7 +96,7 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t
length) {
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
- "Failed to fetch auto-incremnt range, request length={},
but get "
+ "Failed to fetch auto-increment range, request length={},
but get "
"result.length={}, retry_time={}, db_id={}, table_id={},
column_id={}",
length, result.length, retry_times, _db_id, _table_id,
_column_id);
LOG(WARNING) << msg;
@@ -96,14 +106,14 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t
length) {
}
LOG_INFO(
- "get auto-incremnt range from FE@{}:{}, start={}, length={},
elapsed={}ms, "
+ "get auto-increment range from FE@{}:{}, start={}, length={},
elapsed={}ms, "
"retry_time={}, db_id={}, table_id={}, column_id={}",
master_addr.hostname, master_addr.port, result.start,
result.length,
get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id,
_table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
- return _rpc_status;
+ return ResultError(_rpc_status);
}
void AutoIncIDBuffer::_get_autoinc_ranges_from_buffers(
@@ -153,10 +163,19 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t
length) {
RETURN_IF_ERROR(_rpc_token->submit_func([=, this]() {
auto&& res = _fetch_ids_from_fe(length);
if (!res.has_value()) [[unlikely]] {
+ auto&& err = res.error();
+ LOG_WARNING(
+ "[AutoIncIDBuffer::_launch_async_fetch_task] failed to
fetch auto-increment "
+ "values from fe, db_id={}, table_id={}, column_id={},
status={}",
+ _db_id, _table_id, _column_id, err);
_is_fetching = false;
return;
}
int64_t start = res.value();
+ LOG_INFO(
+ "[AutoIncIDBuffer::_launch_async_fetch_task] successfully
fetch auto-increment "
+ "values from fe, db_id={}, table_id={}, column_id={},
start={}, length={}",
+ _db_id, _table_id, _column_id, start, length);
{
std::lock_guard<std::mutex> lock {_latch};
_buffers.emplace_back(start, length);
@@ -167,4 +186,4 @@ Status AutoIncIDBuffer::_launch_async_fetch_task(size_t
length) {
return Status::OK();
}
-} // namespace doris::vectorized
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out
new file mode 100644
index 00000000000..453e378f9c4
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/test_auto_inc_fetch_fail.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+0
+
+-- !sql --
+4
+
+-- !sql --
+0
+
diff --git a/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy
new file mode 100644
index 00000000000..e9bb6ae9a3c
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_auto_inc_fetch_fail.groovy
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+suite("test_auto_inc_fetch_fail", "nonConcurrent") {
+
+ try {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ def table1 = "test_auto_inc_fetch_fail"
+ sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k` int,
+ `c1` int,
+ `c2` int,
+ `c3` int,
+ `id` BIGINT NOT NULL AUTO_INCREMENT(10000),
+ ) UNIQUE KEY(k)
+ DISTRIBUTED BY HASH(k) BUCKETS 1
+ PROPERTIES ("replication_num" = "1"); """
+
+ GetDebugPoint().enableDebugPointForAllBEs("AutoIncIDBuffer::_fetch_ids_from_fe.failed")
+
+ try {
+ sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
+ } catch (Exception e) {
+ logger.info("error : ${e}")
+ }
+ qt_sql "select count(*) from ${table1};"
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ Thread.sleep(1000)
+
+ sql """insert into ${table1}(k,c1,c2,c3) values(1,1,1,1),(2,2,2,2),(3,3,3,3),(4,4,4,4); """
+ qt_sql "select count(*) from ${table1};"
+ qt_sql "select count(*) from ${table1} where id < 10000;"
+
+ } catch(Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]