pengxiangyu commented on code in PR #9471:
URL: https://github.com/apache/incubator-doris/pull/9471#discussion_r867797240


##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException, 
TException, TimeoutExce
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + 
DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = 
Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = 
TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), 
Lists.newArrayList(tbl.getId()), label,
-                        new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = 
Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
-                        db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
-                        
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-                        
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                        
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-                        
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
-                        .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, 
request id: {}, txn: {}, table: {}",
-                        id, e.getDuplicatedRequestId(), e.getTxnId(), 
targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should 
just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: 
{}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, 
table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, 
table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, 
txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+            DatabaseTransactionMgr databaseTransactionMgr = 
globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if(databaseTransactionMgr.getRunningTxnNums() < 
Config.max_running_txn_num_per_db ) {

Review Comment:
   An unnecessary space after max_running_txn_num_per_db



##########
fe/fe-core/src/main/java/org/apache/doris/load/sync/canal/CanalSyncChannel.java:
##########
@@ -121,53 +123,60 @@ public void beginTxn(long batchId) throws UserException, 
TException, TimeoutExce
                     + "_batch" + batchId + "_" + currentTime;
             String targetColumn = Joiner.on(",").join(columns) + "," + 
DELETE_COLUMN;
             GlobalTransactionMgr globalTransactionMgr = 
Catalog.getCurrentGlobalTransactionMgr();
-            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
-            TTxnParams txnConf = txnEntry.getTxnConf();
-            TransactionState.LoadJobSourceType sourceType = 
TransactionState.LoadJobSourceType.INSERT_STREAMING;
-            TStreamLoadPutRequest request = null;
-            try {
-                long txnId = globalTransactionMgr.beginTransaction(db.getId(), 
Lists.newArrayList(tbl.getId()), label,
-                        new 
TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 
FrontendOptions.getLocalHostAddress()), sourceType, timeoutSecond);
-                String authCodeUuid = 
Catalog.getCurrentGlobalTransactionMgr().getTransactionState(
-                        db.getId(), txnId).getAuthCode();
-                request = new TStreamLoadPutRequest()
-                        
.setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
-                        
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                        
.setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
-                        
.setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
-                        .setColumns(targetColumn);
-                txnConf.setTxnId(txnId).setAuthCodeUuid(authCodeUuid);
-                txnEntry.setLabel(label);
-                txnExecutor.setTxnId(txnId);
-            } catch (DuplicatedRequestException e) {
-                LOG.warn("duplicate request for sync channel. channel: {}, 
request id: {}, txn: {}, table: {}",
-                        id, e.getDuplicatedRequestId(), e.getTxnId(), 
targetTable);
-                txnExecutor.setTxnId(e.getTxnId());
-            } catch (LabelAlreadyUsedException e) {
-                // this happens when channel re-consume same batch, we should 
just pass through it without begin a new txn
-                LOG.warn("Label already used in channel {}, label: {}, table: 
{}, batch: {}", id, label, targetTable, batchId);
-                return;
-            } catch (AnalysisException | BeginTransactionException e) {
-                LOG.warn("encounter an error when beginning txn in channel {}, 
table: {}", id, targetTable);
-                throw e;
-            } catch (UserException e) {
-                LOG.warn("encounter an error when creating plan in channel {}, 
table: {}", id, targetTable);
-                throw e;
-            }
-            try {
-                // async exec begin transaction
-                long txnId = txnExecutor.getTxnId();
-                if (txnId != -1L) {
-                    this.txnExecutor.beginTransaction(request);
-                    LOG.info("begin txn in channel {}, table: {}, label:{}, 
txn id: {}", id, targetTable, label, txnExecutor.getTxnId());
+            DatabaseTransactionMgr databaseTransactionMgr = 
globalTransactionMgr.getDatabaseTransactionMgr(db.getId());
+            if(databaseTransactionMgr.getRunningTxnNums() < 
Config.max_running_txn_num_per_db ) {
+                TransactionEntry txnEntry = txnExecutor.getTxnEntry ();
+                TTxnParams txnConf = txnEntry.getTxnConf ();
+                TransactionState.LoadJobSourceType sourceType = 
TransactionState.LoadJobSourceType.INSERT_STREAMING;
+                TStreamLoadPutRequest request = null;
+                try {
+                    long txnId = globalTransactionMgr.beginTransaction 
(db.getId (), Lists.newArrayList (tbl.getId ()), label,
+                        new TransactionState.TxnCoordinator 
(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress ()), 
sourceType, timeoutSecond);
+                    String authCodeUuid = 
Catalog.getCurrentGlobalTransactionMgr ().getTransactionState (
+                        db.getId (), txnId).getAuthCode ();
+                    request = new TStreamLoadPutRequest ()
+                        .setTxnId (txnId).setDb (txnConf.getDb ()).setTbl 
(txnConf.getTbl ())
+                        .setFileType (TFileType.FILE_STREAM).setFormatType 
(TFileFormatType.FORMAT_CSV_PLAIN)
+                        .setThriftRpcTimeoutMs (5000).setLoadId 
(txnExecutor.getLoadId ())
+                        .setMergeType (TMergeType.MERGE).setDeleteCondition 
(DELETE_CONDITION)
+                        .setColumns (targetColumn);
+                    txnConf.setTxnId (txnId).setAuthCodeUuid (authCodeUuid);
+                    txnEntry.setLabel (label);
+                    txnExecutor.setTxnId (txnId);
+                } catch ( DuplicatedRequestException e ) {
+                    LOG.warn ("duplicate request for sync channel. channel: 
{}, request id: {}, txn: {}, table: {}",
+                        id, e.getDuplicatedRequestId (), e.getTxnId (), 
targetTable);
+                    txnExecutor.setTxnId (e.getTxnId ());
+                } catch ( LabelAlreadyUsedException e ) {
+                    // this happens when channel re-consume same batch, we 
should just pass through it without begin a new txn
+                    LOG.warn ("Label already used in channel {}, label: {}, 
table: {}, batch: {}", id, label, targetTable, batchId);
+                    return;
+                } catch ( AnalysisException | BeginTransactionException e ) {
+                    LOG.warn ("encounter an error when beginning txn in 
channel {}, table: {}", id, targetTable);
+                    throw e;
+                } catch ( UserException e ) {
+                    LOG.warn ("encounter an error when creating plan in 
channel {}, table: {}", id, targetTable);
+                    throw e;
                 }
-            } catch (TException e) {
-                LOG.warn("Failed to begin txn in channel {}, table: {}, txn: 
{}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
-                throw e;
-            } catch (TimeoutException | InterruptedException | 
ExecutionException e) {
-                LOG.warn("Error occur while waiting begin txn response in 
channel {}, table: {}, txn: {}, msg:{}",
-                        id, targetTable, txnExecutor.getTxnId(), 
e.getMessage());
-                throw e;
+                try {
+                    // async exec begin transaction
+                    long txnId = txnExecutor.getTxnId ();
+                    if ( txnId != - 1L ) {
+                        this.txnExecutor.beginTransaction (request);
+                        LOG.info ("begin txn in channel {}, table: {}, 
label:{}, txn id: {}", id, targetTable, label, txnExecutor.getTxnId ());
+                    }
+                } catch ( TException e ) {
+                    LOG.warn ("Failed to begin txn in channel {}, table: {}, 
txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId (), e.getMessage ());
+                    throw e;
+                } catch ( TimeoutException | InterruptedException | 
ExecutionException e ) {
+                    LOG.warn ("Error occur while waiting begin txn response in 
channel {}, table: {}, txn: {}, msg:{}",
+                        id, targetTable, txnExecutor.getTxnId (), e.getMessage 
());
+                    throw e;
+                }
+            } else {
+                String failMsg = "current running txns on db " + db.getId () + 
" is "
+                    + databaseTransactionMgr.getRunningTxnNums () + ", larger 
than limit " + Config.max_running_txn_num_per_db;

Review Comment:
   Add a LOG.warn() here is better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to