This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 9d7092f919e branch-4.0: [Improve](fix) add streamjob case with create alter select delete and resume #56849 (#56926)
9d7092f919e is described below
commit 9d7092f919e44d3a0ff34994a1f2204ec7210671
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 14 18:09:51 2025 +0800
branch-4.0: [Improve](fix) add streamjob case with create alter select delete and resume #56849 (#56926)
Cherry-picked from #56849
Co-authored-by: wudi <[email protected]>
---
.../org/apache/doris/job/common/FailureReason.java | 6 +
.../insert/streaming/StreamingInsertJob.java | 6 +-
.../insert/streaming/StreamingInsertTask.java | 2 +-
.../insert/streaming/StreamingJobProperties.java | 69 +++-
.../trees/plans/commands/AlterJobCommand.java | 40 +-
.../java/org/apache/doris/qe/SessionVariable.java | 57 ++-
.../test_streaming_insert_job_alter.out | 23 ++
.../test_streaming_insert_job_crud.out | 23 ++
.../test_streaming_insert_job_alter.groovy | 155 +++++++
.../test_streaming_insert_job_crud.groovy | 452 +++++++++++++++++++++
10 files changed, 815 insertions(+), 18 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 2acfb472a40..c554f750b55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
import java.io.DataOutput;
import java.io.IOException;
@@ -40,6 +41,11 @@ public class FailureReason implements Writable {
public FailureReason(String msg) {
this.msg = msg;
+ if (StringUtils.isNotEmpty(msg) && msg.contains("Insert has filtered data in strict mode")) {
+ this.code = InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR;
+ } else {
+ this.code = InternalErrorCode.INTERNAL_ERR;
+ }
}
public InternalErrorCode getCode() {
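
Note: the constructor change above classifies a failure by inspecting its message text. A minimal standalone Java sketch of the same pattern follows; the enum constant names and the message literal are taken from the hunk above, while the simplified Reason class is illustrative rather than the actual Doris type:

    enum ErrorCode { TOO_MANY_FAILURE_ROWS_ERR, INTERNAL_ERR }

    final class Reason {
        private final String msg;
        private final ErrorCode code;

        Reason(String msg) {
            this.msg = msg;
            // Strict-mode filtering gets a dedicated, recoverable code;
            // everything else falls back to a generic internal error.
            if (msg != null && msg.contains("Insert has filtered data in strict mode")) {
                this.code = ErrorCode.TOO_MANY_FAILURE_ROWS_ERR;
            } else {
                this.code = ErrorCode.INTERNAL_ERR;
            }
        }

        ErrorCode getCode() {
            return code;
        }
    }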
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 4f2615c4e1e..010b27ff4ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -144,10 +144,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
} catch (AnalysisException ae) {
log.warn("parse streaming insert job failed, props: {}", properties, ae);
- throw new RuntimeException("parse streaming insert job failed, " + ae.getMessage());
+ throw new RuntimeException(ae.getMessage());
} catch (Exception ex) {
log.warn("init streaming insert job failed, sql: {}", getExecuteSql(), ex);
- throw new RuntimeException("init streaming insert job failed, " + ex.getMessage());
+ throw new RuntimeException(ex.getMessage());
}
}
@@ -387,7 +387,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
trow.addToColumnValue(new TCell().setStringVal(getComment()));
- trow.addToColumnValue(new TCell().setStringVal(properties != null
+ trow.addToColumnValue(new TCell().setStringVal(properties != null && !properties.isEmpty()
? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));
if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 433e372abf5..ef127b9ca98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -159,7 +159,7 @@ public class StreamingInsertTask {
taskCommand.getLabelName(),
errMsg);
if (retry == MAX_RETRY) {
- errMsg = "reached max retry times, failed with" + errMsg;
+ errMsg = "reached max retry times, failed with " + errMsg;
}
} catch (Exception e) {
log.warn("execute insert task error, label is {},offset is {}", taskCommand.getLabelName(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 35aa601fceb..2bd184c4594 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -21,13 +21,18 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.Util;
import org.apache.doris.job.base.JobProperties;
import org.apache.doris.job.exception.JobException;
-import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.SessionVariable;
import lombok.Data;
+import org.json.simple.JSONObject;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
@Data
public class StreamingJobProperties implements JobProperties {
@@ -35,12 +40,15 @@ public class StreamingJobProperties implements JobProperties {
public static final String S3_MAX_BATCH_FILES_PROPERTY = "s3.max_batch_files";
public static final String S3_MAX_BATCH_BYTES_PROPERTY = "s3.max_batch_bytes";
public static final String SESSION_VAR_PREFIX = "session.";
+ public static final List<String> SUPPORT_STREAM_JOB_PROPS =
+ Arrays.asList(MAX_INTERVAL_SECOND_PROPERTY, S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY);
public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
public static final long DEFAULT_MAX_S3_BATCH_BYTES = 10 * 1024 * 1024 * 1024L; // 10GB
public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
+
private final Map<String, String> properties;
private long maxIntervalSecond;
private long s3BatchFiles;
@@ -56,6 +64,20 @@ public class StreamingJobProperties implements JobProperties {
}
public void validate() throws AnalysisException {
+ List<String> invalidProps = new ArrayList<>();
+ for (String key : properties.keySet()) {
+ if (!SUPPORT_STREAM_JOB_PROPS.contains(key) && !key.startsWith(SESSION_VAR_PREFIX)) {
+ invalidProps.add(key);
+ }
+ }
+ if (!invalidProps.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Invalid properties: " + invalidProps
+ ". Supported keys are " + SUPPORT_STREAM_JOB_PROPS
+ " or any key starting with '" + SESSION_VAR_PREFIX + "'."
+ );
+ }
+
this.maxIntervalSecond = Util.getLongPropertyOrDefault(
properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND,
(v) -> v >= 1,
@@ -71,9 +93,44 @@ public class StreamingJobProperties implements JobProperties {
StreamingJobProperties.DEFAULT_MAX_S3_BATCH_BYTES, (v) -> v >= 100 * 1024 * 1024
&& v <= (long) (1024 * 1024 * 1024) * 10,
StreamingJobProperties.S3_MAX_BATCH_BYTES_PROPERTY + " should between 100MB and 10GB");
+
+ // validate session variables
+ try {
+ Map<String, String> sessionVarMap = parseSessionVarMap();
+ if (!sessionVarMap.isEmpty()) {
+ SessionVariable sessionVar = new SessionVariable();
+ sessionVar.readFromMap(sessionVarMap);
+ JSONObject sessionVarJson = sessionVar.toJson();
+
+ // check if there are invalid keys
+ Set<String> inputKeys = sessionVarMap.keySet();
+ Set<String> validKeys = sessionVarJson.keySet();
+ Set<String> invalidKeys = new HashSet<>(inputKeys);
+ invalidKeys.removeAll(validKeys);
+ if (!invalidKeys.isEmpty()) {
+ throw new IllegalArgumentException(invalidKeys.toString());
+ }
+ }
+ } catch (Exception e) {
+ throw new AnalysisException("Invalid session variable, " + e.getMessage());
+ }
}
public SessionVariable getSessionVariable() throws JobException {
+ SessionVariable sessionVariable = new SessionVariable();
+ Map<String, String> sessionVarMap = parseSessionVarMap();
+ if (!sessionVarMap.isEmpty()) {
+ try {
+ sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+ sessionVariable.readFromMap(sessionVarMap);
+ } catch (Exception e) {
+ throw new JobException("Invalid session variable, " + e.getMessage());
+ }
+ }
+ return sessionVariable;
+ }
+
+ private Map<String, String> parseSessionVarMap() {
final Map<String, String> sessionVarMap = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(SESSION_VAR_PREFIX)) {
@@ -81,14 +138,6 @@ public class StreamingJobProperties implements JobProperties {
sessionVarMap.put(subKey, entry.getValue());
}
}
-
- SessionVariable sessionVariable = new SessionVariable();
- try {
- sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
- sessionVariable.readFromJson(GsonUtils.GSON.toJson(sessionVarMap));
- } catch (Exception e) {
- throw new JobException("Invalid session variable, " + e.getMessage());
- }
- return sessionVariable;
+ return sessionVarMap;
}
}
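
Note: validate() above enforces a whitelist plus a "session." prefix rule for job properties. A self-contained sketch of that check, assuming the literal key strings exercised by the regression tests below (max_interval, s3.max_batch_files, s3.max_batch_bytes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    final class PropertyWhitelist {
        static final String SESSION_VAR_PREFIX = "session.";
        static final List<String> SUPPORTED = Arrays.asList(
                "max_interval", "s3.max_batch_files", "s3.max_batch_bytes");

        // Reject any key that is neither a supported job property
        // nor a "session."-prefixed session variable override.
        static void validate(Map<String, String> props) {
            List<String> invalid = new ArrayList<>();
            for (String key : props.keySet()) {
                if (!SUPPORTED.contains(key) && !key.startsWith(SESSION_VAR_PREFIX)) {
                    invalid.add(key);
                }
            }
            if (!invalid.isEmpty()) {
                throw new IllegalArgumentException("Invalid properties: " + invalid
                        + ". Supported keys are " + SUPPORTED
                        + " or any key starting with '" + SESSION_VAR_PREFIX + "'.");
            }
        }
    }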
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 8d3a4cd6612..5ded155f861 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -25,13 +25,19 @@ import org.apache.doris.job.base.JobExecuteType;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
+import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -99,12 +105,14 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
if (!JobStatus.PAUSED.equals(job.getJobStatus())) {
throw new AnalysisException("Only PAUSED job can be altered");
}
-
if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
&& job instanceof StreamingInsertJob) {
StreamingInsertJob streamingJob = (StreamingInsertJob) job;
boolean proCheck = checkProperties(streamingJob.getProperties());
boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
+ if (sqlCheck) {
+ checkUnmodifiableProperties(streamingJob.getExecuteSql());
+ }
if (!proCheck && !sqlCheck) {
throw new AnalysisException("No properties or sql changed in ALTER JOB");
}
@@ -113,6 +121,34 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
}
}
+ /**
+ * Check if there are any unmodifiable properties in TVF
+ */
+ private void checkUnmodifiableProperties(String originExecuteSql) {
+ UnboundTVFRelation originTvf = getTvf(originExecuteSql);
+ UnboundTVFRelation inputTvf = getTvf(sql);
+ Preconditions.checkArgument(originTvf.getFunctionName().equalsIgnoreCase(inputTvf.getFunctionName()),
+ "The tvf type %s cannot be modified in ALTER JOB", inputTvf);
+ switch (originTvf.getFunctionName().toLowerCase()) {
+ case "s3":
+ Preconditions.checkArgument(Objects.equals(originTvf.getProperties().getMap().get("uri"),
+ inputTvf.getProperties().getMap().get("uri")),
+ "The uri property cannot be modified in ALTER JOB");
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported tvf type:" + inputTvf.getFunctionName());
+ }
+ }
+
+ private UnboundTVFRelation getTvf(String sql) {
+ LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
+ InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) logicalPlan;
+ List<UnboundTVFRelation> allTVFRelation = baseCommand.getAllTVFRelation();
+ Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support one source in insert streaming job");
+ UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
+ return unboundTVFRelation;
+ }
+
private boolean checkProperties(Map<String, String> originProps) {
if (this.properties == null || this.properties.isEmpty()) {
return false;
@@ -124,7 +160,7 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
}
private boolean checkSql(String originSql) {
- if (originSql == null || originSql.isEmpty()) {
+ if (originSql == null || originSql.isEmpty() || sql == null || sql.isEmpty()) {
return false;
}
if (!originSql.equals(this.sql)) {
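
Note: checkUnmodifiableProperties above pins the source TVF across ALTER JOB. A standalone sketch of the rule, with the parsed TVF reduced to a function name plus a property map; the real code obtains both via NereidsParser and UnboundTVFRelation:

    import java.util.Map;
    import java.util.Objects;

    import com.google.common.base.Preconditions;

    final class TvfGuard {
        // ALTER JOB may change properties and session variables, but the
        // source TVF type and its uri must match the original job's SQL.
        static void check(String originFn, Map<String, String> originProps,
                          String inputFn, Map<String, String> inputProps) {
            Preconditions.checkArgument(originFn.equalsIgnoreCase(inputFn),
                    "The tvf type %s cannot be modified in ALTER JOB", inputFn);
            if ("s3".equalsIgnoreCase(originFn)) {
                Preconditions.checkArgument(
                        Objects.equals(originProps.get("uri"), inputProps.get("uri")),
                        "The uri property cannot be modified in ALTER JOB");
            } else {
                throw new IllegalArgumentException("Unsupported tvf type: " + inputFn);
            }
        }
    }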
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ab9a31a6e80..80af0119197 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4794,8 +4794,7 @@ public class SessionVariable implements Serializable, Writable {
field.set(this, root.get(attr.name()));
break;
case "double":
- // root.get(attr.name()) always return Double type, so need to convert it.
- field.set(this, Double.valueOf(root.get(attr.name()).toString()));
+ field.set(this, root.get(attr.name()));
break;
case "String":
field.set(this, root.get(attr.name()));
@@ -4810,6 +4809,60 @@ public class SessionVariable implements Serializable, Writable {
}
}
+ public void readFromMap(Map<String, String> sessionVarMap) throws IOException {
+ try {
+ for (Field field : SessionVariable.class.getDeclaredFields()) {
+ VarAttr attr = field.getAnnotation(VarAttr.class);
+ if (attr == null) {
+ continue;
+ }
+
+ if (!sessionVarMap.containsKey(attr.name())) {
+ continue;
+ }
+
+ switch (field.getType().getSimpleName()) {
+ case "boolean":
+ String value = sessionVarMap.get(attr.name());
+ if (value.equalsIgnoreCase("ON")
+ || value.equalsIgnoreCase("TRUE")
+ || value.equalsIgnoreCase("1")) {
+ field.setBoolean(this, true);
+ } else if (value.equalsIgnoreCase("OFF")
+ || value.equalsIgnoreCase("FALSE")
+ || value.equalsIgnoreCase("0")) {
+ field.setBoolean(this, false);
+ } else {
+ throw new IllegalAccessException("Variable " + attr.name()
+ + " can't be set to the value of " + value);
+ }
+ break;
+ case "int":
+ field.set(this, Integer.valueOf(sessionVarMap.get(attr.name())));
+ break;
+ case "long":
+ field.set(this, Long.valueOf(sessionVarMap.get(attr.name())));
+ break;
+ case "float":
+ field.set(this, Float.valueOf(sessionVarMap.get(attr.name())));
+ break;
+ case "double":
+ field.set(this, Double.valueOf(sessionVarMap.get(attr.name())));
+ break;
+ case "String":
+ field.set(this, sessionVarMap.get(attr.name()));
+ break;
+ default:
+ // Unsupported type variable.
+ throw new IOException("invalid type: " + field.getType().getSimpleName());
+ }
+
+ }
+ } catch (Exception ex) {
+ throw new IOException("invalid session variable, " + ex.getMessage());
+ }
+ }
+
/**
* Get all variables which need to forward along with statement.
**/
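
Note: readFromMap above coerces each string value to the declared type of the matching @VarAttr field via reflection. The boolean branch accepts the MySQL-style ON/OFF spellings; a standalone sketch of just that coercion:

    final class BoolVar {
        // Accept ON/TRUE/1 and OFF/FALSE/0 case-insensitively, as readFromMap does;
        // anything else is rejected with the same error message shape.
        static boolean parse(String name, String value) {
            if (value.equalsIgnoreCase("ON") || value.equalsIgnoreCase("TRUE")
                    || value.equalsIgnoreCase("1")) {
                return true;
            }
            if (value.equalsIgnoreCase("OFF") || value.equalsIgnoreCase("FALSE")
                    || value.equalsIgnoreCase("0")) {
                return false;
            }
            throw new IllegalArgumentException(
                    "Variable " + name + " can't be set to the value of " + value);
        }
    }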
diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_alter.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_alter.out
new file mode 100644
index 00000000000..3d0b387fece
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_alter.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+1 Emi 25
+2 Ben 35
+3 Oli 28
+4 Ale 60
+5 Ava 17
+6 Wil 69
+7 Sop 32
+8 Jam 64
+9 Emm 37
+10 Lia 64
+11 Ale 34
+12 Isa 43
+13 Ben 56
+14 Sop 12
+15 Chr 33
+16 Emm 23
+17 Mic 11
+18 Oli 38
+19 Dan 19
+20 Ava 28
+
diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_crud.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_crud.out
new file mode 100644
index 00000000000..bf3c8f2571c
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_crud.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+1 Emily 25
+2 Benjamin 35
+3 Olivia 28
+4 Alexander 60
+5 Ava 17
+6 William 69
+7 Sophia 32
+8 James 64
+9 Emma 37
+10 Liam 64
+11 Alexander 34
+12 Isabella 43
+13 Benjamin 56
+14 Sophia 12
+15 Christopher 33
+16 Emma 23
+17 Michael 11
+18 Olivia 38
+19 Daniel 19
+20 Ava 28
+
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy
new file mode 100644
index 00000000000..097515292c2
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_alter") {
+ def tableName = "test_streaming_insert_job_alter_tbl"
+ def jobName = "test_streaming_insert_job_alter"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` varchar(2) NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // Create a job, let it have data quality errors, modify the schema, and resume it
+ sql """
+ CREATE JOB ${jobName}
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ log.info("check job status pause")
+ def jobSuccendCount = sql """ select status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ // check job status pause
+ jobSuccendCount.size() == 1 && 'PAUSED' == jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def errorMsg = sql """
+ select ErrorMsg from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert errorMsg.get(0).get(0).contains("Insert has filtered data in strict mode")
+
+
+ // alter fail job
+ sql """
+ ALTER JOB ${jobName}
+ PROPERTIES(
+ "session.enable_insert_strict" = false,
+ "session.insert_max_filter_ratio" = 1
+ )
+ """
+
+ def alterJobProperties = sql """
+ select status,properties from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert alterJobProperties.get(0).get(0) == "PAUSED"
+ assert alterJobProperties.get(0).get(1) == "{\"session.enable_insert_strict\":\"false\",\"session.insert_max_filter_ratio\":\"1\"}"
+
+ // modify column c2 to varchar(3), Exceeding the limit will be automatically truncated
+ sql """
+ ALTER TABLE ${tableName} MODIFY COLUMN c2 VARCHAR(3) NULL;
+ """
+
+ def descTbls = sql """Desc ${tableName}"""
+ assert descTbls.size() == 3
+ assert descTbls.get(1).get(0) == "c2"
+ assert descTbls.get(1).get(1) == "varchar(3)"
+
+ //resume job
+ sql """
+ RESUME JOB where jobname = '${jobName}'
+ """
+
+ def resumeJobInfo = sql """
+ select * from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("resume jobInfo: " + resumeJobInfo)
+
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobCountStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+ log.info("check job status running: " + jobCountStatus)
+ // check job status running
+ jobCountStatus.size() == 1 && jobCountStatus.get(0).get(0) == 'RUNNING' && jobCountStatus.get(0).get(1) >= '1'
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def runningJobInfo = sql """
+ select * from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("running jobInfo: " + runningJobInfo)
+
+ def jobOffset = sql """
+ select currentOffset from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+
+ qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+
+}
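
Note: both suites poll job state with Awaitility instead of sleeping. The same pattern as a Java sketch; fetchJobStatus is a hypothetical stand-in for the jobs("type"="insert") query the tests issue:

    import static java.util.concurrent.TimeUnit.SECONDS;

    import org.awaitility.Awaitility;

    final class JobWait {
        // Hypothetical helper; the suites run:
        //   select status from jobs("type"="insert") where Name = '<job>'
        static String fetchJobStatus(String jobName) {
            return "PAUSED"; // stub value for the sketch
        }

        // Wait up to 300 seconds, re-checking once per second,
        // until the job reaches the expected state.
        static void awaitStatus(String jobName, String expected) {
            Awaitility.await().atMost(300, SECONDS)
                    .pollInterval(1, SECONDS)
                    .until(() -> expected.equals(fetchJobStatus(jobName)));
        }
    }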
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
new file mode 100644
index 00000000000..123c8ab2659
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_crud") {
+ def tableName = "test_streaming_insert_job_crud_tbl"
+ def jobName = "test_streaming_insert_job_crud"
+ def jobNameError = "test_streaming_insert_job_error"
+
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ /************************ create **********************/
+
+ // create Illegal jobname
+ expectExceptionLike({
+ sql """
+ CREATE JOB a+b
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "mismatched input")
+
+ def jobCount = sql """ select count(1) from jobs("type"="insert") where Name = 'a+b' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ // create Illegal job params key
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ PROPERTIES(
+ "xxx" = "sss"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "Invalid properties: [xxx]")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ // create Illegal job params
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ PROPERTIES(
+ "s3.max_batch_files" = "0"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "s3.max_batch_files should >=1")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ PROPERTIES(
+ "s3.max_batch_bytes" = "102400"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "s3.max_batch_bytes should between 100MB and 10GB")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ PROPERTIES(
+ "max_interval" = "-1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "max_interval should > 1")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ // create Illegal session params
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ PROPERTIES(
+ "session.enable_insert_strict" = "xxx"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "Variable enable_insert_strict can't be set to the value of xxx")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ PROPERTIES(
+ "session.xxx" = "abc"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "Invalid session variable, [xxx]")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ // create error format url
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "${s3BucketName}/regression/load/data/example_0.csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "Invalid uri")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ // error sk
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_0.csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "xxxx"
+ );
+ """
+ }, "Can not build s3")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ // no exist path
+ expectExceptionLike({
+ sql """
+ CREATE JOB ${jobNameError}
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/NoexistPath/Noexist.csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "insert into cols should be corresponding to the query output")
+
+ jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+ assert jobCount.get(0).get(0) == 0
+
+ /************************ select **********************/
+
+ // select no exist job
+ jobCount = sql """ select * from jobs("type"="insert") where Name = 'NoExistJob'"""
+ assert jobCount.size() == 0
+
+
+ /************************ alter **********************/
+
+ // create normal streaming job
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES(
+ "s3.max_batch_files" = "1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ print("check success task count")
+ def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
+ // check job status and succeed task count larger than 2
+ jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+ println("show job: " + showjob)
+ println("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert jobStatus.get(0).get(0) == "RUNNING"
+
+ def jobOffset = sql """
+ select currentOffset, endoffset from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobOffset.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+
+ qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+ // alter running job
+ expectExceptionLike({
+ sql """
+ ALTER JOB ${jobName}
+ PROPERTIES(
+ "session.enable_insert_strict" = false
+ )
+ INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "Only PAUSED job can be altered")
+
+ // pause job
+ sql """
+ PAUSE JOB where jobname = '${jobName}'
+ """
+ def pausedJobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert pausedJobStatus.get(0).get(0) == "PAUSED"
+
+ // alter no exist job
+ expectExceptionLike({
+ sql """
+ ALTER JOB noExistJob
+ INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "job not exist")
+
+ // alter session var
+ sql """
+ ALTER JOB ${jobName}
+ PROPERTIES(
+ "session.enable_insert_strict" = false,
+ "session.insert_max_filter_ratio" = 0.5,
+ "s3.max_batch_files" = "10"
+ )
+ INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+
+ def alterJobProperties = sql """
+ select properties from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert alterJobProperties.get(0).get(0) == "{\"s3.max_batch_files\":\"10\",\"session.enable_insert_strict\":\"false\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+
+ // alter url
+ expectExceptionLike({
+ sql """
+ ALTER JOB ${jobName}
+ INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" = "s3://${s3BucketName}/regression/load/data/xxxx.csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+ }, "The uri property cannot be modified in ALTER JOB")
+
+ /************************ delete **********************/
+ // drop pause job
+ pausedJobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ assert pausedJobStatus.get(0).get(0) == "PAUSED"
+
+ sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
+
+ def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+ assert jobCountRsp.get(0).get(0) == 0
+
+ // drop no exist job
+ expectExceptionLike({
+ sql """ DROP JOB IF EXISTS where jobname = 'NoExistJob' """
+ }, "unregister job error")
+
+}