This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new d471124ebd7 [Fix](job) improve job api (#56352)
d471124ebd7 is described below

commit d471124ebd71ee72c911d7c6a36628d88fb3a934
Author: wudi <[email protected]>
AuthorDate: Tue Sep 23 16:44:41 2025 +0800

    [Fix](job) improve job api (#56352)
    
    ### What problem does this PR solve?
    
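    Improve the job API: replace JobUtils.checkNeedSchedule with a new
    Job#isJobRunning method (overridden by StreamingInsertJob), rename the
    ConsumedOffset/MaxOffset job columns to CurrentOffset/EndOffset and show
    them as JSON, and drop the Offset column from the insert task schema.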
---
 .../org/apache/doris/job/base/AbstractJob.java     |  9 ++++---
 .../main/java/org/apache/doris/job/base/Job.java   |  7 +++++
 .../java/org/apache/doris/job/common/JobUtils.java | 30 ----------------------
 .../doris/job/executor/DispatchTaskHandler.java    |  3 +--
 .../doris/job/executor/TimerJobSchedulerTask.java  |  3 +--
 .../doris/job/extensions/insert/InsertJob.java     |  7 ++---
 .../doris/job/extensions/insert/InsertTask.java    |  5 +---
 .../insert/streaming/StreamingInsertJob.java       | 20 +++++++++++----
 .../streaming/StreamingJobSchedulerTask.java       |  2 --
 .../doris/job/offset/SourceOffsetProvider.java     |  8 +++---
 .../org/apache/doris/job/offset/s3/S3Offset.java   |  4 +--
 .../job/offset/s3/S3SourceOffsetProvider.java      | 12 +++++----
 .../apache/doris/job/scheduler/JobScheduler.java   |  7 +++--
 .../streaming_job/test_streaming_insert_job.groovy | 14 +++++-----
 14 files changed, 54 insertions(+), 77 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 5b03ae6d18b..6a113d1a042 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -178,6 +178,11 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         return System.nanoTime() + RandomUtils.nextInt();
     }
 
+    @Override
+    public boolean isJobRunning() {
+        return JobStatus.RUNNING.equals(getJobStatus());
+    }
+
     @Override
     public void cancelTaskById(long taskId) throws JobException {
         if (CollectionUtils.isEmpty(runningTasks)) {
@@ -484,8 +489,4 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
     public boolean needPersist() {
         return true;
     }
-
-    public boolean isFinalStatus() {
-        return jobStatus.equals(JobStatus.STOPPED) || 
jobStatus.equals(JobStatus.FINISHED);
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
index 69d1e5e55fb..3afcd5eef9b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/Job.java
@@ -68,6 +68,13 @@ public interface Job<T extends AbstractTask, C> {
      */
     boolean isReadyForScheduling(C taskContext);
 
+    /**
+     * Checks if the job is running.
+     *
+     * @return True if the job is running.
+     */
+    boolean isJobRunning();
+
     /**
      * Retrieves the metadata for the job, which is used to display job 
information.
      *
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java
deleted file mode 100644
index 93ff5f41480..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/job/common/JobUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.job.common;
-
-import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.base.JobExecuteType;
-
-public class JobUtils {
-    public static boolean checkNeedSchedule(AbstractJob job) {
-        if (job.getJobConfig().getExecuteType() == JobExecuteType.STREAMING) {
-            return !job.isFinalStatus();
-        }
-        return job.getJobStatus() == JobStatus.RUNNING;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
index 35b1f351f72..228b9724863 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/DispatchTaskHandler.java
@@ -19,7 +19,6 @@ package org.apache.doris.job.executor;
 
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.common.JobType;
-import org.apache.doris.job.common.JobUtils;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.disruptor.TimerJobEvent;
 import org.apache.doris.job.task.AbstractTask;
@@ -55,7 +54,7 @@ public class DispatchTaskHandler<T extends AbstractJob> 
implements WorkHandler<T
                 log.info("job is null,may be job is deleted, ignore");
                 return;
             }
-            if (event.getJob().isReadyForScheduling(null) && 
JobUtils.checkNeedSchedule(event.getJob())) {
+            if (event.getJob().isReadyForScheduling(null) && 
event.getJob().isJobRunning()) {
                 List<? extends AbstractTask> tasks = 
event.getJob().commonCreateTasks(TaskType.SCHEDULED, null);
                 if (CollectionUtils.isEmpty(tasks)) {
                     log.warn("job is ready for scheduling, but create task is 
empty, skip scheduler,"
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
index 0f58452ff7b..4abf72023a4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/executor/TimerJobSchedulerTask.java
@@ -18,7 +18,6 @@
 package org.apache.doris.job.executor;
 
 import org.apache.doris.job.base.AbstractJob;
-import org.apache.doris.job.common.JobUtils;
 import org.apache.doris.job.disruptor.TaskDisruptor;
 
 import io.netty.util.Timeout;
@@ -40,7 +39,7 @@ public class TimerJobSchedulerTask<T extends AbstractJob> 
implements TimerTask {
     @Override
     public void run(Timeout timeout) {
         try {
-            if (!JobUtils.checkNeedSchedule(job)) {
+            if (!job.isJobRunning()) {
                 log.info("job status is not running, job id is {}, skip 
dispatch", this.job.getJobId());
                 return;
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
index cc16cbb603d..305089378a0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertJob.java
@@ -100,8 +100,8 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
             .add(new Column("Comment", ScalarType.createStringType()))
             // only execute type = streaming need record
             .add(new Column("Properties", ScalarType.createStringType()))
-            .add(new Column("ConsumedOffset", ScalarType.createStringType()))
-            .add(new Column("MaxOffset", ScalarType.createStringType()))
+            .add(new Column("CurrentOffset", ScalarType.createStringType()))
+            .add(new Column("EndOffset", ScalarType.createStringType()))
             .add(new Column("LoadStatistic", ScalarType.createStringType()))
             .add(new Column("ErrorMsg", ScalarType.createStringType()))
             .build();
@@ -114,15 +114,12 @@ public class InsertJob extends AbstractJob<InsertTask, 
Map<Object, Object>> impl
                     .addColumn(new Column("EtlInfo", 
ScalarType.createVarchar(100)))
                     .addColumn(new Column("TaskInfo", 
ScalarType.createVarchar(100)))
                     .addColumn(new Column("ErrorMsg", 
ScalarType.createVarchar(100)))
-
                     .addColumn(new Column("CreateTimeMs", 
ScalarType.createVarchar(20)))
                     .addColumn(new Column("FinishTimeMs", 
ScalarType.createVarchar(20)))
                     .addColumn(new Column("TrackingUrl", 
ScalarType.createVarchar(200)))
                     .addColumn(new Column("LoadStatistic", 
ScalarType.createVarchar(200)))
                     .addColumn(new Column("User", 
ScalarType.createVarchar(50)))
                     .addColumn(new Column("FirstErrorMsg", 
ScalarType.createVarchar(200)))
-                    // only execute type = streaming need record
-                    .addColumn(new Column("Offset", 
ScalarType.createStringType()))
                     .build();
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 5875699fb0f..aa1ecc02a9f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -68,8 +68,7 @@ public class InsertTask extends AbstractTask {
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
             new Column("User", ScalarType.createStringType()),
-            new Column("FirstErrorMsg", ScalarType.createStringType()),
-            new Column("Offset", ScalarType.createStringType()));
+            new Column("FirstErrorMsg", ScalarType.createStringType()));
 
     public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
 
@@ -277,7 +276,6 @@ public class InsertTask extends AbstractTask {
             trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         }
         trow.addToColumnValue(new TCell().setStringVal(firstErrorMsg == null ? 
"" : firstErrorMsg));
-        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
@@ -299,7 +297,6 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new 
TCell().setStringVal(userIdentity.getQualifiedUser()));
         trow.addToColumnValue(new TCell().setStringVal(""));
-        trow.addToColumnValue(new TCell().setStringVal(""));
         return trow;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index b32ecdcb94e..f70cbc37fca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -63,6 +63,7 @@ import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -203,7 +204,16 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public boolean isReadyForScheduling(Map<Object, Object> taskContext) {
-        return CollectionUtils.isEmpty(getRunningTasks());
+        return CollectionUtils.isEmpty(getRunningTasks()) && !isFinalStatus();
+    }
+
+    @Override
+    public boolean isJobRunning() {
+        return !isFinalStatus();
+    }
+
+    private boolean isFinalStatus() {
+        return getJobStatus().equals(JobStatus.STOPPED) || 
getJobStatus().equals(JobStatus.FINISHED);
     }
 
     @Override
@@ -354,14 +364,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new TCell().setStringVal(properties != null
                 ? GsonUtils.GSON.toJson(properties) : 
FeConstants.null_string));
 
-        if (offsetProvider != null && offsetProvider.getConsumedOffset() != 
null) {
-            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getConsumedOffset()));
+        if (offsetProvider != null && 
StringUtils.isNotEmpty(offsetProvider.getShowCurrentOffset())) {
+            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getShowCurrentOffset()));
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         }
 
-        if (offsetProvider != null && offsetProvider.getMaxOffset() != null) {
-            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getMaxOffset()));
+        if (offsetProvider != null && 
StringUtils.isNotEmpty(offsetProvider.getShowMaxOffset())) {
+            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getShowMaxOffset()));
         } else {
             trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index a1c8bce649d..0cd023b7d1d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -154,8 +154,6 @@ public class StreamingJobSchedulerTask extends AbstractTask 
{
             trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getUserIdentity().getQualifiedUser()));
         }
         trow.addToColumnValue(new TCell().setStringVal(""));
-        trow.addToColumnValue(new 
TCell().setStringVal(runningTask.getRunningOffset() == null ? 
FeConstants.null_string
-                : runningTask.getRunningOffset().toString()));
         return trow;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index a27b66812c2..d2cf4e62053 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -40,16 +40,16 @@ public interface SourceOffsetProvider {
     Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> 
properties);
 
     /**
-     * Get consumered offset to show
+     * Get current offset to show
      * @return
      */
-    String getConsumedOffset();
+    String getShowCurrentOffset();
 
     /**
-     * Get remote datasource max offset
+     * Get remote datasource max offset to show
      * @return
      */
-    String getMaxOffset();
+    String getShowMaxOffset();
 
     /**
      * Rewrite the TVF parameters in the SQL based on the current offset.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
index b152660873a..6257eed8c70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3Offset.java
@@ -28,8 +28,6 @@ import org.apache.commons.lang3.StringUtils;
 @Getter
 @Setter
 public class S3Offset implements Offset {
-    // path/1.csv
-    String startFile;
     @SerializedName("endFile")
     String endFile;
     // s3://bucket/path/{1.csv,2.csv}
@@ -47,6 +45,6 @@ public class S3Offset implements Offset {
 
     @Override
     public String toString() {
-        return "{ \"startFile\": \"" + startFile + "\", \"endFile\": \"" + 
endFile + "\" }";
+        return "{\"endFile\": \"" + endFile + "\" }";
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index e6a5cf809a5..74743d09399 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -33,6 +33,7 @@ import 
org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.persist.gson.GsonUtils;
 
 import com.google.common.collect.Maps;
+import com.google.gson.Gson;
 import lombok.extern.log4j.Log4j2;
 import org.apache.commons.lang3.StringUtils;
 
@@ -72,7 +73,6 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
                 String bucket = globListResult.getBucket();
                 String prefix = globListResult.getPrefix();
 
-                offset.setStartFile(startFile);
                 String bucketBase = "s3://" + bucket + "/";
                 // Get the path of the last directory
                 int lastSlash = prefix.lastIndexOf('/');
@@ -109,16 +109,18 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     @Override
-    public String getConsumedOffset() {
+    public String getShowCurrentOffset() {
         if (currentOffset != null) {
-            return currentOffset.getEndFile();
+            return currentOffset.toSerializedJson();
         }
         return null;
     }
 
     @Override
-    public String getMaxOffset() {
-        return maxEndFile;
+    public String getShowMaxOffset() {
+        Map<String, String> res = new HashMap<>();
+        res.put("endFile", maxEndFile);
+        return new Gson().toJson(res);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index ab03cce7c91..dde5f891efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -24,7 +24,6 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.job.base.AbstractJob;
 import org.apache.doris.job.base.JobExecuteType;
 import org.apache.doris.job.common.JobStatus;
-import org.apache.doris.job.common.JobUtils;
 import org.apache.doris.job.common.TaskType;
 import org.apache.doris.job.disruptor.TaskDisruptor;
 import org.apache.doris.job.exception.JobException;
@@ -110,7 +109,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
     }
 
     public void scheduleOneJob(T job) throws JobException {
-        if (!JobUtils.checkNeedSchedule(job)) {
+        if (!job.isJobRunning()) {
             return;
         }
         // not-schedule task
@@ -145,7 +144,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
     }
 
     public void cycleTimerJobScheduler(T job) {
-        if (!JobUtils.checkNeedSchedule(job)) {
+        if (!job.isJobRunning()) {
             return;
         }
         if 
(!JobExecuteType.RECURRING.equals(job.getJobConfig().getExecuteType())) {
@@ -226,7 +225,7 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> 
implements Closeable {
                 clearEndJob(job);
                 continue;
             }
-            if (JobUtils.checkNeedSchedule(job) && 
job.getJobConfig().checkIsTimerJob()) {
+            if (job.isJobRunning() && job.getJobConfig().checkIsTimerJob()) {
                 cycleTimerJobScheduler(job, lastTimeWindowMs);
             }
         }
diff --git 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 54fc5faa2e8..6865cedd842 100644
--- 
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++ 
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -97,10 +97,10 @@ suite("test_streaming_insert_job") {
 
 
     def jobOffset = sql """
-        select ConsumedOffset, MaxOffset from jobs("type"="insert") where 
Name='${jobName}'
+        select currentOffset, endoffset from jobs("type"="insert") where 
Name='${jobName}'
     """
-    assert jobOffset.get(0).get(0) == "regression/load/data/example_1.csv"
-    assert jobOffset.get(0).get(1) == "regression/load/data/example_1.csv"
+    assert jobOffset.get(0).get(0) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
+    assert jobOffset.get(0).get(1) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
 
     // alter streaming job
     sql """
@@ -123,21 +123,21 @@ suite("test_streaming_insert_job") {
     """
 
     def alterJobProperties = sql """
-        select status,properties,ConsumedOffset from jobs("type"="insert") 
where Name='${jobName}'
+        select status,properties,currentOffset from jobs("type"="insert") 
where Name='${jobName}'
     """
     assert alterJobProperties.get(0).get(0) == "PAUSED"
     assert alterJobProperties.get(0).get(1) == 
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
-    assert alterJobProperties.get(0).get(2) == 
"regression/load/data/example_1.csv"
+    assert alterJobProperties.get(0).get(2) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
 
     sql """
         RESUME JOB where jobname =  '${jobName}'
     """
     def resumeJobStatus = sql """
-        select status,properties,ConsumedOffset from jobs("type"="insert") 
where Name='${jobName}'
+        select status,properties,currentOffset from jobs("type"="insert") 
where Name='${jobName}'
     """
     assert resumeJobStatus.get(0).get(0) == "RUNNING" || 
resumeJobStatus.get(0).get(0) == "PENDING"
     assert resumeJobStatus.get(0).get(1) == 
"{\"s3.max_batch_files\":\"1\",\"session.insert_max_filter_ratio\":\"0.5\"}"
-    assert resumeJobStatus.get(0).get(2) == 
"regression/load/data/example_1.csv"
+    assert resumeJobStatus.get(0).get(2) == 
"{\"endFile\":\"regression/load/data/example_1.csv\"}";
 
     Awaitility.await().atMost(60, SECONDS)
             .pollInterval(1, SECONDS).until(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to