This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d6757e03de4 [fix](group commit) Group commit http stream should not
begin txn (#35494) (#35672)
d6757e03de4 is described below
commit d6757e03de4605254175b0b8e2fa2b6646f152cb
Author: meiyi <[email protected]>
AuthorDate: Thu May 30 20:57:59 2024 +0800
[fix](group commit) Group commit http stream should not begin txn (#35494)
(#35672)
## Proposed changes
Pick https://github.com/apache/doris/pull/35494
---
.../plans/commands/insert/OlapInsertExecutor.java | 24 +++++++++++++++-------
1 file changed, 17 insertions(+), 7 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
index 3973fae4d4c..579e04b8e08 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java
@@ -86,6 +86,10 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
@Override
public void beginTransaction() {
+ if (isGroupCommitHttpStream()) {
+ LOG.info("skip begin transaction for group commit http stream");
+ return;
+ }
try {
if (ctx.isTxnModel()) {
TransactionEntry txnEntry = ctx.getTxnEntry();
@@ -155,13 +159,15 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
- TransactionState state =
Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(),
txnId);
- if (state == null) {
- throw new AnalysisException("txn does not exist: " + txnId);
- }
- state.addTableIndexes((OlapTable) table);
- if (physicalOlapTableSink.isPartialUpdate()) {
- state.setSchemaForPartialUpdate((OlapTable) table);
+ if (!isGroupCommitHttpStream()) {
+ TransactionState state =
Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(),
txnId);
+ if (state == null) {
+ throw new AnalysisException("txn does not exist: " + txnId);
+ }
+ state.addTableIndexes((OlapTable) table);
+ if (physicalOlapTableSink.isPartialUpdate()) {
+ state.setSchemaForPartialUpdate((OlapTable) table);
+ }
}
}
@@ -279,4 +285,8 @@ public class OlapInsertExecutor extends
AbstractInsertExecutor {
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
}
+
+ private boolean isGroupCommitHttpStream() {
+ return ConnectContext.get() != null &&
ConnectContext.get().isGroupCommitStreamLoadSql();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]