This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f0f12729363 [test](streaming job) add streaming job restart fe test
(#56808)
f0f12729363 is described below
commit f0f12729363c146b7d26b7cbb500efa86539314d
Author: hui lai <[email protected]>
AuthorDate: Mon Oct 13 09:51:26 2025 +0800
[test](streaming job) add streaming job restart fe test (#56808)
Add streaming job restart fe test
---
.../test_streaming_job_restart_fe.groovy | 123 +++++++++++++++++++++
1 file changed, 123 insertions(+)
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
new file mode 100644
index 00000000000..34941325ca7
--- /dev/null
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+suite("test_streaming_job_restart_fe", "docker") {
+ def tableName = "test_streaming_job_restart_fe_tbl"
+ def jobName = "test_streaming_job_restart_fe_name"
+
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+
+ docker(options) {
+ sql """drop table if exists `${tableName}` force"""
+ sql """
+ DROP JOB IF EXISTS where jobname = '${jobName}'
+ """
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `c1` int NULL,
+ `c2` string NULL,
+ `c3` int NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`c1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`c1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ // create streaming job
+ sql """
+ CREATE JOB ${jobName}
+ PROPERTIES(
+ "s3.max_batch_files" = "1"
+ )
+ ON STREAMING DO INSERT INTO ${tableName}
+ SELECT * FROM S3
+ (
+ "uri" =
"s3://${s3BucketName}/regression/load/data/example_[0-1].csv",
+ "format" = "csv",
+ "provider" = "${getS3Provider()}",
+ "column_separator" = ",",
+ "s3.endpoint" = "${getS3Endpoint()}",
+ "s3.region" = "${getS3Region()}",
+ "s3.access_key" = "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}"
+ );
+ """
+
+ try {
+ // Wait for streaming job to process some data
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert") where Name like '%${jobName}%' and
ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ // check job status and succeed task count larger than
1
+ jobSuccendCount.size() == 1 && '1' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ sql """
+ PAUSE JOB where jobname = '${jobName}'
+ """
+ sql """
+ RESUME JOB where jobname = '${jobName}'
+ """
+ def jobInfo = sql """
+ select currentOffset, endoffset, loadStatistic from
jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+
+ // Restart FE
+ cluster.restartFrontends()
+ sleep(30000)
+ context.reconnectFe()
+
+ def jobStatus = sql """
+ select status from jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("jobstatus: " + jobStatus)
+ assert jobStatus.get(0).get(0) == "RUNNING" ||
resumeJobStatus.get(0).get(0) == "PENDING"
+ jobInfo = sql """
+ select currentOffset, endoffset, loadStatistic from
jobs("type"="insert") where Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ assert jobInfo.get(0).get(0) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(1) ==
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+ assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":0,\"fileSize\":0}"
+
+ sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
+ sql """drop table if exists `${tableName}` force"""
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]