This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 37a814b36d2 branch-3.1: pick fe changes of [improve](cloud-mow)Add
some metrics about delete bitmap update lock #47988 (#52278)
37a814b36d2 is described below
commit 37a814b36d255c818aa0a00e19e7f6bf3b124b0e
Author: meiyi <[email protected]>
AuthorDate: Fri Jun 27 10:31:47 2025 +0800
branch-3.1: pick fe changes of [improve](cloud-mow)Add some metrics about
delete bitmap update lock #47988 (#52278)
cherry pick from #47988
---
.../transaction/CloudGlobalTransactionMgr.java | 514 ++++++++++++---------
1 file changed, 301 insertions(+), 213 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index d3762721e05..68148214f75 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -626,7 +626,11 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
final CommitTxnRequest commitTxnRequest = builder.build();
+ executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC,
txnCommitAttachment);
+ }
+ private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest,
long transactionId, boolean is2PC,
+ TxnCommitAttachment txnCommitAttachment) throws UserException {
if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.executeCommitTxnRequest.block"))
{
LOG.info("debug point: block at
CloudGlobalTransactionMgr.executeCommitTxnRequest.block");
while
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.executeCommitTxnRequest.block"))
{
@@ -643,8 +647,10 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
TransactionState txnState = null;
TxnStateChangeCallback cb = null;
long callbackId = 0L;
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
try {
- txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId,
tableList);
+ txnState = commitTxn(commitTxnRequest, transactionId, is2PC);
txnOperated = true;
if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
{
throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -668,11 +674,18 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
cb.afterVisible(txnState, txnOperated);
}
}
+ long costTime = stopWatch.getTime();
+ if (MetricRepo.isInit) {
+ MetricRepo.HISTO_COMMIT_TO_MS_LATENCY.update(costTime);
+ }
+ if (commitCostTimeStatisticMap.containsKey(transactionId)) {
+
commitCostTimeStatisticMap.get(transactionId).setCommitToMsCostTimeMs(costTime);
+ }
}
}
- private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC, long dbId,
- List<Table> tableList) throws UserException {
+ private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC)
+ throws UserException {
CommitTxnResponse commitTxnResponse = null;
TransactionState txnState = null;
int retryTime = 0;
@@ -898,132 +911,151 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
- getPartitionInfo(mowTableList, tabletCommitInfos, lockContext);
int totalRetryTime = 0;
String retryMsg = "";
- for (Map.Entry<Long, Set<Long>> entry :
lockContext.getTableToPartitions().entrySet()) {
- GetDeleteBitmapUpdateLockRequest.Builder builder =
GetDeleteBitmapUpdateLockRequest.newBuilder();
-
builder.setTableId(entry.getKey()).setLockId(transactionId).setInitiator(-1)
-
.setExpiration(Config.delete_bitmap_lock_expiration_seconds).setRequireCompactionStats(true);
- List<Long> tabletList =
lockContext.getTableToTabletList().get(entry.getKey());
- for (Long tabletId : tabletList) {
- TabletMeta tabletMeta =
lockContext.getTabletToTabletMeta().get(tabletId);
- TabletIndexPB.Builder tabletIndexBuilder =
TabletIndexPB.newBuilder();
- tabletIndexBuilder.setDbId(tabletMeta.getDbId());
- tabletIndexBuilder.setTableId(tabletMeta.getTableId());
- tabletIndexBuilder.setIndexId(tabletMeta.getIndexId());
- tabletIndexBuilder.setPartitionId(tabletMeta.getPartitionId());
- tabletIndexBuilder.setTabletId(tabletId);
- builder.addTabletIndexes(tabletIndexBuilder);
- }
- final GetDeleteBitmapUpdateLockRequest request = builder.build();
- GetDeleteBitmapUpdateLockResponse response = null;
-
- int retryTime = 0;
- while (retryTime++ < Config.metaServiceRpcRetryTimes()) {
- try {
- response =
MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request);
- if (LOG.isDebugEnabled()) {
- LOG.debug("get delete bitmap lock, transactionId={},
Request: {}, Response: {}", transactionId,
- request, response);
- }
- if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict"))
{
- DebugPoint debugPoint = DebugPointUtil.getDebugPoint(
-
"CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict");
- double percent = debugPoint.param("percent", 0.4);
- long timestamp = System.currentTimeMillis();
- Random random = new Random(timestamp);
- if (Math.abs(random.nextInt()) % 100 < 100 * percent) {
- LOG.info("set kv txn conflict for test");
- GetDeleteBitmapUpdateLockResponse.Builder
getLockResponseBuilder
- =
GetDeleteBitmapUpdateLockResponse.newBuilder();
-
getLockResponseBuilder.setStatus(MetaServiceResponseStatus.newBuilder()
-
.setCode(MetaServiceCode.KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES)
- .setMsg("kv txn conflict"));
- response = getLockResponseBuilder.build();
+ boolean res = false;
+ try {
+ getPartitionInfo(mowTableList, tabletCommitInfos, lockContext);
+ for (Map.Entry<Long, Set<Long>> entry :
lockContext.getTableToPartitions().entrySet()) {
+ GetDeleteBitmapUpdateLockRequest.Builder builder =
GetDeleteBitmapUpdateLockRequest.newBuilder();
+
builder.setTableId(entry.getKey()).setLockId(transactionId).setInitiator(-1)
+
.setExpiration(Config.delete_bitmap_lock_expiration_seconds).setRequireCompactionStats(true);
+ List<Long> tabletList =
lockContext.getTableToTabletList().get(entry.getKey());
+ for (Long tabletId : tabletList) {
+ TabletMeta tabletMeta =
lockContext.getTabletToTabletMeta().get(tabletId);
+ TabletIndexPB.Builder tabletIndexBuilder =
TabletIndexPB.newBuilder();
+ tabletIndexBuilder.setDbId(tabletMeta.getDbId());
+ tabletIndexBuilder.setTableId(tabletMeta.getTableId());
+ tabletIndexBuilder.setIndexId(tabletMeta.getIndexId());
+
tabletIndexBuilder.setPartitionId(tabletMeta.getPartitionId());
+ tabletIndexBuilder.setTabletId(tabletId);
+ builder.addTabletIndexes(tabletIndexBuilder);
+ }
+ final GetDeleteBitmapUpdateLockRequest request =
builder.build();
+ GetDeleteBitmapUpdateLockResponse response = null;
+
+ int retryTime = 0;
+ while (retryTime++ < Config.metaServiceRpcRetryTimes()) {
+ try {
+ response =
MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("get delete bitmap lock,
transactionId={}, Request: {}, Response: {}",
+ transactionId,
+ request, response);
+ }
+ if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict"))
{
+ DebugPoint debugPoint =
DebugPointUtil.getDebugPoint(
+
"CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.conflict");
+ double percent = debugPoint.param("percent", 0.4);
+ long timestamp = System.currentTimeMillis();
+ Random random = new Random(timestamp);
+ if (Math.abs(random.nextInt()) % 100 < 100 *
percent) {
+ LOG.info("set kv txn conflict for test");
+ GetDeleteBitmapUpdateLockResponse.Builder
getLockResponseBuilder
+ =
GetDeleteBitmapUpdateLockResponse.newBuilder();
+
getLockResponseBuilder.setStatus(MetaServiceResponseStatus.newBuilder()
+
.setCode(MetaServiceCode.KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES)
+ .setMsg("kv txn conflict"));
+ response = getLockResponseBuilder.build();
+ }
}
+ if (response.getStatus().getCode() !=
MetaServiceCode.LOCK_CONFLICT
+ && response.getStatus().getCode() !=
MetaServiceCode.KV_TXN_CONFLICT) {
+ break;
+ }
+ } catch (Exception e) {
+ LOG.warn("ignore get delete bitmap lock exception,
transactionId={}, retryTime={}, tableIds={}",
+ transactionId,
+ retryTime,
mowTableList.stream().map(Table::getId).collect(Collectors.toList()), e);
}
- if (response.getStatus().getCode() !=
MetaServiceCode.LOCK_CONFLICT
- && response.getStatus().getCode() !=
MetaServiceCode.KV_TXN_CONFLICT) {
- break;
+ retryMsg = response.toString();
+ if (DebugPointUtil.isEnable("FE.mow.check.lock.release")
+ && response.getStatus().getCode() ==
MetaServiceCode.LOCK_CONFLICT) {
+ throw new UserException(InternalErrorCode.INTERNAL_ERR,
+ "check delete bitmap lock release
fail,response is " + response
+ + ", tableList=(" +
StringUtils.join(mowTableList, ",") + ")");
+ }
+ // sleep random millis [20, 300] ms, avoid txn conflict
+ int randomMillis = 20 + (int) (Math.random() * (300 - 20));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("randomMillis:{}", randomMillis);
+ }
+ try {
+ Thread.sleep(randomMillis);
+ } catch (InterruptedException e) {
+ LOG.info("InterruptedException: ", e);
}
- } catch (Exception e) {
- LOG.warn("ignore get delete bitmap lock exception,
transactionId={}, retryTime={}, tableIds={}",
- transactionId,
- retryTime,
mowTableList.stream().map(Table::getId).collect(Collectors.toList()), e);
- }
- retryMsg = response.toString();
- if (DebugPointUtil.isEnable("FE.mow.check.lock.release")
- && response.getStatus().getCode() ==
MetaServiceCode.LOCK_CONFLICT) {
- throw new UserException(InternalErrorCode.INTERNAL_ERR,
- "check delete bitmap lock release fail,response is
" + response
- + ", tableList=(" +
StringUtils.join(mowTableList, ",") + ")");
}
- // sleep random millis [20, 300] ms, avoid txn conflict
- int randomMillis = 20 + (int) (Math.random() * (300 - 20));
- if (LOG.isDebugEnabled()) {
- LOG.debug("randomMillis:{}", randomMillis);
+ Preconditions.checkNotNull(response);
+ Preconditions.checkNotNull(response.getStatus());
+ if
(DebugPointUtil.isEnable("FE.mow.get_delete_bitmap_lock.fail")) {
+ throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+ "test get_delete_bitmap_lock fail");
}
- try {
- Thread.sleep(randomMillis);
- } catch (InterruptedException e) {
- LOG.info("InterruptedException: ", e);
+ if (response.getStatus().getCode() != MetaServiceCode.OK) {
+ LOG.warn("get delete bitmap lock failed, transactionId={},
for {} times, response:{}",
+ transactionId,
+ retryTime, response);
+ if (response.getStatus().getCode() ==
MetaServiceCode.LOCK_CONFLICT
+ || response.getStatus().getCode() ==
MetaServiceCode.KV_TXN_CONFLICT
+ || response.getStatus().getCode()
+ ==
MetaServiceCode.KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
+ // DELETE_BITMAP_LOCK_ERR will be retried on be
+ throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+ "Failed to get delete bitmap lock due to
conflict");
+ }
+ throw new UserException(
+ "Failed to get delete bitmap lock, msg: " +
response.getStatus().getMsg() + ", code: "
+ + response.getStatus().getCode());
}
- }
- Preconditions.checkNotNull(response);
- Preconditions.checkNotNull(response.getStatus());
- if (DebugPointUtil.isEnable("FE.mow.get_delete_bitmap_lock.fail"))
{
- throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
- "test get_delete_bitmap_lock fail");
- }
- if (response.getStatus().getCode() != MetaServiceCode.OK) {
- LOG.warn("get delete bitmap lock failed, transactionId={}, for
{} times, response:{}", transactionId,
- retryTime, response);
- if (response.getStatus().getCode() ==
MetaServiceCode.LOCK_CONFLICT
- || response.getStatus().getCode() ==
MetaServiceCode.KV_TXN_CONFLICT
- || response.getStatus().getCode() ==
MetaServiceCode.KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES) {
- // DELETE_BITMAP_LOCK_ERR will be retried on be
- throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
- "Failed to get delete bitmap lock due to
conflict");
+
+ // record tablet's latest compaction stats from meta service
and send them to BEs
+ // to let BEs eliminate unnecessary sync_rowsets() calls if
possible
+ List<Long> respBaseCompactionCnts =
response.getBaseCompactionCntsList();
+ List<Long> respCumulativeCompactionCnts =
response.getCumulativeCompactionCntsList();
+ List<Long> respCumulativePoints =
response.getCumulativePointsList();
+ List<Long> respTabletStates = response.getTabletStatesList();
+ int size1 = respBaseCompactionCnts.size();
+ int size2 = respCumulativeCompactionCnts.size();
+ int size3 = respCumulativePoints.size();
+ int size4 = respTabletStates.size();
+ if (size1 != tabletList.size() || size2 != tabletList.size()
|| size3 != tabletList.size()
+ || (size4 > 0 && size4 != tabletList.size())) {
+ throw new UserException("The size of returned compaction
cnts can't match the size of tabletList, "
+ + "tabletList.size()=" + tabletList.size() + ",
respBaseCompactionCnts.size()=" + size1
+ + ", respCumulativeCompactionCnts.size()=" + size2
+ ", respCumulativePoints.size()="
+ + size3
+ + ", respTabletStates.size()=" + size4);
}
- throw new UserException(
- "Failed to get delete bitmap lock, msg: " +
response.getStatus().getMsg() + ", code: "
- + response.getStatus().getCode());
- }
-
- // record tablet's latest compaction stats from meta service and
send them to BEs
- // to let BEs eliminate unnecessary sync_rowsets() calls if
possible
- List<Long> respBaseCompactionCnts =
response.getBaseCompactionCntsList();
- List<Long> respCumulativeCompactionCnts =
response.getCumulativeCompactionCntsList();
- List<Long> respCumulativePoints =
response.getCumulativePointsList();
- List<Long> respTabletStates = response.getTabletStatesList();
- int size1 = respBaseCompactionCnts.size();
- int size2 = respCumulativeCompactionCnts.size();
- int size3 = respCumulativePoints.size();
- int size4 = respTabletStates.size();
- if (size1 != tabletList.size() || size2 != tabletList.size() ||
size3 != tabletList.size()
- || (size4 > 0 && size4 != tabletList.size())) {
- throw new UserException("The size of returned compaction cnts
can't match the size of tabletList, "
- + "tabletList.size()=" + tabletList.size() + ",
respBaseCompactionCnts.size()=" + size1
- + ", respCumulativeCompactionCnts.size()=" + size2 +
", respCumulativePoints.size()=" + size3
- + ", respTabletStates.size()=" + size4);
- }
- for (int i = 0; i < tabletList.size(); i++) {
- long tabletId = tabletList.get(i);
- lockContext.getBaseCompactionCnts().put(tabletId,
respBaseCompactionCnts.get(i));
- lockContext.getCumulativeCompactionCnts().put(tabletId,
respCumulativeCompactionCnts.get(i));
- lockContext.getCumulativePoints().put(tabletId,
respCumulativePoints.get(i));
- if (size4 > 0) {
- lockContext.getTabletStates().put(tabletId,
respTabletStates.get(i));
+ for (int i = 0; i < tabletList.size(); i++) {
+ long tabletId = tabletList.get(i);
+ lockContext.getBaseCompactionCnts().put(tabletId,
respBaseCompactionCnts.get(i));
+ lockContext.getCumulativeCompactionCnts().put(tabletId,
respCumulativeCompactionCnts.get(i));
+ lockContext.getCumulativePoints().put(tabletId,
respCumulativePoints.get(i));
+ if (size4 > 0) {
+ lockContext.getTabletStates().put(tabletId,
respTabletStates.get(i));
+ }
}
+ totalRetryTime += retryTime;
+ }
+ res = true;
+ } finally {
+ stopWatch.stop();
+ long costTime = stopWatch.getTime();
+ if (MetricRepo.isInit) {
+
MetricRepo.HISTO_GET_DELETE_BITMAP_UPDATE_LOCK_LATENCY.update(costTime);
+ }
+ if (commitCostTimeStatisticMap.containsKey(transactionId)) {
+
commitCostTimeStatisticMap.get(transactionId).setWaitDeleteBitmapLockCostTimeMs(costTime);
+ }
+ String status = res ? "successfully" : "fail";
+ if (costTime > 1000) {
+ LOG.info("get delete bitmap lock {} . txnId: {}.
totalRetryTime: {}. "
+ + "tableSize: {}. cost: {} ms. tableIds: {}.
retryMsg: {}.", status,
+ transactionId, totalRetryTime,
lockContext.getTableToPartitions().size(), costTime,
+
mowTableList.stream().map(Table::getId).collect(Collectors.toList()), retryMsg);
}
- totalRetryTime += retryTime;
}
- stopWatch.stop();
- LOG.info("get delete bitmap lock successfully. txnId: {}.
totalRetryTime: {}. "
- + "tableSize: {}. cost: {} ms. tableIds: {}. retryMsg:
{}.", transactionId, totalRetryTime,
- lockContext.getTableToPartitions().size(), stopWatch.getTime(),
-
mowTableList.stream().map(Table::getId).collect(Collectors.toList()), retryMsg);
}
private void removeDeleteBitmapUpdateLock(List<OlapTable> tableList, long
transactionId) {
@@ -1069,94 +1101,109 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
- int totalTaskNum = backendToPartitionInfos.size();
- MarkedCountDownLatch<Long, Long> countDownLatch = new
MarkedCountDownLatch<Long, Long>(
- totalTaskNum);
- AgentBatchTask batchTask = new AgentBatchTask();
-
- long signature = getTxnLastSignature(dbId, transactionId);
- if (signature == -1) {
- // use txn_id as signature for every txn in first time
- signature = transactionId;
- } else {
- // If there exists other interleaved txns on the same table before,
- // we can not accept the response of previous calc delete bitmap
task which is created
- // by the current transaction before because the delete bitmap
written in those tasks
- // maybe be removed in MS.
- // So we change the signature to avoid to accept the response of
previous calc delete bitmap task
- boolean hasOtherInterleavedTxnBefore = mowTableIds.stream()
- .anyMatch(tableId -> {
- long lastTxnId = getTableLastTxnId(dbId, tableId);
- return lastTxnId != -1 && lastTxnId != transactionId;
- });
- if (hasOtherInterleavedTxnBefore) {
+ boolean res = false;
+ try {
+ int totalTaskNum = backendToPartitionInfos.size();
+ MarkedCountDownLatch<Long, Long> countDownLatch = new
MarkedCountDownLatch<Long, Long>(
+ totalTaskNum);
+ AgentBatchTask batchTask = new AgentBatchTask();
+
+ long signature = getTxnLastSignature(dbId, transactionId);
+ if (signature == -1) {
+ // use txn_id as signature for every txn in first time
+ signature = transactionId;
+ } else {
+ // If there exists other interleaved txns on the same table
before,
+ // we can not accept the response of previous calc delete
bitmap task which is created
+ // by the current transaction before because the delete bitmap
written in those tasks
+ // maybe be removed in MS.
+ // So we change the signature to avoid to accept the response
of previous calc delete bitmap task
+ boolean hasOtherInterleavedTxnBefore = mowTableIds.stream()
+ .anyMatch(tableId -> {
+ long lastTxnId = getTableLastTxnId(dbId, tableId);
+ return lastTxnId != -1 && lastTxnId !=
transactionId;
+ });
+ if (hasOtherInterleavedTxnBefore) {
+ signature = UUID.randomUUID().getLeastSignificantBits();
+ }
+ }
+ if (DebugPointUtil.isEnable("sendCalcDbmtask.change_signature")) {
signature = UUID.randomUUID().getLeastSignificantBits();
}
- }
- if (DebugPointUtil.isEnable("sendCalcDbmtask.change_signature")) {
- signature = UUID.randomUUID().getLeastSignificantBits();
- }
- setTxnLastSignature(dbId, transactionId, signature);
- for (long tableId : mowTableIds) {
- setTableLastTxnId(dbId, tableId, transactionId);
- }
-
- for (Map.Entry<Long, List<TCalcDeleteBitmapPartitionInfo>> entry :
backendToPartitionInfos.entrySet()) {
- CalcDeleteBitmapTask task = new
CalcDeleteBitmapTask(entry.getKey(),
- transactionId,
- dbId,
- entry.getValue(),
- signature,
- countDownLatch);
- countDownLatch.addMark(entry.getKey(), transactionId);
- // add to AgentTaskQueue for handling finish report.
- // not check return value, because the add will success
- AgentTaskQueue.addTask(task);
- batchTask.addTask(task);
- LOG.info("send calculate delete bitmap task to be {}, txn_id {},
signature {}, partitionInfos={}",
- entry.getKey(), transactionId, signature,
entry.getValue());
- }
- AgentTaskExecutor.submit(batchTask);
+ setTxnLastSignature(dbId, transactionId, signature);
+ for (long tableId : mowTableIds) {
+ setTableLastTxnId(dbId, tableId, transactionId);
+ }
- boolean ok;
- try {
- ok =
countDownLatch.await(Config.calculate_delete_bitmap_task_timeout_seconds,
TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOG.warn("InterruptedException: ", e);
- ok = false;
- }
+ for (Map.Entry<Long, List<TCalcDeleteBitmapPartitionInfo>> entry :
backendToPartitionInfos.entrySet()) {
+ CalcDeleteBitmapTask task = new
CalcDeleteBitmapTask(entry.getKey(),
+ transactionId,
+ dbId,
+ entry.getValue(),
+ signature,
+ countDownLatch);
+ countDownLatch.addMark(entry.getKey(), transactionId);
+ // add to AgentTaskQueue for handling finish report.
+ // not check return value, because the add will success
+ AgentTaskQueue.addTask(task);
+ batchTask.addTask(task);
+ LOG.info("send calculate delete bitmap task to be {}, txn_id
{}, signature {}, partitionInfos={}",
+ entry.getKey(), transactionId, signature,
entry.getValue());
+ }
+ AgentTaskExecutor.submit(batchTask);
+
+ boolean ok;
+ try {
+ ok =
countDownLatch.await(Config.calculate_delete_bitmap_task_timeout_seconds,
TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOG.warn("InterruptedException: ", e);
+ ok = false;
+ }
- if (!ok || !countDownLatch.getStatus().ok()) {
- String errMsg = "Failed to calculate delete bitmap.";
- // clear tasks
- AgentTaskQueue.removeBatchTask(batchTask,
TTaskType.CALCULATE_DELETE_BITMAP);
+ if (!ok || !countDownLatch.getStatus().ok()) {
+ String errMsg = "Failed to calculate delete bitmap.";
+ // clear tasks
+ AgentTaskQueue.removeBatchTask(batchTask,
TTaskType.CALCULATE_DELETE_BITMAP);
- if (!countDownLatch.getStatus().ok()) {
- errMsg += countDownLatch.getStatus().getErrorMsg();
- if (countDownLatch.getStatus().getErrorCode() !=
TStatusCode.DELETE_BITMAP_LOCK_ERROR) {
- throw new UserException(errMsg);
+ if (!countDownLatch.getStatus().ok()) {
+ errMsg += countDownLatch.getStatus().getErrorMsg();
+ if (countDownLatch.getStatus().getErrorCode() !=
TStatusCode.DELETE_BITMAP_LOCK_ERROR) {
+ throw new UserException(errMsg);
+ }
+ } else {
+ errMsg += " Timeout.";
+ List<Entry<Long, Long>> unfinishedMarks =
countDownLatch.getLeftMarks();
+ // only show at most 3 results
+ List<Entry<Long, Long>> subList =
unfinishedMarks.subList(0,
+ Math.min(unfinishedMarks.size(), 3));
+ if (!subList.isEmpty()) {
+ errMsg += " Unfinished mark: " + Joiner.on(",
").join(subList);
+ }
}
+ LOG.warn(errMsg);
+ // DELETE_BITMAP_LOCK_ERR will be retried on be
+ throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, errMsg);
} else {
- errMsg += " Timeout.";
- List<Entry<Long, Long>> unfinishedMarks =
countDownLatch.getLeftMarks();
- // only show at most 3 results
- List<Entry<Long, Long>> subList = unfinishedMarks.subList(0,
- Math.min(unfinishedMarks.size(), 3));
- if (!subList.isEmpty()) {
- errMsg += " Unfinished mark: " + Joiner.on(",
").join(subList);
- }
+ // Sometimes BE calc delete bitmap succeed, but FE wait
timeout for some unknown reasons,
+ // FE will retry the calculation on BE, this debug point
simulates such situation.
+ debugCalcDeleteBitmapRandomTimeout();
+ }
+ res = true;
+ } finally {
+ stopWatch.stop();
+ long costTime = stopWatch.getTime();
+ if (MetricRepo.isInit) {
+
MetricRepo.HISTO_CALCULATE_DELETE_BITMAP_LATENCY.update(costTime);
+ }
+ if (commitCostTimeStatisticMap.containsKey(transactionId)) {
+
commitCostTimeStatisticMap.get(transactionId).setCalculateDeleteBitmapCostTimeMs(costTime);
+ }
+ String status = res ? "successfully" : "fail";
+ if (costTime > 1000) {
+ LOG.info("calc delete bitmap task {}. txns: {}. time cost: {}
ms.",
+ status, transactionId, stopWatch.getTime());
}
- LOG.warn(errMsg);
- // DELETE_BITMAP_LOCK_ERR will be retried on be
- throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
errMsg);
- } else {
- // Sometimes BE calc delete bitmap succeed, but FE wait timeout
for some unknown reasons,
- // FE will retry the calculation on BE, this debug point simulates
such situation.
- debugCalcDeleteBitmapRandomTimeout();
}
- stopWatch.stop();
- LOG.info("calc delete bitmap task successfully. txns: {}. time cost:
{} ms.",
- transactionId, stopWatch.getTime());
}
private void debugCalcDeleteBitmapRandomTimeout() throws UserException {
@@ -1246,9 +1293,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
TransactionState txnState = null;
boolean txnOperated = false;
try {
- txnState = commitTxn(commitTxnRequest, transactionId, false,
db.getId(),
-
subTransactionStates.stream().map(SubTransactionState::getTable)
- .collect(Collectors.toList()));
+ txnState = commitTxn(commitTxnRequest, transactionId, false);
txnOperated = true;
} finally {
if (txnState != null) {
@@ -1279,19 +1324,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
- @Override
- public void commitTransaction(DatabaseIf db, List<Table> tableList, long
transactionId,
- List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment txnCommitAttachment)
+ private void beforeCommitTransaction(List<Table> tableList, long
transactionId, long timeoutMillis)
throws UserException {
- // There is no publish in cloud mode
- commitAndPublishTransaction(
- db, tableList, transactionId, tabletCommitInfos,
timeoutMillis, txnCommitAttachment);
- }
-
- @Override
- public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
- List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis,
- TxnCommitAttachment
txnCommitAttachment) throws UserException {
for (int i = 0; i < tableList.size(); i++) {
long tableId = tableList.get(i).getId();
LOG.info("start commit txn=" + transactionId + ",table=" + tableId
+ ",timeoutMillis=" + timeoutMillis);
@@ -1302,6 +1336,9 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
entry.getValue().get());
}
}
+ if (!commitCostTimeStatisticMap.containsKey(transactionId)) {
+ commitCostTimeStatisticMap.put(transactionId, new
CommitCostTimeStatistic());
+ }
List<Table> tablesToLock = getTablesNeedCommitLock(tableList);
StopWatch stopWatch = null;
@@ -1342,17 +1379,68 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
if (stopWatch != null) {
stopWatch.stop();
long costTimeMs = stopWatch.getTime();
+ if (MetricRepo.isInit) {
+ MetricRepo.HISTO_GET_COMMIT_LOCK_LATENCY.update(costTimeMs);
+ }
+ if (commitCostTimeStatisticMap.containsKey(transactionId)) {
+
commitCostTimeStatisticMap.get(transactionId).setWaitCommitLockCostTimeMs(costTimeMs);
+ }
if (costTimeMs > 1000) {
LOG.warn("get table cloud commit lock, tableList=(" +
StringUtils.join(tablesToLock, ",") + ")"
+ ", transactionId=" + transactionId + ", cost=" +
costTimeMs + " ms");
}
}
+ }
+
+ private void afterCommitTransaction(List<Table> tableList, Long
transactionId) {
+ if (commitCostTimeStatisticMap.containsKey(transactionId)) {
+ commitCostTimeStatisticMap.remove(transactionId);
+ }
+ List<Table> tablesToUnlock = getTablesNeedCommitLock(tableList);
+ decreaseWaitingLockCount(tablesToUnlock);
+ MetaLockUtils.commitUnlockTables(tablesToUnlock);
+ }
+ @Override
+ public void commitTransaction(DatabaseIf db, List<Table> tableList, long
transactionId,
+ List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
TxnCommitAttachment txnCommitAttachment)
+ throws UserException {
+ // There is no publish in cloud mode
+ commitAndPublishTransaction(
+ db, tableList, transactionId, tabletCommitInfos,
timeoutMillis, txnCommitAttachment);
+ }
+
+ @Override
+ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
+ List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis,
+ TxnCommitAttachment
txnCommitAttachment) throws UserException {
+ StopWatch stopWatch = new StopWatch();
+ stopWatch.start();
+ beforeCommitTransaction(tableList, transactionId, timeoutMillis);
try {
commitTransactionWithoutLock(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
} finally {
- decreaseWaitingLockCount(tablesToLock);
- MetaLockUtils.commitUnlockTables(tablesToLock);
+ stopWatch.stop();
+ long costTimeMs = stopWatch.getTime();
+ if (costTimeMs > 1000) {
+ String detailMsg = "";
+ if (commitCostTimeStatisticMap.containsKey(transactionId)) {
+ StringBuilder sb = new StringBuilder();
+ CommitCostTimeStatistic statistic =
commitCostTimeStatisticMap.get(transactionId);
+ sb.append("get commit lock cost
").append(statistic.getWaitCommitLockCostTimeMs())
+ .append(" ms, get delete bitmap lock cost ")
+
.append(statistic.getWaitDeleteBitmapLockCostTimeMs())
+ .append(" ms, calculate delete bitmap cost ")
+
.append(statistic.getCalculateDeleteBitmapCostTimeMs()).append(" ms, commit to
ms cost ")
+
.append(statistic.getCommitToMsCostTimeMs()).append(" ms");
+ detailMsg = sb.toString();
+ }
+ LOG.info(
+ "commit transaction {} cost {} ms, detail={},
tableIds={}",
+ transactionId, costTimeMs, detailMsg,
+
tableList.stream().map(Table::getId).collect(Collectors.toList()));
+ }
+ afterCommitTransaction(tableList, transactionId);
}
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]