This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/dev-offline-sync by this push:
     new 9c714530a1 [INLONG-10561][Manager] Support configurations for bounded 
source (#10571)
9c714530a1 is described below

commit 9c714530a1cfcd3f5c21be7a472d18db78459176
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 8 10:07:42 2024 +0800

    [INLONG-10561][Manager] Support configurations for bounded source (#10571)
    
    Co-authored-by: Aloys Zhang <[email protected]>
---
 .../client/api/inner/client/InlongGroupClient.java |  5 +-
 .../manager/client/api/service/InlongGroupApi.java |  5 +-
 .../manager/common/consts/InlongConstants.java     |  4 ++
 .../inlong/manager/common/enums/ErrorCodeEnum.java |  4 ++
 .../inlong/manager/plugin/flink/FlinkService.java  | 13 +++++-
 .../inlong/manager/plugin/flink/dto/FlinkInfo.java |  6 +++
 .../plugin/offline/FlinkOfflineJobOperator.java    |  6 ++-
 .../inlong/manager/plugin/util/FlinkUtils.java     | 17 ++++---
 .../pojo/schedule/OfflineJobSubmitRequest.java}    | 31 ++++++++-----
 .../schedule/quartz/QuartzOfflineSyncJob.java      | 45 +++++++++++++-----
 .../manager/schedule/util/ScheduleUtils.java       | 13 ++++--
 .../inlong/manager/schedule/BaseScheduleTest.java  |  5 ++
 .../manager/service/group/InlongGroupService.java  |  7 +--
 .../service/group/InlongGroupServiceImpl.java      | 42 ++++++++++++++++-
 .../manager/service/schedule/ScheduleOperator.java |  4 +-
 .../service/schedule/ScheduleOperatorImpl.java     |  5 +-
 .../service/source/bounded/BoundedSourceType.java  | 54 ++++++++++++++++++++++
 .../web/controller/InlongGroupController.java      | 12 +++--
 .../workflow/processor/OfflineJobOperator.java     |  4 +-
 19 files changed, 226 insertions(+), 56 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index fcc907396e..d053ea37a5 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -36,6 +36,7 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
 import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
@@ -320,8 +321,8 @@ public class InlongGroupClient {
         return response.getData();
     }
 
-    public Boolean submitOfflineJob(String groupId) {
-        Response<Boolean> responseBody = 
ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(groupId));
+    public Boolean submitOfflineJob(OfflineJobSubmitRequest request) {
+        Response<Boolean> responseBody = 
ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(request));
         ClientUtils.assertRespSuccess(responseBody);
         return responseBody.getData();
     }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 930a3a8144..1761193644 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -26,6 +26,7 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 
 import retrofit2.Call;
@@ -99,6 +100,6 @@ public interface InlongGroupApi {
     @GET("group/switch/finish/{groupId}")
     Call<Response<Boolean>> finishTagSwitch(@Path("groupId") String groupId);
 
-    @POST("group/submitOfflineJob/{groupId}")
-    Call<Response<Boolean>> submitOfflineJob(@Path("groupId") String groupId);
+    @POST("group/submitOfflineJob")
+    Call<Response<Boolean>> submitOfflineJob(@Body OfflineJobSubmitRequest 
request);
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 4abcc0f359..d9957e81f5 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -93,6 +93,10 @@ public class InlongConstants {
     public static final String RUNTIME_EXECUTION_MODE_STREAM = "stream";
     public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
 
+    public static final String BOUNDARY_TYPE = "BOUNDARY_TYPE";
+    public static final String LOWER_BOUNDARY = "LOWER_BOUNDARY";
+    public static final String UPPER_BOUNDARY = "UPPER_BOUNDARY";
+
     public static final Integer DISABLE_ZK = 0;
     public static final Integer ENABLE_ZK = 1;
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 2a82325ad9..9364fda8e7 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -130,6 +130,10 @@ public enum ErrorCodeEnum {
     SCHEDULE_ENGINE_NOT_SUPPORTED(1702, "Schedule engine type not supported"),
     SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED(1703, "Schedule status transition 
is not allowed"),
 
+    BOUNDED_SOURCE_TYPE_NOT_SUPPORTED(1801, "Bounded source type %s not 
supported"),
+    BOUNDARY_TYPE_NOT_SUPPORTED(1802, "Boundary type %s not supported"),
+    BOUNDARIES_NOT_FOUND(1803, "Boundaries not found"),
+
     WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
     WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no 
operation authority"),
     WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"),
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 9188c6b73f..f3809ce8ae 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.plugin.flink;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
 import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -258,8 +259,16 @@ public class FlinkService {
         list.add(flinkInfo.getLocalConfPath());
         list.add("-checkpoint.interval");
         list.add("60000");
-        list.add("-runtime.execution.mode");
-        list.add(flinkInfo.getRuntimeExecutionMode());
+        if 
(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH.equalsIgnoreCase(flinkInfo.getRuntimeExecutionMode()))
 {
+            list.add("-runtime.execution.mode");
+            list.add(flinkInfo.getRuntimeExecutionMode());
+            list.add("-source.boundary.type");
+            list.add(flinkInfo.getBoundaryType());
+            list.add("-source.lower.boundary");
+            list.add(flinkInfo.getLowerBoundary());
+            list.add("-source.upper.boundary");
+            list.add(flinkInfo.getUpperBoundary());
+        }
         return list.toArray(new String[0]);
     }
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index 4c3c75f855..e8b924f138 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -54,4 +54,10 @@ public class FlinkInfo {
     private String exceptionMsg;
 
     private String runtimeExecutionMode;
+
+    private String boundaryType;
+
+    private String lowerBoundary;
+
+    private String upperBoundary;
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
index 1c93e7636a..9a6eca45ce 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.plugin.offline;
 
+import org.apache.inlong.common.bounded.Boundaries;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
 
@@ -30,7 +31,8 @@ import static 
org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
 public class FlinkOfflineJobOperator implements OfflineJobOperator {
 
     @Override
-    public void submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList) throws Exception {
-        submitFlinkJobs(groupId, streamInfoList, true);
+    public void submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList, Boundaries boundaries)
+            throws Exception {
+        submitFlinkJobs(groupId, streamInfoList, true, boundaries);
     }
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index fd0c2c8bf2..1110ef45d8 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.plugin.util;
 
+import org.apache.inlong.common.bounded.Boundaries;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -222,11 +223,11 @@ public class FlinkUtils {
 
     public static ListenerResult submitFlinkJobs(String groupId, 
List<InlongStreamInfo> streamInfoList)
             throws Exception {
-        return submitFlinkJobs(groupId, streamInfoList, false);
+        return submitFlinkJobs(groupId, streamInfoList, false, null);
     }
 
     public static ListenerResult submitFlinkJobs(String groupId, 
List<InlongStreamInfo> streamInfoList,
-            boolean isBatchJob) throws Exception {
+            boolean isBatchJob, Boundaries boundaries) throws Exception {
         int sinkCount = streamInfoList.stream()
                 .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
                 .reduce(0, Integer::sum);
@@ -240,7 +241,8 @@ public class FlinkUtils {
         List<ListenerResult> listenerResults = new ArrayList<>();
         for (InlongStreamInfo streamInfo : streamInfoList) {
             listenerResults.add(
-                    FlinkUtils.submitFlinkJob(streamInfo, 
FlinkUtils.genFlinkJobName(streamInfo), isBatchJob));
+                    FlinkUtils.submitFlinkJob(
+                            streamInfo, 
FlinkUtils.genFlinkJobName(streamInfo), isBatchJob, boundaries));
         }
 
         // only one stream in group for now
@@ -254,11 +256,11 @@ public class FlinkUtils {
     }
 
     public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, 
String jobName) throws Exception {
-        return submitFlinkJob(streamInfo, jobName, false);
+        return submitFlinkJob(streamInfo, jobName, false, null);
     }
 
-    public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, 
String jobName, boolean isBatchJob)
-            throws Exception {
+    public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, 
String jobName,
+            boolean isBatchJob, Boundaries boundaries) throws Exception {
         List<StreamSink> sinkList = streamInfo.getSinkList();
         List<String> sinkTypes = 
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
         if (CollectionUtils.isEmpty(sinkList) || 
!SinkType.containSortFlinkSink(sinkTypes)) {
@@ -298,6 +300,9 @@ public class FlinkUtils {
         
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
         if (isBatchJob) {
             flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_BATCH);
+            flinkInfo.setBoundaryType(boundaries.getBoundaryType().getType());
+            flinkInfo.setLowerBoundary(boundaries.getLowerBound());
+            flinkInfo.setUpperBoundary(boundaries.getUpperBound());
         } else {
             flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_STREAM);
         }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java
similarity index 56%
copy from 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java
index 1c93e7636a..6e9dcc91e8 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java
@@ -15,22 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.plugin.offline;
+package org.apache.inlong.manager.pojo.schedule;
 
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
 
-import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotNull;
 
-import java.util.List;
+@Data
+public class OfflineJobSubmitRequest {
 
-import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
+    @ApiModelProperty("Inlong Group ID")
+    @NotNull
+    private String groupId;
 
-@NoArgsConstructor
-public class FlinkOfflineJobOperator implements OfflineJobOperator {
+    @ApiModelProperty("Source boundary type, TIME and OFFSET are supported")
+    @NotNull
+    private String boundaryType;
 
-    @Override
-    public void submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList) throws Exception {
-        submitFlinkJobs(groupId, streamInfoList, true);
-    }
+    @ApiModelProperty("The lower bound for bounded source")
+    @NotNull
+    private String lowerBoundary;
+
+    @ApiModelProperty("The upper bound for bounded source")
+    @NotNull
+    private String upperBoundary;
 }
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
index 2b66766474..6e369c4c41 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
@@ -17,12 +17,14 @@
 
 package org.apache.inlong.manager.schedule.quartz;
 
+import org.apache.inlong.common.bounded.BoundaryType;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -35,10 +37,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.END_TIME;
 import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_HOST;
 import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_PORT;
-import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_ID;
-import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_KEY;
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.PASSWORD;
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.USERNAME;
 
 @Data
 @NoArgsConstructor
@@ -49,31 +52,49 @@ public class QuartzOfflineSyncJob implements Job {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(QuartzOfflineSyncJob.class);
 
     private volatile InlongGroupClient groupClient;
+    private long endTime;
 
     @Override
     public void execute(JobExecutionContext context) throws 
JobExecutionException {
         LOGGER.info("QuartzOfflineSyncJob run once");
         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
         initGroupClientIfNeeded(jobDataMap);
+
         String inlongGroupId = context.getJobDetail().getKey().getName();
-        LOGGER.info("Starting submit offline job for group {}", inlongGroupId);
-        if (groupClient.submitOfflineJob(inlongGroupId)) {
-            LOGGER.info("Successfully submitting offline job for group {}", 
inlongGroupId);
-        } else {
-            LOGGER.warn("Failed to submit offline job for group {}", 
inlongGroupId);
+        long lowerBoundary = context.getScheduledFireTime().getTime();
+        long upperBoundary = context.getNextFireTime() == null ? endTime : 
context.getNextFireTime().getTime();
+
+        OfflineJobSubmitRequest request = new OfflineJobSubmitRequest();
+        request.setGroupId(inlongGroupId);
+        request.setBoundaryType(BoundaryType.TIME.getType());
+        request.setLowerBoundary(String.valueOf(lowerBoundary));
+        request.setUpperBoundary(String.valueOf(upperBoundary));
+        LOGGER.info("Starting submit offline job for group: {}, with lower 
boundary: {} and upper boundary: {}",
+                inlongGroupId, lowerBoundary, upperBoundary);
+
+        try {
+            if (groupClient.submitOfflineJob(request)) {
+                LOGGER.info("Successfully submitting offline job for group 
{}", inlongGroupId);
+            } else {
+                LOGGER.warn("Failed to submit offline job for group {}", 
inlongGroupId);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Exception to submit offline job for group {}, error 
msg: {}", inlongGroupId, e.getMessage());
         }
+
     }
 
     private void initGroupClientIfNeeded(JobDataMap jobDataMap) {
         if (groupClient == null) {
             String host = (String) jobDataMap.get(MANAGER_HOST);
             int port = (int) jobDataMap.get(MANAGER_PORT);
-            String secreteId = (String) jobDataMap.get(SECRETE_ID);
-            String secreteKey = (String) jobDataMap.get(SECRETE_KEY);
-            LOGGER.info("Initializing Inlong group client, with host: {}, 
port: {}, userName : {}",
-                    host, port, secreteId);
+            String username = (String) jobDataMap.get(USERNAME);
+            String password = (String) jobDataMap.get(PASSWORD);
+            endTime = (long) jobDataMap.get(END_TIME);
+            LOGGER.info("Initializing Inlong group client, with host: {}, 
port: {}, userName : {}, endTime: {}",
+                    host, port, username, endTime);
             ClientConfiguration configuration = new ClientConfiguration();
-            configuration.setAuthentication(new 
DefaultAuthentication(secreteId, secreteKey));
+            configuration.setAuthentication(new 
DefaultAuthentication(username, password));
             String serviceUrl = host + ":" + port;
             InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl, 
configuration);
             ClientFactory clientFactory = 
ClientUtils.getClientFactory(inlongClient.getConfiguration());
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
index 50c5d9505f..4a974f299e 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
@@ -48,17 +48,20 @@ public class ScheduleUtils {
 
     public static final String MANAGER_HOST = "HOST";
     public static final String MANAGER_PORT = "PORT";
-    public static final String SECRETE_ID = "SECRETE_ID";
-    public static final String SECRETE_KEY = "SECRETE_KEY";
+    public static final String USERNAME = "USERNAME";
+    public static final String PASSWORD = "PASSWORD";
+    public static final String END_TIME = "END_TIME";
 
     public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo, 
Class<? extends Job> clz,
-            String host, Integer port, String secreteId, String secreteKey) {
+            String host, Integer port, String username, String password) {
+
         return JobBuilder.newJob(clz)
                 .withIdentity(scheduleInfo.getInlongGroupId())
                 .usingJobData(MANAGER_HOST, host)
                 .usingJobData(MANAGER_PORT, port)
-                .usingJobData(SECRETE_ID, secreteId)
-                .usingJobData(SECRETE_KEY, secreteKey)
+                .usingJobData(USERNAME, username)
+                .usingJobData(PASSWORD, password)
+                .usingJobData(END_TIME, scheduleInfo.getEndTime().getTime())
                 .build();
     }
 
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
index 1f0329a318..af54fbcc55 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
 
 import java.sql.Timestamp;
 
+import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_ROUND;
 import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND;
 
 public class BaseScheduleTest {
@@ -42,6 +43,10 @@ public class BaseScheduleTest {
         return genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(), 
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
     }
 
+    public ScheduleInfo genOneroundScheduleInfo() {
+        return genNormalScheduleInfo(GROUP_ID, ONE_ROUND.getUnit(), 
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
+    }
+
     public ScheduleInfo genNormalScheduleInfo(String groupId, String 
scheduleUnit, int scheduleInterval,
             long timeSpanInMs) {
         ScheduleInfo scheduleInfo = new ScheduleInfo();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index c6643c3259..f8a943c7b4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -29,6 +29,7 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 
 import javax.validation.Valid;
@@ -219,10 +220,10 @@ public interface InlongGroupService {
     List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag);
 
     /**
-     * Submitting offline job for the given group.
-     * @param groupId the inlong group to submit offline job
+     * Submitting offline job.
+     * @param request request to submit offline sync job
      *
      * */
-    Boolean submitOfflineJob(String groupId);
+    Boolean submitOfflineJob(OfflineJobSubmitRequest request);
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d2f3c0c948..7ea4d001ee 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.group;
 
+import org.apache.inlong.common.bounded.Boundaries;
+import org.apache.inlong.common.bounded.BoundaryType;
 import org.apache.inlong.manager.common.auth.Authentication.AuthType;
 import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -59,6 +61,7 @@ import 
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -78,6 +81,7 @@ import 
org.apache.inlong.manager.service.schedule.ScheduleOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.SourceOperatorFactory;
 import org.apache.inlong.manager.service.source.StreamSourceOperator;
+import org.apache.inlong.manager.service.source.bounded.BoundedSourceType;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.tenant.InlongTenantService;
 import org.apache.inlong.manager.service.user.InlongRoleService;
@@ -932,8 +936,9 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
     }
 
     @Override
-    public Boolean submitOfflineJob(String groupId) {
+    public Boolean submitOfflineJob(OfflineJobSubmitRequest request) {
         // 1. get stream info list
+        String groupId = request.getGroupId();
         InlongGroupInfo groupInfo = get(groupId);
         if (groupInfo == null) {
             String msg = String.format("InLong group not found for group=%s", 
groupId);
@@ -946,7 +951,40 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
             LOGGER.warn("No stream info found for group {}, skip submit 
offline job", groupId);
             return false;
         }
-        return scheduleOperator.submitOfflineJob(groupId, streamInfoList);
+
+        // check if source type is bounded source
+        streamInfoList.forEach(this::checkBoundedSource);
+
+        // get the source boundaries
+        checkSourceBoundaryType(request.getBoundaryType());
+        BoundaryType boundaryType = 
BoundaryType.getInstance(request.getBoundaryType());
+        if (boundaryType == null) {
+            throw new 
BusinessException(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED,
+                    
String.format(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED.getMessage(), 
request.getBoundaryType()));
+        }
+        Boundaries boundaries = new Boundaries(request.getLowerBoundary(), 
request.getUpperBoundary(), boundaryType);
+
+        LOGGER.info("Check bounded source success, start to submitting offline 
job for group {}", groupId);
+
+        return scheduleOperator.submitOfflineJob(groupId, streamInfoList, 
boundaries);
+    }
+
+    private void checkBoundedSource(InlongStreamInfo streamInfo) {
+        streamInfo.getSourceList().forEach(stream -> {
+            if (!BoundedSourceType.isBoundedSource(stream.getSourceType())) {
+                throw new 
BusinessException(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED,
+                        
String.format(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED.getMessage(),
+                                stream.getSourceType()));
+            }
+        });
+    }
+
+    private void checkSourceBoundaryType(String sourceBoundaryType) {
+        if (!BoundaryType.isSupportBoundaryType(sourceBoundaryType)) {
+            throw new 
BusinessException(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED,
+                    
String.format(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED.getMessage(),
+                            sourceBoundaryType));
+        }
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
index 653523294e..5b1e3c2df6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.schedule;
 
+import org.apache.inlong.common.bounded.Boundaries;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -100,7 +101,8 @@ public interface ScheduleOperator {
      * Start offline sync job when the schedule instance callback.
      * @param groupId groupId to start offline job
      * @param streamInfoList stream list to start offline job
+     * @param boundaries source boundaries for bounded source
      * @Return whether succeed
      * */
-    Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList);
+    Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList, Boundaries boundaries);
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index 268109e1a2..61f847f244 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.schedule;
 
+import org.apache.inlong.common.bounded.Boundaries;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -178,12 +179,12 @@ public class ScheduleOperatorImpl implements ScheduleOperator {
     }
 
     @Override
-    public Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList) {
+    public Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList, Boundaries boundaries) {
         if (offlineJobOperator == null) {
             offlineJobOperator = OfflineJobOperatorFactory.getOfflineJobOperator();
         }
         try {
-            offlineJobOperator.submitOfflineJob(groupId, streamInfoList);
+            offlineJobOperator.submitOfflineJob(groupId, streamInfoList, boundaries);
             LOGGER.info("Submit offline job for group {} and stream list {} success.", groupId,
                     streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList()));
         } catch (Exception e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java
new file mode 100644
index 0000000000..df61fd2d51
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.bounded;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import lombok.Getter;
+
+@Getter
+public enum BoundedSourceType {
+
+    PULSAR("pulsar");
+
+    private final String sourceType;
+
+    BoundedSourceType(String name) {
+        this.sourceType = name;
+    }
+
+    public static BoundedSourceType getInstance(String name) {
+        for (BoundedSourceType source : values()) {
+            if (source.getSourceType().equalsIgnoreCase(name)) {
+                return source;
+            }
+        }
+        throw new BusinessException(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED,
+                String.format(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED.getMessage(), name));
+    }
+
+    public static boolean isBoundedSource(String name) {
+        for (BoundedSourceType source : values()) {
+            if (source.getSourceType().equalsIgnoreCase(name)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 3a9389c297..caa4aa6c83 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -33,6 +33,7 @@ import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
 import org.apache.inlong.manager.pojo.user.LoginUserUtils;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.service.group.InlongGroupProcessService;
@@ -43,6 +44,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -65,6 +68,7 @@ import java.util.Map;
 @Api(tags = "Inlong-Group-API")
 public class InlongGroupController {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupController.class);
     @Autowired
     private InlongGroupService groupService;
     @Autowired
@@ -252,10 +256,10 @@ public class InlongGroupController {
         return Response.success(groupService.finishTagSwitch(groupId));
     }
 
-    @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method = RequestMethod.POST)
+    @RequestMapping(value = "/group/submitOfflineJob", method = RequestMethod.POST)
     @ApiOperation(value = "Submitting inlong offline job process")
-    @ApiImplicitParam(name = "groupId", value = "Inlong group id", dataTypeClass = String.class)
-    public Response<Boolean> submitOfflineJob(@PathVariable String groupId) {
-        return Response.success(groupService.submitOfflineJob(groupId));
+    public Response<Boolean> submitOfflineJob(@RequestBody OfflineJobSubmitRequest request) {
+        LOGGER.info("Received offline job submit request {}", request);
+        return Response.success(groupService.submitOfflineJob(request));
     }
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
index f6d395dc7e..d6bd3bdf38 100644
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
+++ b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
@@ -17,11 +17,13 @@
 
 package org.apache.inlong.manager.workflow.processor;
 
+import org.apache.inlong.common.bounded.Boundaries;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 
 import java.util.List;
 
 public interface OfflineJobOperator {
 
-    void submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList) throws Exception;
+    void submitOfflineJob(String groupId, List<InlongStreamInfo> streamInfoList, Boundaries boundaries)
+            throws Exception;
 }


Reply via email to