This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 8239a888a35 branch-4.0: [Fix](job) Fixed the sessionvar was not working in streaming jobs. #57448 (#57502)
8239a888a35 is described below

commit 8239a888a35fb4060a2e4ad70d5570ffe62c3978
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 5 09:40:14 2025 +0800

    branch-4.0: [Fix](job) Fixed the sessionvar was not working in streaming jobs. #57448 (#57502)
    
    Cherry-picked from #57448
    
    Co-authored-by: wudi <[email protected]>
---
 .../insert/streaming/StreamingInsertTask.java      | 15 +++---
 .../insert/streaming/StreamingJobProperties.java   | 38 +++++++++++--
 .../job/scheduler/StreamingTaskScheduler.java      |  4 +-
 .../streaming/StreamingJobPropertiesTest.java      | 62 ++++++++++++++++++++++
 4 files changed, 104 insertions(+), 15 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 68f40be923c..ec3c6407be7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -61,7 +61,7 @@ public class StreamingInsertTask {
     private long taskId;
     private String labelName;
     @Setter
-    private TaskStatus status;
+    private volatile TaskStatus status;
     private String errMsg;
     private Long createTimeMs;
     private Long startTimeMs;
@@ -132,15 +132,14 @@ public class StreamingInsertTask {
     }
 
     private void before() throws Exception {
-        this.status = TaskStatus.RUNNING;
-        this.startTimeMs = System.currentTimeMillis();
-
         if (isCanceled.get()) {
             log.info("streaming insert task has been canceled, task id is {}", 
getTaskId());
             return;
         }
+        this.status = TaskStatus.RUNNING;
+        this.startTimeMs = System.currentTimeMillis();
         ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
-        ctx.setSessionVariable(jobProperties.getSessionVariable());
+        
ctx.setSessionVariable(jobProperties.getSessionVariable(ctx.getSessionVariable()));
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
 
@@ -181,7 +180,7 @@ public class StreamingInsertTask {
     }
 
     public boolean onSuccess() throws JobException {
-        if (TaskStatus.CANCELED.equals(status)) {
+        if (isCanceled.get()) {
             return false;
         }
         this.status = TaskStatus.SUCCESS;
@@ -201,10 +200,10 @@ public class StreamingInsertTask {
     }
 
     public void onFail(String errMsg) throws JobException {
-        this.errMsg = errMsg;
-        if (TaskStatus.CANCELED.equals(status)) {
+        if (isCanceled.get()) {
             return;
         }
+        this.errMsg = errMsg;
         this.status = TaskStatus.FAILED;
         this.finishTimeMs = System.currentTimeMillis();
         if (!isCallable()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 0f20dbd4c1e..58e236e6720 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.util.Util;
 import org.apache.doris.job.base.JobProperties;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.VariableMgr;
 
 import lombok.Data;
 import org.json.simple.JSONObject;
@@ -47,7 +48,8 @@ public class StreamingJobProperties implements JobProperties {
     public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
     public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
     public static final long DEFAULT_MAX_S3_BATCH_BYTES = 10 * 1024 * 1024 * 
1024L; // 10GB
-    public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
+    public static final int DEFAULT_JOB_INSERT_TIMEOUT = 30 * 60; // 30min
+    public static final int DEFAULT_JOB_QUERY_TIMEOUT = 30 * 60; // 30min
 
 
     private final Map<String, String> properties;
@@ -117,13 +119,28 @@ public class StreamingJobProperties implements JobProperties {
         }
     }
 
-    public SessionVariable getSessionVariable() throws JobException {
-        SessionVariable sessionVariable = new SessionVariable();
+    public SessionVariable getSessionVariable(SessionVariable sessionVariable) 
throws JobException {
+        int defaultInsert = parseIntOrDefault(
+                VariableMgr.getDefaultValue(SessionVariable.INSERT_TIMEOUT),
+                DEFAULT_JOB_INSERT_TIMEOUT
+        );
+        int defaultQuery = parseIntOrDefault(
+                VariableMgr.getDefaultValue(SessionVariable.QUERY_TIMEOUT),
+                DEFAULT_JOB_QUERY_TIMEOUT
+        );
+        // override with job default session var
+        if (sessionVariable.getInsertTimeoutS() == defaultInsert) {
+            sessionVariable.setInsertTimeoutS(DEFAULT_JOB_INSERT_TIMEOUT);
+        }
+
+        if (sessionVariable.getQueryTimeoutS() == defaultQuery) {
+            sessionVariable.setQueryTimeoutS(DEFAULT_JOB_QUERY_TIMEOUT);
+        }
+
         Map<String, String> sessionVarMap = parseSessionVarMap();
         if (!sessionVarMap.isEmpty()) {
             try {
-                sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
-                sessionVariable.setQueryTimeoutS(DEFAULT_INSERT_TIMEOUT);
+                // override session var for sessionVarMap
                 sessionVariable.readFromMap(sessionVarMap);
             } catch (Exception e) {
                 throw new JobException("Invalid session variable, " + 
e.getMessage());
@@ -132,6 +149,17 @@ public class StreamingJobProperties implements JobProperties {
         return sessionVariable;
     }
 
+    private int parseIntOrDefault(String val, int defaultValue) {
+        if (val == null) {
+            return defaultValue;
+        }
+        try {
+            return Integer.parseInt(val);
+        } catch (NumberFormatException e) {
+            return defaultValue;
+        }
+    }
+
     private Map<String, String> parseSessionVarMap() {
         final Map<String, String> sessionVarMap = new HashMap<>();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 27a15fb959e..51e17f214c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -110,8 +110,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
 
         // reject invalid task
         if (!job.needScheduleTask()) {
-            log.info("do not need to schedule invalid task, task id: {}, job 
id: {}",
-                        task.getTaskId(), task.getJobId());
+            log.info("do not need to schedule invalid task, task id: {}, job 
id: {}, job status: {}",
+                        task.getTaskId(), task.getJobId(), job.getJobStatus());
             return;
         }
         // reject task if no more data to consume
diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobPropertiesTest.java
new file mode 100644
index 00000000000..328059aaf1f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobPropertiesTest.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertTask;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class StreamingJobPropertiesTest {
+
+    @Test
+    public void testSessionVariables() throws JobException {
+        //default
+        StreamingJobProperties jobProperties = new StreamingJobProperties(new 
HashMap<>());
+        ConnectContext ctx = InsertTask.makeConnectContext(null, null);
+        SessionVariable defaultSessionVar = 
jobProperties.getSessionVariable(ctx.getSessionVariable());
+        Assert.assertEquals(StreamingJobProperties.DEFAULT_JOB_INSERT_TIMEOUT, 
defaultSessionVar.getInsertTimeoutS());
+        Assert.assertEquals(StreamingJobProperties.DEFAULT_JOB_QUERY_TIMEOUT, 
defaultSessionVar.getQueryTimeoutS());
+
+        // set session var
+        ctx = InsertTask.makeConnectContext(null, null);
+        SessionVariable userSessionVar = new SessionVariable();
+        userSessionVar.setInsertTimeoutS(1);
+        userSessionVar.setQueryTimeoutS(2);
+        ctx.setSessionVariable(userSessionVar);
+
+        SessionVariable userSessionVarRes = 
jobProperties.getSessionVariable(ctx.getSessionVariable());
+        Assert.assertEquals(1, userSessionVarRes.getInsertTimeoutS());
+        Assert.assertEquals(2, userSessionVarRes.getQueryTimeoutS());
+
+        // set session map in job properties
+        ctx = InsertTask.makeConnectContext(null, null);
+        HashMap<String, String> props = new HashMap<>();
+        props.put("session.insert_timeout", "10");
+        props.put("session.query_timeout", "20");
+        StreamingJobProperties jobPropertiesMap = new 
StreamingJobProperties(props);
+        SessionVariable sessionVarMap = 
jobPropertiesMap.getSessionVariable(ctx.getSessionVariable());
+        Assert.assertEquals(10, sessionVarMap.getInsertTimeoutS());
+        Assert.assertEquals(20, sessionVarMap.getQueryTimeoutS());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to