This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 9d7092f919e branch-4.0: [Improve](fix) add streamjob case with create alter select delete and resume #56849 (#56926)
9d7092f919e is described below

commit 9d7092f919e44d3a0ff34994a1f2204ec7210671
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 14 18:09:51 2025 +0800

    branch-4.0: [Improve](fix) add streamjob case with create alter select delete and resume #56849 (#56926)
    
    Cherry-picked from #56849
    
    Co-authored-by: wudi <[email protected]>
---
 .../org/apache/doris/job/common/FailureReason.java |   6 +
 .../insert/streaming/StreamingInsertJob.java       |   6 +-
 .../insert/streaming/StreamingInsertTask.java      |   2 +-
 .../insert/streaming/StreamingJobProperties.java   |  69 +++-
 .../trees/plans/commands/AlterJobCommand.java      |  40 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  57 ++-
 .../test_streaming_insert_job_alter.out            |  23 ++
 .../test_streaming_insert_job_crud.out             |  23 ++
 .../test_streaming_insert_job_alter.groovy         | 155 +++++++
 .../test_streaming_insert_job_crud.groovy          | 452 +++++++++++++++++++++
 10 files changed, 815 insertions(+), 18 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
index 2acfb472a40..c554f750b55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/common/FailureReason.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -40,6 +41,11 @@ public class FailureReason implements Writable {
 
     public FailureReason(String msg) {
         this.msg = msg;
+        if (StringUtils.isNotEmpty(msg) && msg.contains("Insert has filtered data in strict mode")) {
+            this.code = InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR;
+        } else {
+            this.code = InternalErrorCode.INTERNAL_ERR;
+        }
     }
 
     public InternalErrorCode getCode() {
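
For context, the new FailureReason constructor classifies a failure from the message text alone: strict-mode filtering maps to TOO_MANY_FAILURE_ROWS_ERR, anything else to INTERNAL_ERR. A minimal usage sketch (not part of the patch; the messages below are hypothetical):

    // Sketch only: FailureReason and InternalErrorCode as in this patch.
    FailureReason strict = new FailureReason("Insert has filtered data in strict mode, url: ...");
    assert strict.getCode() == InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR;

    FailureReason generic = new FailureReason("connection reset by peer");
    assert generic.getCode() == InternalErrorCode.INTERNAL_ERR;
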
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 4f2615c4e1e..010b27ff4ed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -144,10 +144,10 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
             this.offsetProvider = SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
         } catch (AnalysisException ae) {
             log.warn("parse streaming insert job failed, props: {}", properties, ae);
-            throw new RuntimeException("parse streaming insert job failed, " + ae.getMessage());
+            throw new RuntimeException(ae.getMessage());
         } catch (Exception ex) {
             log.warn("init streaming insert job failed, sql: {}", 
getExecuteSql(), ex);
-            throw new RuntimeException("init streaming insert job failed, " +  
ex.getMessage());
+            throw new RuntimeException(ex.getMessage());
         }
     }
 
@@ -387,7 +387,7 @@ public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
         trow.addToColumnValue(new TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
-        trow.addToColumnValue(new TCell().setStringVal(properties != null
+        trow.addToColumnValue(new TCell().setStringVal(properties != null && !properties.isEmpty()
                 ? GsonUtils.GSON.toJson(properties) : FeConstants.null_string));

         if (offsetProvider != null && StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 433e372abf5..ef127b9ca98 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -159,7 +159,7 @@ public class StreamingInsertTask {
                         taskCommand.getLabelName(),
                         errMsg);
                 if (retry == MAX_RETRY) {
-                    errMsg = "reached max retry times, failed with" + errMsg;
+                    errMsg = "reached max retry times, failed with " + errMsg;
                 }
             } catch (Exception e) {
                 log.warn("execute insert task error, label is {},offset is 
{}", taskCommand.getLabelName(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
index 35aa601fceb..2bd184c4594 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobProperties.java
@@ -21,13 +21,18 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.job.base.JobProperties;
 import org.apache.doris.job.exception.JobException;
-import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.SessionVariable;
 
 import lombok.Data;
+import org.json.simple.JSONObject;
 
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 @Data
 public class StreamingJobProperties implements JobProperties {
@@ -35,12 +40,15 @@ public class StreamingJobProperties implements JobProperties {
     public static final String S3_MAX_BATCH_FILES_PROPERTY = "s3.max_batch_files";
     public static final String S3_MAX_BATCH_BYTES_PROPERTY = "s3.max_batch_bytes";
     public static final String SESSION_VAR_PREFIX = "session.";
+    public static final List<String> SUPPORT_STREAM_JOB_PROPS =
+            Arrays.asList(MAX_INTERVAL_SECOND_PROPERTY, S3_MAX_BATCH_FILES_PROPERTY, S3_MAX_BATCH_BYTES_PROPERTY);
 
     public static final long DEFAULT_MAX_INTERVAL_SECOND = 10;
     public static final long DEFAULT_MAX_S3_BATCH_FILES = 256;
     public static final long DEFAULT_MAX_S3_BATCH_BYTES = 10 * 1024 * 1024 * 1024L; // 10GB
     public static final int DEFAULT_INSERT_TIMEOUT = 30 * 60; // 30min
 
+
     private final Map<String, String> properties;
     private long maxIntervalSecond;
     private long s3BatchFiles;
@@ -56,6 +64,20 @@ public class StreamingJobProperties implements JobProperties {
     }
 
     public void validate() throws AnalysisException {
+        List<String> invalidProps = new ArrayList<>();
+        for (String key : properties.keySet()) {
+            if (!SUPPORT_STREAM_JOB_PROPS.contains(key) && !key.startsWith(SESSION_VAR_PREFIX)) {
+                invalidProps.add(key);
+            }
+        }
+        if (!invalidProps.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "Invalid properties: " + invalidProps
+                            + ". Supported keys are " + 
SUPPORT_STREAM_JOB_PROPS
+                            + " or any key starting with '" + 
SESSION_VAR_PREFIX + "'."
+            );
+        }
+
         this.maxIntervalSecond = Util.getLongPropertyOrDefault(
                        properties.get(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY),
                        StreamingJobProperties.DEFAULT_MAX_INTERVAL_SECOND, (v) -> v >= 1,
@@ -71,9 +93,44 @@ public class StreamingJobProperties implements JobProperties {
                        StreamingJobProperties.DEFAULT_MAX_S3_BATCH_BYTES, (v) -> v >= 100 * 1024 * 1024
                        && v <= (long) (1024 * 1024 * 1024) * 10,
                StreamingJobProperties.S3_MAX_BATCH_BYTES_PROPERTY + " should between 100MB and 10GB");
+
+        // validate session variables
+        try {
+            Map<String, String> sessionVarMap = parseSessionVarMap();
+            if (!sessionVarMap.isEmpty()) {
+                SessionVariable sessionVar = new SessionVariable();
+                sessionVar.readFromMap(sessionVarMap);
+                JSONObject sessionVarJson = sessionVar.toJson();
+
+                // check if there are invalid keys
+                Set<String> inputKeys = sessionVarMap.keySet();
+                Set<String> validKeys = sessionVarJson.keySet();
+                Set<String> invalidKeys = new HashSet<>(inputKeys);
+                invalidKeys.removeAll(validKeys);
+                if (!invalidKeys.isEmpty()) {
+                    throw new IllegalArgumentException(invalidKeys.toString());
+                }
+            }
+        } catch (Exception e) {
+            throw new AnalysisException("Invalid session variable, " + 
e.getMessage());
+        }
     }
 
     public SessionVariable getSessionVariable() throws JobException {
+        SessionVariable sessionVariable = new SessionVariable();
+        Map<String, String> sessionVarMap = parseSessionVarMap();
+        if (!sessionVarMap.isEmpty()) {
+            try {
+                sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
+                sessionVariable.readFromMap(sessionVarMap);
+            } catch (Exception e) {
+                throw new JobException("Invalid session variable, " + e.getMessage());
+            }
+        }
+        return sessionVariable;
+    }
+
+    private Map<String, String> parseSessionVarMap() {
         final Map<String, String> sessionVarMap = new HashMap<>();
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             if (entry.getKey().startsWith(SESSION_VAR_PREFIX)) {
@@ -81,14 +138,6 @@ public class StreamingJobProperties implements JobProperties {
                 sessionVarMap.put(subKey, entry.getValue());
             }
         }
-
-        SessionVariable sessionVariable = new SessionVariable();
-        try {
-            sessionVariable.setInsertTimeoutS(DEFAULT_INSERT_TIMEOUT);
-            sessionVariable.readFromJson(GsonUtils.GSON.toJson(sessionVarMap));
-        } catch (Exception e) {
-            throw new JobException("Invalid session variable, " + e.getMessage());
-        }
-        return sessionVariable;
+        return sessionVarMap;
     }
 }
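
For context, validate() now rejects any property key that is neither one of SUPPORT_STREAM_JOB_PROPS nor prefixed with "session.", and session-prefixed keys are verified against real SessionVariable fields. A minimal sketch of the intended flow (assuming a constructor that takes the property map; keys and values below are hypothetical):

    // Sketch only; StreamingJobProperties and SessionVariable as in this patch.
    Map<String, String> props = new HashMap<>();
    props.put("s3.max_batch_files", "10");       // job-level key, must be in SUPPORT_STREAM_JOB_PROPS
    props.put("session.insert_timeout", "600");  // "session." prefix is stripped, rest must match a @VarAttr field

    StreamingJobProperties jobProps = new StreamingJobProperties(props);
    jobProps.validate();                                  // rejects unknown keys and out-of-range values
    SessionVariable vars = jobProps.getSessionVariable(); // insert timeout defaults to 30min, then overrides apply
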
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 8d3a4cd6612..5ded155f861 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -25,13 +25,19 @@ import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.exception.JobException;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
+import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
+import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.StmtExecutor;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -99,12 +105,14 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
         if (!JobStatus.PAUSED.equals(job.getJobStatus())) {
             throw new AnalysisException("Only PAUSED job can be altered");
         }
-
         if (job.getJobConfig().getExecuteType().equals(JobExecuteType.STREAMING)
                 && job instanceof StreamingInsertJob) {
             StreamingInsertJob streamingJob = (StreamingInsertJob) job;
             boolean proCheck = checkProperties(streamingJob.getProperties());
             boolean sqlCheck = checkSql(streamingJob.getExecuteSql());
+            if (sqlCheck) {
+                checkUnmodifiableProperties(streamingJob.getExecuteSql());
+            }
             if (!proCheck && !sqlCheck) {
                 throw new AnalysisException("No properties or sql changed in 
ALTER JOB");
             }
@@ -113,6 +121,34 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
         }
     }
 
+    /**
+     * Check if there are any unmodifiable properties in TVF
+     */
+    private void checkUnmodifiableProperties(String originExecuteSql) {
+        UnboundTVFRelation originTvf = getTvf(originExecuteSql);
+        UnboundTVFRelation inputTvf = getTvf(sql);
+        Preconditions.checkArgument(originTvf.getFunctionName().equalsIgnoreCase(inputTvf.getFunctionName()),
+                "The tvf type %s cannot be modified in ALTER JOB", inputTvf);
+        switch (originTvf.getFunctionName().toLowerCase()) {
+            case "s3":
+                Preconditions.checkArgument(Objects.equals(originTvf.getProperties().getMap().get("uri"),
+                        inputTvf.getProperties().getMap().get("uri")),
+                        "The uri property cannot be modified in ALTER JOB");
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported tvf type:" + 
inputTvf.getFunctionName());
+        }
+    }
+
+    private UnboundTVFRelation getTvf(String sql) {
+        LogicalPlan logicalPlan = new NereidsParser().parseSingle(sql);
+        InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) logicalPlan;
+        List<UnboundTVFRelation> allTVFRelation = baseCommand.getAllTVFRelation();
+        Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support one source in insert streaming job");
+        UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
+    }
+
     private boolean checkProperties(Map<String, String> originProps) {
         if (this.properties == null || this.properties.isEmpty()) {
             return false;
@@ -124,7 +160,7 @@ public class AlterJobCommand extends AlterCommand implements ForwardWithSync, Ne
     }
 
     private boolean checkSql(String originSql) {
-        if (originSql == null || originSql.isEmpty()) {
+        if (originSql == null || originSql.isEmpty() || sql == null || sql.isEmpty()) {
             return false;
         }
         if (!originSql.equals(this.sql)) {
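
For context, checkUnmodifiableProperties() re-parses the stored SQL and the new SQL, requires exactly one source TVF in each, and compares them. A minimal sketch of the rule it enforces (the SQL strings below are hypothetical):

    // Sketch only: the TVF type must stay the same, and for s3 the "uri" must be unchanged.
    String origin = "INSERT INTO t SELECT * FROM S3(\"uri\" = \"s3://bucket/a_[0-1].csv\", ...)";
    String altered = "INSERT INTO t SELECT * FROM S3(\"uri\" = \"s3://bucket/other.csv\", ...)";
    // getTvf(origin) and getTvf(altered) both resolve to the s3 TVF, but the differing
    // "uri" makes Preconditions.checkArgument fail with
    // "The uri property cannot be modified in ALTER JOB".
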
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ab9a31a6e80..80af0119197 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4794,8 +4794,7 @@ public class SessionVariable implements Serializable, Writable {
                         field.set(this, root.get(attr.name()));
                         break;
                     case "double":
-                        // root.get(attr.name()) always return Double type, so need to convert it.
-                        field.set(this, Double.valueOf(root.get(attr.name()).toString()));
+                        field.set(this, root.get(attr.name()));
                         break;
                     case "String":
                         field.set(this, root.get(attr.name()));
@@ -4810,6 +4809,60 @@ public class SessionVariable implements Serializable, Writable {
         }
     }
 
+    public void readFromMap(Map<String, String> sessionVarMap) throws IOException {
+        try {
+            for (Field field : SessionVariable.class.getDeclaredFields()) {
+                VarAttr attr = field.getAnnotation(VarAttr.class);
+                if (attr == null) {
+                    continue;
+                }
+
+                if (!sessionVarMap.containsKey(attr.name())) {
+                    continue;
+                }
+
+                switch (field.getType().getSimpleName()) {
+                    case "boolean":
+                        String value = sessionVarMap.get(attr.name());
+                        if (value.equalsIgnoreCase("ON")
+                                || value.equalsIgnoreCase("TRUE")
+                                || value.equalsIgnoreCase("1")) {
+                            field.setBoolean(this, true);
+                        } else if (value.equalsIgnoreCase("OFF")
+                                || value.equalsIgnoreCase("FALSE")
+                                || value.equalsIgnoreCase("0")) {
+                            field.setBoolean(this, false);
+                        } else {
+                            throw new IllegalAccessException("Variable " + 
attr.name()
+                                    + " can't be set to the value of " + 
value);
+                        }
+                        break;
+                    case "int":
+                        field.set(this, Integer.valueOf(sessionVarMap.get(attr.name())));
+                        break;
+                    case "long":
+                        field.set(this, Long.valueOf(sessionVarMap.get(attr.name())));
+                        break;
+                    case "float":
+                        field.set(this, Float.valueOf(sessionVarMap.get(attr.name())));
+                        break;
+                    case "double":
+                        field.set(this, Double.valueOf(sessionVarMap.get(attr.name())));
+                        break;
+                    case "String":
+                        field.set(this, sessionVarMap.get(attr.name()));
+                        break;
+                    default:
+                        // Unsupported type variable.
+                        throw new IOException("invalid type: " + 
field.getType().getSimpleName());
+                }
+
+            }
+        } catch (Exception ex) {
+            throw new IOException("invalid session variable, " + 
ex.getMessage());
+        }
+    }
+
     /**
      * Get all variables which need to forward along with statement.
      **/
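
For context, readFromMap() coerces the string values onto matching @VarAttr fields and silently skips keys that match no field (StreamingJobProperties.validate() flags those separately). A minimal usage sketch (the map contents below are hypothetical):

    // Sketch only; readFromMap as added in this patch.
    Map<String, String> vars = new HashMap<>();
    vars.put("enable_insert_strict", "OFF"); // booleans accept ON/TRUE/1 and OFF/FALSE/0
    vars.put("insert_timeout", "1800");      // numeric fields are parsed from their string form

    SessionVariable sv = new SessionVariable();
    sv.readFromMap(vars);                    // throws IOException on an unparsable value
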
diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_alter.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_alter.out
new file mode 100644
index 00000000000..3d0b387fece
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_alter.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+1      Emi     25
+2      Ben     35
+3      Oli     28
+4      Ale     60
+5      Ava     17
+6      Wil     69
+7      Sop     32
+8      Jam     64
+9      Emm     37
+10     Lia     64
+11     Ale     34
+12     Isa     43
+13     Ben     56
+14     Sop     12
+15     Chr     33
+16     Emm     23
+17     Mic     11
+18     Oli     38
+19     Dan     19
+20     Ava     28
+
diff --git a/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_crud.out b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_crud.out
new file mode 100644
index 00000000000..bf3c8f2571c
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/test_streaming_insert_job_crud.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+1      Emily   25
+2      Benjamin        35
+3      Olivia  28
+4      Alexander       60
+5      Ava     17
+6      William 69
+7      Sophia  32
+8      James   64
+9      Emma    37
+10     Liam    64
+11     Alexander       34
+12     Isabella        43
+13     Benjamin        56
+14     Sophia  12
+15     Christopher     33
+16     Emma    23
+17     Michael 11
+18     Olivia  38
+19     Daniel  19
+20     Ava     28
+
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy
new file mode 100644
index 00000000000..097515292c2
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter.groovy
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_alter") {
+    def tableName = "test_streaming_insert_job_alter_tbl"
+    def jobName = "test_streaming_insert_job_alter"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` varchar(2) NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    // Create a job, let it hit data quality errors, then modify the schema and resume it
+    sql """
+       CREATE JOB ${jobName}  
+       ON STREAMING DO INSERT INTO ${tableName} 
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    log.info("check job status pause")
+                    def jobSuccendCount = sql """ select status from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    // check job status pause
+                    jobSuccendCount.size() == 1 && 'PAUSED' == jobSuccendCount.get(0).get(0)
+                }
+        )
+    } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
+
+    def errorMsg = sql """
+        select ErrorMsg from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert errorMsg.get(0).get(0).contains("Insert has filtered data in strict mode")
+
+
+    // alter fail job
+    sql """
+       ALTER JOB ${jobName}
+       PROPERTIES(
+        "session.enable_insert_strict" = false,
+        "session.insert_max_filter_ratio" = 1
+       )
+        """
+
+    def alterJobProperties = sql """
+        select status,properties from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert alterJobProperties.get(0).get(0) == "PAUSED"
+    assert alterJobProperties.get(0).get(1) == "{\"session.enable_insert_strict\":\"false\",\"session.insert_max_filter_ratio\":\"1\"}"
+
+    // modify column c2 to varchar(3); values exceeding the limit will be automatically truncated
+    sql """
+        ALTER TABLE ${tableName} MODIFY COLUMN c2 VARCHAR(3) NULL;
+    """
+
+    def descTbls = sql """Desc ${tableName}"""
+    assert descTbls.size() == 3
+    assert descTbls.get(1).get(0)  == "c2"
+    assert descTbls.get(1).get(1)  == "varchar(3)"
+
+    // resume job
+    sql """
+        RESUME JOB where jobname =  '${jobName}'
+    """
+
+    def resumeJobInfo = sql """
+        select * from jobs("type"="insert") where Name='${jobName}'
+    """
+    log.info("resume jobInfo: " + resumeJobInfo)
+
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    def jobCountStatus = sql """ select status, SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """
+                    log.info("check job status running: " + jobCountStatus)
+                    // check job status running
+                    jobCountStatus.size() == 1 && jobCountStatus.get(0).get(0) == 'RUNNING' && jobCountStatus.get(0).get(1) >= '1'
+                }
+        )
+    } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        log.info("show job: " + showjob)
+        log.info("show task: " + showtask)
+        throw ex;
+    }
+
+    def runningJobInfo = sql """
+        select * from jobs("type"="insert") where Name='${jobName}'
+    """
+    log.info("running jobInfo: " + runningJobInfo)
+
+    def jobOffset = sql """
+        select currentOffset from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+
+    qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+
+}
diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
new file mode 100644
index 00000000000..123c8ab2659
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_crud.groovy
@@ -0,0 +1,452 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_insert_job_crud") {
+    def tableName = "test_streaming_insert_job_crud_tbl"
+    def jobName = "test_streaming_insert_job_crud"
+    def jobNameError = "test_streaming_insert_job_error"
+
+    sql """drop table if exists `${tableName}` force"""
+    sql """
+        DROP JOB IF EXISTS where jobname =  '${jobName}'
+    """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `c1` int NULL,
+            `c2` string NULL,
+            `c3` int  NULL,
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+    """
+
+    /************************ create **********************/
+
+    // create with an illegal job name
+    expectExceptionLike({
+        sql """
+           CREATE JOB a+b
+           ON STREAMING DO INSERT INTO ${tableName} 
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }, "mismatched input")
+
+    def jobCount = sql """ select count(1) from jobs("type"="insert") where Name = 'a+b' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    // create with an unsupported job property key
+    expectExceptionLike({
+        sql """
+           CREATE JOB ${jobNameError}
+           PROPERTIES(
+            "xxx" = "sss"
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }, "Invalid properties: [xxx]")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    // create with illegal job property values
+    expectExceptionLike({
+        sql """
+           CREATE JOB ${jobNameError}
+           PROPERTIES(
+            "s3.max_batch_files" = "0"
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }, "s3.max_batch_files should >=1")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    expectExceptionLike({
+        sql """
+           CREATE JOB ${jobNameError}
+           PROPERTIES(
+            "s3.max_batch_bytes" = "102400"
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }, "s3.max_batch_bytes should between 100MB and 10GB")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    expectExceptionLike({
+        sql """
+           CREATE JOB ${jobNameError}
+           PROPERTIES(
+            "max_interval" = "-1"
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }, "max_interval should > 1")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    // create with illegal session variable values
+    expectExceptionLike({
+        sql """
+           CREATE JOB ${jobNameError}
+           PROPERTIES(
+            "session.enable_insert_strict" = "xxx"
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }, "Variable enable_insert_strict can't be set to the value of xxx")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    expectExceptionLike({
+        sql """
+           CREATE JOB ${jobNameError}
+           PROPERTIES(
+            "session.xxx" = "abc"
+           )
+           ON STREAMING DO INSERT INTO ${tableName}
+           SELECT * FROM S3
+            (
+                "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+    }, "Invalid session variable, [xxx]")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    // create with a malformed uri
+    expectExceptionLike({
+        sql """
+       CREATE JOB ${jobNameError}
+       ON STREAMING DO INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = "${s3BucketName}/regression/load/data/example_0.csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+        """
+    }, "Invalid uri")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    // create with a wrong secret key
+    expectExceptionLike({
+        sql """
+       CREATE JOB ${jobNameError}
+       ON STREAMING DO INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/example_0.csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "xxxx"
+        );
+        """
+    }, "Can not build s3")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    // create with a non-existent path
+    expectExceptionLike({
+        sql """
+       CREATE JOB ${jobNameError}
+       ON STREAMING DO INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/NoexistPath/Noexist.csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+        """
+    }, "insert into cols should be corresponding to the query output")
+
+    jobCount = sql """ select count(1) from jobs("type"="insert") where Name = '${jobNameError}' and ExecuteType='STREAMING' """
+    assert jobCount.get(0).get(0) == 0
+
+    /************************ select **********************/
+
+    // select a non-existent job
+    jobCount = sql """ select * from jobs("type"="insert") where Name = 'NoExistJob'"""
+    assert jobCount.size() == 0
+
+
+    /************************ alter **********************/
+
+    // create normal streaming job
+    sql """
+       CREATE JOB ${jobName}  
+       PROPERTIES(
+        "s3.max_batch_files" = "1"
+       )
+       ON STREAMING DO INSERT INTO ${tableName} 
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    try {
+        Awaitility.await().atMost(300, SECONDS)
+                .pollInterval(1, SECONDS).until(
+                {
+                    print("check success task count")
+                    def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
+                    // check job status and succeed task count larger than 2
+                    jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0)
+                }
+        )
+    } catch (Exception ex){
+        def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+        def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+        println("show job: " + showjob)
+        println("show task: " + showtask)
+        throw ex;
+    }
+
+    def jobStatus = sql """
+        select status from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert jobStatus.get(0).get(0) == "RUNNING"
+
+    def jobOffset = sql """
+        select currentOffset, endoffset from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert jobOffset.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+    assert jobOffset.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+
+    qt_select """ SELECT * FROM ${tableName} order by c1 """
+
+    // alter running job
+    expectExceptionLike({
+        sql """
+       ALTER JOB ${jobName}
+       PROPERTIES(
+        "session.enable_insert_strict" = false
+       )
+       INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    }, "Only PAUSED job can be altered")
+
+    // pause job
+    sql """
+        PAUSE JOB where jobname =  '${jobName}'
+    """
+    def pausedJobStatus = sql """
+        select status from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert pausedJobStatus.get(0).get(0) == "PAUSED"
+
+    // alter a non-existent job
+    expectExceptionLike({
+        sql """
+       ALTER JOB noExistJob
+       INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+    }, "job not exist")
+
+    // alter session var
+    sql """
+       ALTER JOB ${jobName}
+       PROPERTIES(
+        "session.enable_insert_strict" = false,
+        "session.insert_max_filter_ratio" = 0.5,
+        "s3.max_batch_files" = "10"
+       )
+       INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = 
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+    """
+
+    def alterJobProperties = sql """
+        select properties from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert alterJobProperties.get(0).get(0) == "{\"s3.max_batch_files\":\"10\",\"session.enable_insert_strict\":\"false\",\"session.insert_max_filter_ratio\":\"0.5\"}"
+
+    // alter url
+    expectExceptionLike({
+        sql """
+       ALTER JOB ${jobName}
+       INSERT INTO ${tableName}
+       SELECT * FROM S3
+        (
+            "uri" = "s3://${s3BucketName}/regression/load/data/xxxx.csv",
+            "format" = "csv",
+            "provider" = "${getS3Provider()}",
+            "column_separator" = ",",
+            "s3.endpoint" = "${getS3Endpoint()}",
+            "s3.region" = "${getS3Region()}",
+            "s3.access_key" = "${getS3AK()}",
+            "s3.secret_key" = "${getS3SK()}"
+        );
+        """
+    }, "The uri property cannot be modified in ALTER JOB")
+
+    /************************ delete **********************/
+    // drop pause job
+    pausedJobStatus = sql """
+        select status from jobs("type"="insert") where Name='${jobName}'
+    """
+    assert pausedJobStatus.get(0).get(0) == "PAUSED"
+
+    sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
+
+    def jobCountRsp = sql """select count(1) from jobs("type"="insert") where Name ='${jobName}'"""
+    assert jobCountRsp.get(0).get(0) == 0
+
+    // drop a non-existent job
+    expectExceptionLike({
+        sql """ DROP JOB IF EXISTS where jobname =  'NoExistJob' """
+    }, "unregister job error")
+
+}

