This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d7705ace65 [fix](binlog-load) binlog load fails because txn exceeds
the default value (#9471)
d7705ace65 is described below
commit d7705ace659c297754a1949a15a8950ee208724a
Author: jiafeng.zhang <[email protected]>
AuthorDate: Thu May 12 13:31:22 2022 +0800
[fix](binlog-load) binlog load fails because txn exceeds the default value
(#9471)
When the number of running transactions on a database exceeds the default
limit (max_running_txn_num_per_db), resuming a binlog load fails. Previously
the resume was reported as successful and the job only failed a while later,
which left the user confused. Now a clear error message is returned to the
user right away instead.
Issue Number: close #9468
---
.../doris/load/sync/canal/CanalSyncChannel.java | 94 +++++++++++++---------
.../doris/transaction/DatabaseTransactionMgr.java | 2 +-
2 files changed, 57 insertions(+), 39 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
index 2b71619dcf..5d0774b54a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
@@ -41,6 +42,7 @@ import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.DatabaseTransactionMgr;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
@@ -121,53 +123,69 @@ public class CanalSyncChannel extends SyncChannel {
+ "_batch" + batchId + "_" + currentTime;
String targetColumn = Joiner.on(",").join(columns) + "," +
DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr =
Catalog.getCurrentGlobalTransactionMgr();
- TransactionEntry txnEntry = txnExecutor.getTxnEntry();
- TTxnParams txnConf = txnEntry.getTxnConf();
- TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
- TStreamLoadPutRequest request = null;
- try {
- long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
- new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
- String authCodeUuid =
Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
+ DatabaseTransactionMgr databaseTransactionMgr =
globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+ if (databaseTransactionMgr.getRunningTxnNums() <
Config.max_running_txn_num_per_db) {
+ TransactionEntry txnEntry = txnExecutor.getTxnEntry();
+ TTxnParams txnConf = txnEntry.getTxnConf();
+ TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
+ TStreamLoadPutRequest request = null;
+ try {
+ long txnId =
globalTransactionMgr.beginTransaction(db.getId(),
+ Lists.newArrayList(tbl.getId()), label,
+ new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
+ FrontendOptions.getLocalHostAddress()),
sourceType, timeoutSecond);
+ String authCodeUuid =
Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
db.getId(), txnId).getAuthCode();
- request = new TStreamLoadPutRequest()
+ request = new TStreamLoadPutRequest()
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
.setColumns(targetColumn);
- txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
- txnEntry.setLabel(label);
- txnExecutor.setTxnId(txnId);
- } catch (DuplicatedRequestException e) {
- LOG.warn("duplicate request for sync channel. channel: {},
request id: {}, txn: {}, table: {}",
+ txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
+ txnEntry.setLabel(label);
+ txnExecutor.setTxnId (txnId);
+ } catch (DuplicatedRequestException e) {
+ LOG.warn ("duplicate request for sync channel. channel:
{}, request id: {}, txn: {}, table: {}",
id, e.getDuplicatedRequestId(), e.getTxnId(),
targetTable);
- txnExecutor.setTxnId(e.getTxnId());
- } catch (LabelAlreadyUsedException e) {
- // this happens when channel re-consume same batch, we should
just pass through it without begin a new txn
- LOG.warn("Label already used in channel {}, label: {}, table:
{}, batch: {}", id, label, targetTable, batchId);
- return;
- } catch (AnalysisException | BeginTransactionException e) {
- LOG.warn("encounter an error when beginning txn in channel {},
table: {}", id, targetTable);
- throw e;
- } catch (UserException e) {
- LOG.warn("encounter an error when creating plan in channel {},
table: {}", id, targetTable);
- throw e;
- }
- try {
- // async exec begin transaction
- long txnId = txnExecutor.getTxnId();
- if (txnId != -1L) {
- this.txnExecutor.beginTransaction(request);
- LOG.info("begin txn in channel {}, table: {}, label:{},
txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+ txnExecutor.setTxnId(e.getTxnId());
+ } catch (LabelAlreadyUsedException e) {
+ // this happens when channel re-consume same batch,
+ // we should just pass through it without begin a new txn
+ LOG.warn ("Label already used in channel {}, label: {},
table: {}, batch: {}",
+ id, label, targetTable, batchId);
+ return;
+ } catch (AnalysisException | BeginTransactionException e) {
+ LOG.warn ("encounter an error when beginning txn in
channel {}, table: {}",
+ id, targetTable);
+ throw e;
+ } catch (UserException e) {
+ LOG.warn ("encounter an error when creating plan in
channel {}, table: {}",
+ id, targetTable);
+ throw e;
}
- } catch (TException e) {
- LOG.warn("Failed to begin txn in channel {}, table: {}, txn:
{}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
- throw e;
- } catch (TimeoutException | InterruptedException |
ExecutionException e) {
- LOG.warn("Error occur while waiting begin txn response in
channel {}, table: {}, txn: {}, msg:{}",
+ try {
+ // async exec begin transaction
+ long txnId = txnExecutor.getTxnId();
+ if ( txnId != - 1L ) {
+ this.txnExecutor.beginTransaction (request);
+ LOG.info ("begin txn in channel {}, table: {},
label:{}, txn id: {}",
+ id, targetTable, label, txnExecutor.getTxnId());
+ }
+ } catch ( TException e) {
+ LOG.warn ("Failed to begin txn in channel {}, table: {},
txn: {}, msg:{}",
+ id, targetTable, txnExecutor.getTxnId(),
e.getMessage());
+ throw e;
+ } catch ( TimeoutException | InterruptedException |
ExecutionException e) {
+ LOG.warn ("Error occur while waiting begin txn response in
channel {}, table: {}, txn: {}, msg:{}",
id, targetTable, txnExecutor.getTxnId(),
e.getMessage());
- throw e;
+ throw e;
+ }
+ } else {
+ String failMsg = "current running txns on db " + db.getId() +
" is "
+ + databaseTransactionMgr.getRunningTxnNums() + ", larger
than limit " + Config.max_running_txn_num_per_db;
+ LOG.warn(failMsg);
+ throw new BeginTransactionException(failMsg);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 9ce906fb79..bf530c5ef4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -195,7 +195,7 @@ public class DatabaseTransactionMgr {
return labelToTxnIds.get(label);
}
- protected int getRunningTxnNums() {
+ public int getRunningTxnNums() {
return runningTxnNums;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]