This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b111f9a518 [fix](insert) Session variables don't work for transaction
insert (#17551)
b111f9a518 is described below
commit b111f9a5186351c0c6133ff8d4faf32dffe7a22b
Author: Yusheng Xu <[email protected]>
AuthorDate: Sun Mar 19 10:43:02 2023 +0800
[fix](insert) Session variables don't work for transaction insert (#17551)
---
.../src/main/java/org/apache/doris/qe/StmtExecutor.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 964952ed04..8f9e060e4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1498,7 +1498,9 @@ public class StmtExecutor implements ProfileWriter {
InterruptedException, ExecutionException, TimeoutException {
TransactionEntry txnEntry = context.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
+ SessionVariable sessionVariable =
ConnectContext.get().getSessionVariable();
long timeoutSecond = ConnectContext.get().getExecTimeout();
+
TransactionState.LoadJobSourceType sourceType =
TransactionState.LoadJobSourceType.INSERT_STREAMING;
Database dbObj = Env.getCurrentInternalCatalog()
.getDbOrException(dbName, s -> new TException("database is
invalid for dbName: " + s));
@@ -1528,10 +1530,17 @@ public class StmtExecutor implements ProfileWriter {
}
TStreamLoadPutRequest request = new TStreamLoadPutRequest();
+
+ long maxExecMemByte = sessionVariable.getMaxExecMemByte();
+ String timeZone = sessionVariable.getTimeZone();
+ int sendBatchParallelism = sessionVariable.getSendBatchParallelism();
+
request.setTxnId(txnConf.getTxnId()).setDb(txnConf.getDb())
.setTbl(txnConf.getTbl())
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
- .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId());
+ .setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId())
+ .setExecMemLimit(maxExecMemByte).setTimeout((int) timeoutSecond)
+ .setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism);
// execute begin txn
InsertStreamTxnExecutor executor = new
InsertStreamTxnExecutor(txnEntry);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]