This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b111f9a518 [fix](insert) Session varaiables dont work for transaction 
insert (#17551)
b111f9a518 is described below

commit b111f9a5186351c0c6133ff8d4faf32dffe7a22b
Author: Yusheng Xu <[email protected]>
AuthorDate: Sun Mar 19 10:43:02 2023 +0800

    [fix](insert) Session varaiables dont work for transaction insert (#17551)
---
 .../src/main/java/org/apache/doris/qe/StmtExecutor.java       | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 964952ed04..8f9e060e4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1498,7 +1498,9 @@ public class StmtExecutor implements ProfileWriter {
             InterruptedException, ExecutionException, TimeoutException {
         TransactionEntry txnEntry = context.getTxnEntry();
         TTxnParams txnConf = txnEntry.getTxnConf();
+        SessionVariable sessionVariable = 
ConnectContext.get().getSessionVariable();
         long timeoutSecond = ConnectContext.get().getExecTimeout();
+
         TransactionState.LoadJobSourceType sourceType = 
TransactionState.LoadJobSourceType.INSERT_STREAMING;
         Database dbObj = Env.getCurrentInternalCatalog()
                 .getDbOrException(dbName, s -> new TException("database is 
invalid for dbName: " + s));
@@ -1528,10 +1530,17 @@ public class StmtExecutor implements ProfileWriter {
         }
 
         TStreamLoadPutRequest request = new TStreamLoadPutRequest();
+
+        long maxExecMemByte = sessionVariable.getMaxExecMemByte();
+        String timeZone = sessionVariable.getTimeZone();
+        int sendBatchParallelism = sessionVariable.getSendBatchParallelism();
+
         request.setTxnId(txnConf.getTxnId()).setDb(txnConf.getDb())
                 .setTbl(txnConf.getTbl())
                 
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
-                
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId());
+                
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(context.queryId())
+                .setExecMemLimit(maxExecMemByte).setTimeout((int) 
timeoutSecond)
+                
.setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism);
 
         // execute begin txn
         InsertStreamTxnExecutor executor = new 
InsertStreamTxnExecutor(txnEntry);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to