This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 8239a888a35 branch-4.0: [Fix](job) Fixed the sessionvar was not
working in streaming jobs. #57448 (#57502)
8239a888a35 is described below
commit 8239a888a35fb4060a2e4ad70d5570ffe62c3978
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 5 09:40:14 2025 +0800
branch-4.0: [Fix](job) Fixed the sessionvar was not working in streaming
jobs. #57448 (#57502)
Cherry-picked from #57448
Co-authored-by: wudi <[email protected]>
---
.../insert/streaming/StreamingInsertTask.java | 15 +++---
.../insert/streaming/StreamingJobProperties.java | 38 +++++++++++--
.../job/scheduler/StreamingTaskScheduler.java | 4 +-
.../streaming/StreamingJobPropertiesTest.java | 62 ++++++++++++++++++++++
4 files changed, 104 insertions(+), 15 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 68f40be923c..ec3c6407be7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -61,7 +61,7 @@ public class StreamingInsertTask {
private long taskId;
private String labelName;
@Setter
- private TaskStatus status;
+ private volatile TaskStatus status;
private String errMsg;
private Long createTimeMs;
private Long startTimeMs;
@@ -132,15 +132,14 @@ public class StreamingInsertTask {
}
private void before() throws Exception {
- this.status = TaskStatus.RUNNING;
- this.startTimeMs = System.currentTimeMillis();
-
if (isCanceled.get()) {
log.info("streaming insert task has been canceled, task id is {}",
getTaskId());
return;
}
+ this.status = TaskStatus.RUNNING;
+ this.startTimeMs = System.currentTimeMillis();
ctx = InsertTask.makeConnectContext(userIdentity, currentDb);
- ctx.setSessionVariable(jobProperties.getSessionVariable());
+ ctx.setSessionVariable(jobProperties.getSessionVariable(ctx.getSessionVariable()));
StatementContext statementContext = new StatementContext();
ctx.setStatementContext(statementContext);
@@ -181,7 +180,7 @@ public class StreamingInsertTask {
}
public boolean onSuccess() throws JobException {
- if (TaskStatus.CANCELED.equals(status)) {
+ if (isCanceled.get()) {
return false;
}
this.status = TaskStatus.SUCCESS;
@@ -201,10 +200,10 @@ public class StreamingInsertTask {
}
public void onFail(String errMsg) throws JobException {
- this.errMsg = errMsg;
- if (TaskStatus.CANCELED.equals(status)) {
+ if (isCanceled.get()) {
return;
}
+ this.errMsg = errMsg;
this.status = TaskStatus.FAILED;
this.finishTimeMs = System.currentTimeMillis();
if (!isCallable()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 0f20dbd4c1e..58e236e6720 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -22,6 +22,7 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.job.base.JobProperties;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.VariableMgr;
import lombok.Data;
import org.json.simple.JSONObject;
@@ -47,7 +48,8 @@ public class StreamingJobProperties implements JobProperties {
public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
public static final long DEFAULT_MAX_S3_BATCH_BYTES = 10 * 1024 * 1024 * 1024L; // 10GB
- public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
+ public static final int DEFAULT_JOB_INSERT_TIMEOUT = 30 * 60; // 30min
+ public static final int DEFAULT_JOB_QUERY_TIMEOUT = 30 * 60; // 30min
private final Map<String, String> properties;
@@ -117,13 +119,28 @@ public class StreamingJobProperties implements JobProperties {
}
}
- public SessionVariable getSessionVariable() throws JobException {
- SessionVariable sessionVariable = new SessionVariable();
+ public SessionVariable getSessionVariable(SessionVariable sessionVariable) throws JobException {
+ int defaultInsert = parseIntOrDefault(
+ VariableMgr.getDefaultValue(SessionVariable.INSERT_TIMEOUT),
+ DEFAULT_JOB_INSERT_TIMEOUT
+ );
+ int defaultQuery = parseIntOrDefault(
+ VariableMgr.getDefaultValue(SessionVariable.QUERY_TIMEOUT),
+ DEFAULT_JOB_QUERY_TIMEOUT
+ );
+ // override with job default session var
+ if (sessionVariable.getInsertTimeoutS() == defaultInsert) {
+ sessionVariable.setInsertTimeoutS(DEFAULT_JOB_INSERT_TIMEOUT);
+ }
+
+ if (sessionVariable.getQueryTimeoutS() == defaultQuery) {
+ sessionVariable.setQueryTimeoutS(DEFAULT_JOB_QUERY_TIMEOUT);
+ }
+
Map<String, String> sessionVarMap = parseSessionVarMap();
if (!sessionVarMap.isEmpty()) {
try {
- sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
- sessionVariable.setQueryTimeoutS(DEFAULT_INSERT_TIMEOUT);
+ // override session var for sessionVarMap
sessionVariable.readFromMap(sessionVarMap);
} catch (Exception e) {
throw new JobException("Invalid session variable, " + e.getMessage());
@@ -132,6 +149,17 @@ public class StreamingJobProperties implements JobProperties {
return sessionVariable;
}
+ private int parseIntOrDefault(String val, int defaultValue) {
+ if (val == null) {
+ return defaultValue;
+ }
+ try {
+ return Integer.parseInt(val);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
private Map<String, String> parseSessionVarMap() {
final Map<String, String> sessionVarMap = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
index 27a15fb959e..51e17f214c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java
@@ -110,8 +110,8 @@ public class StreamingTaskScheduler extends MasterDaemon {
// reject invalid task
if (!job.needScheduleTask()) {
- log.info("do not need to schedule invalid task, task id: {}, job id: {}",
- task.getTaskId(), task.getJobId());
+ log.info("do not need to schedule invalid task, task id: {}, job id: {}, job status: {}",
+ task.getTaskId(), task.getJobId(), job.getJobStatus());
return;
}
// reject task if no more data to consume
diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobPropertiesTest.java
new file mode 100644
index 00000000000..328059aaf1f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobPropertiesTest.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.job.extensions.insert.streaming;
+
+import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.InsertTask;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class StreamingJobPropertiesTest {
+
+ @Test
+ public void testSessionVariables() throws JobException {
+ //default
+ StreamingJobProperties jobProperties = new StreamingJobProperties(new HashMap<>());
+ ConnectContext ctx = InsertTask.makeConnectContext(null, null);
+ SessionVariable defaultSessionVar = jobProperties.getSessionVariable(ctx.getSessionVariable());
+ Assert.assertEquals(StreamingJobProperties.DEFAULT_JOB_INSERT_TIMEOUT, defaultSessionVar.getInsertTimeoutS());
+ Assert.assertEquals(StreamingJobProperties.DEFAULT_JOB_QUERY_TIMEOUT, defaultSessionVar.getQueryTimeoutS());
+
+ // set session var
+ ctx = InsertTask.makeConnectContext(null, null);
+ SessionVariable userSessionVar = new SessionVariable();
+ userSessionVar.setInsertTimeoutS(1);
+ userSessionVar.setQueryTimeoutS(2);
+ ctx.setSessionVariable(userSessionVar);
+
+ SessionVariable userSessionVarRes = jobProperties.getSessionVariable(ctx.getSessionVariable());
+ Assert.assertEquals(1, userSessionVarRes.getInsertTimeoutS());
+ Assert.assertEquals(2, userSessionVarRes.getQueryTimeoutS());
+
+ // set session map in job properties
+ ctx = InsertTask.makeConnectContext(null, null);
+ HashMap<String, String> props = new HashMap<>();
+ props.put("session.insert_timeout", "10");
+ props.put("session.query_timeout", "20");
+ StreamingJobProperties jobPropertiesMap = new StreamingJobProperties(props);
+ SessionVariable sessionVarMap = jobPropertiesMap.getSessionVariable(ctx.getSessionVariable());
+ Assert.assertEquals(10, sessionVarMap.getInsertTimeoutS());
+ Assert.assertEquals(20, sessionVarMap.getQueryTimeoutS());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org