pengxiangyu commented on code in PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471#discussion_r867797240
##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException,
TException, TimeoutExce
+ "_batch" + batchId + "_" + currentTime;
String targetColumn = Joiner.on(",").join(columns) + "," +
DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr =
Catalog.getCurrentGlobalTransactionMgr();
- TransactionEntry txnEntry = txnExecutor.getTxnEntry();
- TTxnParams txnConf = txnEntry.getTxnConf();
- TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
- TStreamLoadPutRequest request = null;
- try {
- long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
- new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
- String authCodeUuid =
Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
- db.getId(), txnId).getAuthCode();
- request = new TStreamLoadPutRequest()
-
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
- .setColumns(targetColumn);
- txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
- txnEntry.setLabel(label);
- txnExecutor.setTxnId(txnId);
- } catch (DuplicatedRequestException e) {
- LOG.warn("duplicate request for sync channel. channel: {},
request id: {}, txn: {}, table: {}",
- id, e.getDuplicatedRequestId(), e.getTxnId(),
targetTable);
- txnExecutor.setTxnId(e.getTxnId());
- } catch (LabelAlreadyUsedException e) {
- // this happens when channel re-consume same batch, we should
just pass through it without begin a new txn
- LOG.warn("Label already used in channel {}, label: {}, table:
{}, batch: {}", id, label, targetTable, batchId);
- return;
- } catch (AnalysisException | BeginTransactionException e) {
- LOG.warn("encounter an error when beginning txn in channel {},
table: {}", id, targetTable);
- throw e;
- } catch (UserException e) {
- LOG.warn("encounter an error when creating plan in channel {},
table: {}", id, targetTable);
- throw e;
- }
- try {
- // async exec begin transaction
- long txnId = txnExecutor.getTxnId();
- if (txnId != -1L) {
- this.txnExecutor.beginTransaction(request);
- LOG.info("begin txn in channel {}, table: {}, label:{},
txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+ DatabaseTransactionMgr databaseTransactionMgr =
globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+ if(databaseTransactionMgr.getRunningTxnNums() <
Config.max_running_txn_num_per_db ) {
Review Comment:
An unnecessary space after max_running_txn_num_per_db
##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException,
TException, TimeoutExce
+ "_batch" + batchId + "_" + currentTime;
String targetColumn = Joiner.on(",").join(columns) + "," +
DELETE_COLUMN;
GlobalTransactionMgr globalTransactionMgr =
Catalog.getCurrentGlobalTransactionMgr();
- TransactionEntry txnEntry = txnExecutor.getTxnEntry();
- TTxnParams txnConf = txnEntry.getTxnConf();
- TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
- TStreamLoadPutRequest request = null;
- try {
- long txnId = globalTransactionMgr.beginTransaction(db.getId(),
Lists.newArrayList(tbl.getId()), label,
- new
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE,
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
- String authCodeUuid =
Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
- db.getId(), txnId).getAuthCode();
- request = new TStreamLoadPutRequest()
-
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
- .setColumns(targetColumn);
- txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
- txnEntry.setLabel(label);
- txnExecutor.setTxnId(txnId);
- } catch (DuplicatedRequestException e) {
- LOG.warn("duplicate request for sync channel. channel: {},
request id: {}, txn: {}, table: {}",
- id, e.getDuplicatedRequestId(), e.getTxnId(),
targetTable);
- txnExecutor.setTxnId(e.getTxnId());
- } catch (LabelAlreadyUsedException e) {
- // this happens when channel re-consume same batch, we should
just pass through it without begin a new txn
- LOG.warn("Label already used in channel {}, label: {}, table:
{}, batch: {}", id, label, targetTable, batchId);
- return;
- } catch (AnalysisException | BeginTransactionException e) {
- LOG.warn("encounter an error when beginning txn in channel {},
table: {}", id, targetTable);
- throw e;
- } catch (UserException e) {
- LOG.warn("encounter an error when creating plan in channel {},
table: {}", id, targetTable);
- throw e;
- }
- try {
- // async exec begin transaction
- long txnId = txnExecutor.getTxnId();
- if (txnId != -1L) {
- this.txnExecutor.beginTransaction(request);
- LOG.info("begin txn in channel {}, table: {}, label:{},
txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+ DatabaseTransactionMgr databaseTransactionMgr =
globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+ if(databaseTransactionMgr.getRunningTxnNums() <
Config.max_running_txn_num_per_db ) {
+ TransactionEntry txnEntry = txnExecutor.getTxnEntry ();
+ TTxnParams txnConf = txnEntry.getTxnConf ();
+ TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
+ TStreamLoadPutRequest request = null;
+ try {
+ long txnId = globalTransactionMgr.beginTransaction
(db.getId (), Lists.newArrayList (tbl.getId ()), label,
+ new TransactionState.TxnCoordinator
(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress ()),
sourceType, timeoutSecond);
+ String authCodeUuid =
Catalog.getCurrentGlobalTransactionMgr ().getTransactionState (
+ db.getId (), txnId).getAuthCode ();
+ request = new TStreamLoadPutRequest ()
+ .setTxnId (txnId).setDb (txnConf.getDb ()).setTbl
(txnConf.getTbl ())
+ .setFileType (TFileType.FILE_STREAM).setFormatType
(TFileFormatType.FORMAT_CSV_PLAIN)
+ .setThriftRpcTimeoutMs (5000).setLoadId
(txnExecutor.getLoadId ())
+ .setMergeType (TMergeType.MERGE).setDeleteCondition
(DELETE_CONDITION)
+ .setColumns (targetColumn);
+ txnConf.setTxnId (txnId).setAuthCodeUuid (authCodeUuid);
+ txnEntry.setLabel (label);
+ txnExecutor.setTxnId (txnId);
+ } catch ( DuplicatedRequestException e ) {
+ LOG.warn ("duplicate request for sync channel. channel:
{}, request id: {}, txn: {}, table: {}",
+ id, e.getDuplicatedRequestId (), e.getTxnId (),
targetTable);
+ txnExecutor.setTxnId (e.getTxnId ());
+ } catch ( LabelAlreadyUsedException e ) {
+ // this happens when channel re-consume same batch, we
should just pass through it without begin a new txn
+ LOG.warn ("Label already used in channel {}, label: {},
table: {}, batch: {}", id, label, targetTable, batchId);
+ return;
+ } catch ( AnalysisException | BeginTransactionException e ) {
+ LOG.warn ("encounter an error when beginning txn in
channel {}, table: {}", id, targetTable);
+ throw e;
+ } catch ( UserException e ) {
+ LOG.warn ("encounter an error when creating plan in
channel {}, table: {}", id, targetTable);
+ throw e;
}
- } catch (TException e) {
- LOG.warn("Failed to begin txn in channel {}, table: {}, txn:
{}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
- throw e;
- } catch (TimeoutException | InterruptedException |
ExecutionException e) {
- LOG.warn("Error occur while waiting begin txn response in
channel {}, table: {}, txn: {}, msg:{}",
- id, targetTable, txnExecutor.getTxnId(),
e.getMessage());
- throw e;
+ try {
+ // async exec begin transaction
+ long txnId = txnExecutor.getTxnId ();
+ if ( txnId != - 1L ) {
+ this.txnExecutor.beginTransaction (request);
+ LOG.info ("begin txn in channel {}, table: {},
label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId ());
+ }
+ } catch ( TException e ) {
+ LOG.warn ("Failed to begin txn in channel {}, table: {},
txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId (), e.getMessage ());
+ throw e;
+ } catch ( TimeoutException | InterruptedException |
ExecutionException e ) {
+ LOG.warn ("Error occur while waiting begin txn response in
channel {}, table: {}, txn: {}, msg:{}",
+ id, targetTable, txnExecutor.getTxnId (), e.getMessage
());
+ throw e;
+ }
+ } else {
+ String failMsg = "current running txns on db " + db.getId () +
" is "
+ + databaseTransactionMgr.getRunningTxnNums () + ", larger
than limit " + Config.max_running_txn_num_per_db;
Review Comment:
Add a LOG.warn() here is better.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]