This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f0f12729363 [test](streaming job) add streaming job restart fe test (#56808)
f0f12729363 is described below

commit f0f12729363c146b7d26b7cbb500efa86539314d
Author: hui lai <[email protected]>
AuthorDate: Mon Oct 13 09:51:26 2025 +0800

    [test](streaming job) add streaming job restart fe test (#56808)
    
    Add streaming job restart fe test
---
 .../test_streaming_job_restart_fe.groovy           | 123 +++++++++++++++++++++
 1 file changed, 123 insertions(+)

diff --git a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
new file mode 100644
index 00000000000..34941325ca7
--- /dev/null
+++ b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_job_restart_fe", "docker") {
+    def tableName = "test_streaming_job_restart_fe_tbl"
+    def jobName = "test_streaming_job_restart_fe_name"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    
+    docker(options) {
+        sql """drop table if exists `${tableName}` force"""
+        sql """
+            DROP JOB IF EXISTS where jobname =  '${jobName}'
+        """
+
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `c1` int NULL,
+                `c2` string NULL,
+                `c3` int NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`c1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        // create streaming job
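+        // "s3.max_batch_files" = "1" is assumed to cap each batch at a single file, so consuming both example files takes multiple tasks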
+        sql """
+           CREATE JOB ${jobName}  
+           PROPERTIES(
+            "s3.max_batch_files" = "1"
+           )
+           ON STREAMING DO INSERT INTO ${tableName} 
+           SELECT * FROM S3
+            (
+                "uri" = "s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+                "format" = "csv",
+                "provider" = "${getS3Provider()}",
+                "column_separator" = ",",
+                "s3.endpoint" = "${getS3Endpoint()}",
+                "s3.region" = "${getS3Region()}",
+                "s3.access_key" = "${getS3AK()}",
+                "s3.secret_key" = "${getS3SK()}"
+            );
+        """
+
+        try {
+            // Wait for streaming job to process some data
+            Awaitility.await().atMost(300, SECONDS)
+                    .pollInterval(1, SECONDS).until(
+                    {
+                        def jobSucceedCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name like '%${jobName}%' and ExecuteType='STREAMING' """
+                        log.info("jobSucceedCount: " + jobSucceedCount)
+                        // check that the job exists and has at least one succeeded task
+                        jobSucceedCount.size() == 1 && '1' <= jobSucceedCount.get(0).get(0)
+                    }
+            )
+        } catch (Exception ex){
+            def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'"""
+            def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'"""
+            log.info("show job: " + showjob)
+            log.info("show task: " + showtask)
+            throw ex;
+        }
+
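+        // Pause and resume the job, then record its offsets and load statistics as the baseline to compare against after the FE restart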
+        sql """
+            PAUSE JOB where jobname =  '${jobName}'
+        """
+        sql """
+            RESUME JOB where jobname =  '${jobName}'
+        """
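+        // Expected offset and loadStatistic values below correspond to the two example CSV files consumed by the job (20 rows, 425 bytes in total)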
+        def jobInfo = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+
+        // Restart FE
+        cluster.restartFrontends()
+        sleep(30000)
+        context.reconnectFe()
+
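+        // After the restart the job should be restored from FE metadata with the same offsets and load statistics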
+        def jobStatus = sql """
+            select status from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobStatus: " + jobStatus)
+        assert jobStatus.get(0).get(0) == "RUNNING" || jobStatus.get(0).get(0) == "PENDING"
+        jobInfo = sql """
+            select currentOffset, endoffset, loadStatistic from jobs("type"="insert") where Name='${jobName}'
+        """
+        log.info("jobInfo: " + jobInfo)
+        assert jobInfo.get(0).get(0) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(1) == "{\"endFile\":\"regression/load/data/example_1.csv\"}";
+        assert jobInfo.get(0).get(2) == "{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+
+        sql """ DROP JOB IF EXISTS where jobname =  '${jobName}' """
+        sql """drop table if exists `${tableName}` force"""
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
