This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 12203249e38 [fix](autoinc) Fix AutoIncrementGenerator and add more
logs about auto-increment column (#37306)
12203249e38 is described below
commit 12203249e38d5d467be573be8b3b05acd47764c3
Author: bobhan1 <[email protected]>
AuthorDate: Sat Jul 6 16:53:17 2024 +0800
[fix](autoinc) Fix AutoIncrementGenerator and add more logs about
auto-increment column (#37306)
## Proposed changes
1. When the in-memory batchEndId is exhausted, we (1) write an edit log to update
batchEndId, (2) update batchEndId in memory, and (3) update nextId in memory.
2. Add more logs about the auto-increment column.
3. Remove the useless `lowerBound`.
branch-2.1-pick: https://github.com/apache/doris/pull/37366
---
be/src/vec/sink/autoinc_buffer.cpp | 25 ++++++++++++--------
.../doris/catalog/AutoIncrementGenerator.java | 27 +++++++++++++++-------
.../apache/doris/service/FrontendServiceImpl.java | 17 +++++---------
3 files changed, 40 insertions(+), 29 deletions(-)
diff --git a/be/src/vec/sink/autoinc_buffer.cpp
b/be/src/vec/sink/autoinc_buffer.cpp
index f83dbcb55b8..4bc87dff489 100644
--- a/be/src/vec/sink/autoinc_buffer.cpp
+++ b/be/src/vec/sink/autoinc_buffer.cpp
@@ -64,37 +64,42 @@ Result<int64_t> AutoIncIDBuffer::_fetch_ids_from_fe(size_t
length) {
client->getAutoIncrementRange(result, request);
});
}
- LOG(INFO) << "[auto-inc-range][start=" << result.start << ",length="
<< result.length
- << "][elapsed=" << get_auto_inc_range_rpc_ns / 1000000 << "
ms]";
if (_rpc_status.is<ErrorCode::NOT_MASTER>()) {
LOG_WARNING(
- "Failed to fetch auto-incremnt range, request to
non-master FE, discard all "
- "auto_increment ranges in _buffers. retry_time={}",
- retry_times);
+ "Failed to fetch auto-incremnt range, requested to
non-master FE@{}:{}, change "
+ "to request to FE@{}:{}. retry_time={}, db_id={},
table_id={}, column_id={}",
+ master_addr.hostname, master_addr.port,
result.master_address.hostname,
+ result.master_address.port, retry_times, _db_id,
_table_id, _column_id);
master_addr = result.master_address;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (!_rpc_status.ok()) {
- LOG(WARNING)
- << "Failed to fetch auto-incremnt range, encounter rpc
failure. retry_time="
- << retry_times << ", errmsg=" << _rpc_status.to_string();
+ LOG_WARNING(
+ "Failed to fetch auto-incremnt range, encounter rpc
failure. "
+ "errmsg={}, retry_time={}, db_id={}, table_id={},
column_id={}",
+ _rpc_status.to_string(), retry_times, _db_id, _table_id,
_column_id);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
if (result.length != length) [[unlikely]] {
auto msg = fmt::format(
"Failed to fetch auto-incremnt range, request length={},
but get "
- "result.length={}, retry_time={}",
- length, result.length, retry_times);
+ "result.length={}, retry_time={}, db_id={}, table_id={},
column_id={}",
+ length, result.length, retry_times, _db_id, _table_id,
_column_id);
LOG(WARNING) << msg;
_rpc_status = Status::RpcError<true>(msg);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
+ LOG_INFO(
+ "get auto-incremnt range from FE@{}:{}, start={}, length={},
elapsed={}ms, "
+ "retry_time={}, db_id={}, table_id={}, column_id={}",
+ master_addr.hostname, master_addr.port, result.start,
result.length,
+ get_auto_inc_range_rpc_ns / 1000000, retry_times, _db_id,
_table_id, _column_id);
return result.start;
}
CHECK(!_rpc_status.ok());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
index e4c8cf5de01..de6ad325106 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/AutoIncrementGenerator.java
@@ -38,7 +38,6 @@ import java.io.IOException;
public class AutoIncrementGenerator implements Writable, GsonPostProcessable {
private static final Logger LOG =
LogManager.getLogger(AutoIncrementGenerator.class);
- public static final long NEXT_ID_INIT_VALUE = 1;
// _MIN_BATCH_SIZE = 4064 in load task
private static final long BATCH_ID_INTERVAL = 500000;
@@ -62,6 +61,7 @@ public class AutoIncrementGenerator implements Writable,
GsonPostProcessable {
this.tableId = tableId;
this.columnId = columnId;
this.nextId = nextId;
+ this.batchEndId = -1;
}
public void setEditLog(EditLog editLog) {
@@ -70,6 +70,8 @@ public class AutoIncrementGenerator implements Writable,
GsonPostProcessable {
public synchronized void applyChange(long columnId, long batchNextId) {
if (this.columnId == columnId && batchEndId < batchNextId) {
+ LOG.info("[auto-inc] AutoIncrementGenerator applyChange, db_id={},
table_id={}, column_id={}, "
+ + "batchNextId={}", dbId, tableId, columnId, batchNextId);
nextId = batchNextId;
batchEndId = batchNextId;
}
@@ -77,20 +79,25 @@ public class AutoIncrementGenerator implements Writable,
GsonPostProcessable {
public synchronized Pair<Long, Long> getAutoIncrementRange(long columnId,
long length, long lowerBound) throws UserException {
- LOG.info("[getAutoIncrementRange request][col:{}][length:{}], [{}]",
columnId, length, this.columnId);
+ LOG.info("[auto-inc] getAutoIncrementRange request, db_id={},
table_id={}, column_id={}, length={}", dbId,
+ tableId, columnId, length);
if (this.columnId != columnId) {
throw new UserException("column dosen't exist, columnId=" +
columnId);
}
- long startId = Math.max(nextId, lowerBound);
+ long startId = nextId;
long endId = startId + length;
- nextId = startId + length;
if (endId > batchEndId) {
Preconditions.checkState(editLog != null);
- AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId,
tableId, columnId, batchEndId);
+ long newBatchEndId = (endId / BATCH_ID_INTERVAL + 1) *
BATCH_ID_INTERVAL;
+ AutoIncrementIdUpdateLog info = new AutoIncrementIdUpdateLog(dbId,
tableId, columnId, newBatchEndId);
editLog.logUpdateAutoIncrementId(info);
- batchEndId = (endId / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
+ batchEndId = newBatchEndId;
+ LOG.info("[auto-inc] update batchEndId to {}, db_id={},
table_id={}, column_id={}",
+ newBatchEndId, dbId, tableId, columnId);
}
- LOG.info("[getAutoIncrementRange result][{}, {}]", startId, length);
+ nextId = endId;
+ LOG.info("[auto-inc] getAutoIncrementRange result, db_id={},
table_id={}, column_id={}, start={}, length:{}",
+ dbId, tableId, columnId, startId, length);
return Pair.of(startId, length);
}
@@ -100,12 +107,16 @@ public class AutoIncrementGenerator implements Writable,
GsonPostProcessable {
}
public static AutoIncrementGenerator read(DataInput in) throws IOException
{
- return GsonUtils.GSON.fromJson(Text.readString(in),
AutoIncrementGenerator.class);
+ AutoIncrementGenerator res =
GsonUtils.GSON.fromJson(Text.readString(in), AutoIncrementGenerator.class);
+ LOG.info("[auto-inc] read AutoIncrementGenerator db_id={},
table_id={}, column_id={}, nextId={}, "
+ + "batchEndId={}", res.dbId, res.tableId, res.columnId,
res.nextId, res.batchEndId);
+ return res;
}
@Override
public void gsonPostProcess() throws IOException {
nextId = batchEndId;
+ LOG.info("[auto-inc] AutoIncrementGenerator set nextId to
batchEndId={}", batchEndId);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b7e11be47be..132378f6511 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -2619,9 +2619,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
@Override
public TAutoIncrementRangeResult
getAutoIncrementRange(TAutoIncrementRangeRequest request) {
String clientAddr = getClientAddrAsString();
- if (LOG.isDebugEnabled()) {
- LOG.debug("receive get auto-increement range request: {}, backend:
{}", request, clientAddr);
- }
+ LOG.info("[auto-inc] receive getAutoIncrementRange request: {},
backend: {}", request, clientAddr);
TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
TStatus status = new TStatus(TStatusCode.OK);
@@ -2631,7 +2629,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
- LOG.error("failed to getAutoIncrementRange:{}, request:{},
backend:{}",
+ LOG.error("[auto-inc] failed to getAutoIncrementRange:{},
request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
@@ -2644,19 +2642,16 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
autoIncrementGenerator = olapTable.getAutoIncrementGenerator();
long columnId = request.getColumnId();
long length = request.getLength();
- long lowerBound = -1;
- if (request.isSetLowerBound()) {
- lowerBound = request.getLowerBound();
- }
- Pair<Long, Long> range =
autoIncrementGenerator.getAutoIncrementRange(columnId, length, lowerBound);
+ Pair<Long, Long> range =
autoIncrementGenerator.getAutoIncrementRange(columnId, length, -1);
result.setStart(range.first);
result.setLength(range.second);
} catch (UserException e) {
- LOG.warn("failed to get auto-increment range of column {}: {}",
request.getColumnId(), e.getMessage());
+ LOG.warn("[auto-inc] failed to get auto-increment range of
db_id={}, table_id={}, column_id={}, errmsg={}",
+ request.getDbId(), request.getTableId(),
request.getColumnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
- LOG.warn("catch unknown result.", e);
+ LOG.warn("[auto-inc] catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " +
Strings.nullToEmpty(e.getMessage()));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]