This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/dev-offline-sync by this push:
     new 8819bfc166 [INLONG-10459][Manager] Support schedule instance callback to submit Flink batch job (#10510)
8819bfc166 is described below

commit 8819bfc1666902c19f46ab995eba0ab64990fa4f
Author: AloysZhang <[email protected]>
AuthorDate: Wed Jun 26 13:11:11 2024 +0800

    [INLONG-10459][Manager] Support schedule instance callback to submit Flink batch job (#10510)
    
    * [INLONG-10459][Manager] Support schedule instance callback to submit Flink batch job
---
 .../plugin/listener/StartupSortListener.java       | 38 ++--------------
 .../plugin/offline/FlinkOfflineJobOperator.java}   | 30 +++++--------
 .../inlong/manager/plugin/util/FlinkUtils.java     | 32 ++++++++++++++
 .../inlong/manager/pojo/schedule/ScheduleInfo.java | 16 +++----
 .../manager/pojo/schedule/ScheduleInfoRequest.java | 16 +++----
 .../manager/schedule/NoopScheduleClient.java       |  2 +-
 .../manager/service/group/InlongGroupService.java  |  7 +++
 .../service/group/InlongGroupServiceImpl.java      | 18 ++++++++
 .../service/listener/sort/SortConfigListener.java  | 10 +----
 .../schedule/OfflineJobOperatorFactory.java        | 51 ++++++++++++++++++++++
 .../manager/service/schedule/ScheduleOperator.java | 11 +++++
 .../service/schedule/ScheduleOperatorImpl.java     | 37 +++++++++++++---
 .../web/controller/InlongGroupController.java      |  6 +++
 .../src/main/resources/application.properties      |  6 +--
 .../workflow/processor/OfflineJobOperator.java}    | 29 +++---------
 15 files changed, 194 insertions(+), 115 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index b0624fbe66..5d1dd82578 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.plugin.listener;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.plugin.util.FlinkUtils;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -32,9 +30,9 @@ import 
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
+
+import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
 
 /**
  * Listener of startup sort.
@@ -62,8 +60,7 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         log.info("add startup group listener for groupId [{}]", groupId);
-        return 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())
-                || 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
+        return 
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
     }
 
     @Override
@@ -77,36 +74,9 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         GroupResourceProcessForm groupResourceForm = 
(GroupResourceProcessForm) processForm;
-        InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
-        // do not build sort config if the group mode is offline
-        if 
(InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
-            log.info("no need to launching sort job for groupId={} as the mode 
is offline",
-                    groupId);
-            return ListenerResult.success();
-        }
         List<InlongStreamInfo> streamInfos = 
groupResourceForm.getStreamInfos();
-        int sinkCount = streamInfos.stream()
-                .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
-                .reduce(0, Integer::sum);
-        if (sinkCount == 0) {
-            log.warn("not any sink configured for group {}, skip launching 
sort job", groupId);
-            return ListenerResult.success();
-        }
 
-        List<ListenerResult> listenerResults = new ArrayList<>();
-        for (InlongStreamInfo streamInfo : streamInfos) {
-            listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo,
-                    FlinkUtils.genFlinkJobName(processForm, streamInfo)));
-        }
-
-        // only one stream in group for now
-        // we can return the list of ListenerResult if support multi-stream in 
the future
-        List<ListenerResult> failedStreams = listenerResults.stream()
-                .filter(t -> !t.isSuccess()).collect(Collectors.toList());
-        if (failedStreams.isEmpty()) {
-            return ListenerResult.success();
-        }
-        return ListenerResult.fail(failedStreams.get(0).getRemark());
+        return submitFlinkJobs(groupId, streamInfos);
     }
 
     /**
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
similarity index 56%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
copy to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
index a122235de3..09740c53f3 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
@@ -15,32 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.plugin.offline;
 
-import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
 
-import org.springframework.stereotype.Service;
+import lombok.NoArgsConstructor;
 
-@Service
-public class NoopScheduleClient implements ScheduleEngineClient {
+import java.util.List;
 
-    @Override
-    public boolean accept(String engineType) {
-        return ScheduleEngineType.NONE.getType().equals(engineType);
-    }
+import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
 
-    @Override
-    public boolean register(ScheduleInfo scheduleInfo) {
-        return true;
-    }
-
-    @Override
-    public boolean unregister(String groupId) {
-        return true;
-    }
+@NoArgsConstructor
+public class FlinkOfflineJobOperator implements OfflineJobOperator {
 
     @Override
-    public boolean update(ScheduleInfo scheduleInfo) {
-        return true;
+    public void submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList) throws Exception {
+        submitFlinkJobs(groupId, streamInfoList);
     }
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index d4c7863371..82a0d628c1 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -218,6 +218,33 @@ public class FlinkUtils {
         return flinkConfig;
     }
 
+    public static ListenerResult submitFlinkJobs(String groupId, 
List<InlongStreamInfo> streamInfoList)
+            throws Exception {
+        int sinkCount = streamInfoList.stream()
+                .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
+                .reduce(0, Integer::sum);
+        if (sinkCount == 0) {
+            log.warn("Not any sink configured for group {} and stream list {}, 
skip launching sort job", groupId,
+                    streamInfoList.stream()
+                            .map(s -> s.getInlongGroupId() + ":" + 
s.getName()).collect(Collectors.toList()));
+            return ListenerResult.success();
+        }
+
+        List<ListenerResult> listenerResults = new ArrayList<>();
+        for (InlongStreamInfo streamInfo : streamInfoList) {
+            listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo, 
FlinkUtils.genFlinkJobName(streamInfo)));
+        }
+
+        // only one stream in group for now
+        // we can return the list of ListenerResult if support multi-stream in 
the future
+        List<ListenerResult> failedStreams = listenerResults.stream()
+                .filter(t -> !t.isSuccess()).collect(Collectors.toList());
+        if (failedStreams.isEmpty()) {
+            return ListenerResult.success();
+        }
+        return ListenerResult.fail(failedStreams.get(0).getRemark());
+    }
+
     public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo, 
String jobName) throws Exception {
         List<StreamSink> sinkList = streamInfo.getSinkList();
         List<String> sinkTypes = 
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
@@ -295,4 +322,9 @@ public class FlinkUtils {
         return Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + 
InlongConstants.HYPHEN
                 + streamInfo.getInlongStreamId();
     }
+
+    public static String genFlinkJobName(InlongStreamInfo streamInfo) {
+        return String.format(Constants.SORT_JOB_NAME_TEMPLATE, 
streamInfo.getInlongGroupId()) + InlongConstants.HYPHEN
+                + streamInfo.getInlongStreamId();
+    }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
index 24f2e6196e..0bc8d2e6db 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -89,16 +89,16 @@ public class ScheduleInfo {
             return false;
         }
         ScheduleInfo that = (ScheduleInfo) o;
-        return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, 
that.inlongGroupId)
-                && Objects.equals(scheduleType, that.scheduleType) && 
Objects.equals(scheduleUnit,
-                        that.scheduleUnit)
+        return Objects.equals(inlongGroupId, that.inlongGroupId)
+                && Objects.equals(scheduleType, that.scheduleType)
+                && Objects.equals(scheduleUnit, that.scheduleUnit)
                 && Objects.equals(scheduleInterval, that.scheduleInterval)
-                && Objects.equals(startTime, that.startTime) && 
Objects.equals(endTime, that.endTime)
-                && Objects.equals(delayTime, that.delayTime) && 
Objects.equals(selfDepend,
-                        that.selfDepend)
+                && Objects.equals(startTime, that.startTime)
+                && Objects.equals(endTime, that.endTime)
+                && Objects.equals(delayTime, that.delayTime)
+                && Objects.equals(selfDepend, that.selfDepend)
                 && Objects.equals(taskParallelism, that.taskParallelism)
-                && Objects.equals(crontabExpression, that.crontabExpression) 
&& Objects.equals(version,
-                        that.version);
+                && Objects.equals(crontabExpression, that.crontabExpression);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
index a324cee7f5..7e81f7533f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -83,16 +83,16 @@ public class ScheduleInfoRequest {
             return false;
         }
         ScheduleInfoRequest that = (ScheduleInfoRequest) o;
-        return Objects.equals(id, that.id) && Objects.equals(inlongGroupId, 
that.inlongGroupId)
-                && Objects.equals(scheduleType, that.scheduleType) && 
Objects.equals(scheduleUnit,
-                        that.scheduleUnit)
+        return Objects.equals(inlongGroupId, that.inlongGroupId)
+                && Objects.equals(scheduleType, that.scheduleType)
+                && Objects.equals(scheduleUnit, that.scheduleUnit)
                 && Objects.equals(scheduleInterval, that.scheduleInterval)
-                && Objects.equals(startTime, that.startTime) && 
Objects.equals(endTime, that.endTime)
-                && Objects.equals(delayTime, that.delayTime) && 
Objects.equals(selfDepend,
-                        that.selfDepend)
+                && Objects.equals(startTime, that.startTime)
+                && Objects.equals(endTime, that.endTime)
+                && Objects.equals(delayTime, that.delayTime)
+                && Objects.equals(selfDepend, that.selfDepend)
                 && Objects.equals(taskParallelism, that.taskParallelism)
-                && Objects.equals(crontabExpression, that.crontabExpression) 
&& Objects.equals(version,
-                        that.version);
+                && Objects.equals(crontabExpression, that.crontabExpression);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
index a122235de3..7b7ddf19b0 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
@@ -26,7 +26,7 @@ public class NoopScheduleClient implements 
ScheduleEngineClient {
 
     @Override
     public boolean accept(String engineType) {
-        return ScheduleEngineType.NONE.getType().equals(engineType);
+        return ScheduleEngineType.NONE.getType().equalsIgnoreCase(engineType);
     }
 
     @Override
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 4906e91aa7..c6643c3259 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -218,4 +218,11 @@ public interface InlongGroupService {
      */
     List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag);
 
+    /**
+     * Submitting offline job for the given group.
+     * @param groupId the inlong group to submit offline job
+     *
+     * */
+    Boolean submitOfflineJob(String groupId);
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d42a61dbfb..d2f3c0c948 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -931,4 +931,22 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         return groupInfoList;
     }
 
+    @Override
+    public Boolean submitOfflineJob(String groupId) {
+        // 1. get stream info list
+        InlongGroupInfo groupInfo = get(groupId);
+        if (groupInfo == null) {
+            String msg = String.format("InLong group not found for group=%s", 
groupId);
+            LOGGER.error(msg);
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+
+        List<InlongStreamInfo> streamInfoList = streamService.list(groupId);
+        if (CollectionUtils.isEmpty(streamInfoList)) {
+            LOGGER.warn("No stream info found for group {}, skip submit 
offline job", groupId);
+            return false;
+        }
+        return scheduleOperator.submitOfflineJob(groupId, streamInfoList);
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index 067a320a49..556a4c41fa 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -125,12 +125,6 @@ public class SortConfigListener implements 
SortOperateListener {
 
         try {
             for (InlongStreamInfo streamInfo : streamInfos) {
-                // do not build sort config if the group mode is offline
-                if 
(InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
-                    LOGGER.info("no need to build sort config for groupId={} 
streamId={} as the mode is offline",
-                            groupId, streamInfo.getInlongStreamId());
-                    continue;
-                }
                 List<StreamSink> sinkList = streamInfo.getSinkList();
                 if (CollectionUtils.isEmpty(sinkList)) {
                     continue;
@@ -143,8 +137,8 @@ public class SortConfigListener implements 
SortOperateListener {
                 }
             }
         } catch (Exception e) {
-            String msg = String.format("failed to build sort config for 
groupId=%s, ", groupId);
-            LOGGER.error(msg + "streamInfos=" + streamInfos, e);
+            String msg = String.format("Failed to build sort config for 
group=%s, ", groupId);
+            LOGGER.error("{} streamInfos={}", msg, streamInfos, e);
             throw new WorkflowListenerException(msg + e.getMessage());
         }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java
new file mode 100644
index 0000000000..001f303a7b
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OfflineJobOperatorFactory {
+
+    private static final Logger log = 
LoggerFactory.getLogger(OfflineJobOperatorFactory.class);
+    public static final String DEFAULT_OPERATOR_CLASS_NAME =
+            "org.apache.inlong.manager.plugin.offline.FlinkOfflineJobOperator";
+
+    public static OfflineJobOperator getOfflineJobOperator() {
+        return getOfflineJobOperator(DEFAULT_OPERATOR_CLASS_NAME);
+    }
+
+    public static OfflineJobOperator getOfflineJobOperator(String 
operatorClassName) {
+        return getOfflineJobOperator(operatorClassName, 
Thread.currentThread().getContextClassLoader());
+    }
+
+    public static OfflineJobOperator getOfflineJobOperator(String 
operatorClassName, ClassLoader classLoader) {
+        try {
+            Class<?> operatorClass = classLoader.loadClass(operatorClassName);
+            Object operator = 
operatorClass.getDeclaredConstructor().newInstance();
+            return (OfflineJobOperator) operator;
+        } catch (Throwable e) {
+            log.error("Failed to get offline job operator: ", e);
+            throw new BusinessException("Failed to get offline job operator: " 
+ e.getMessage());
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
index 6bf9c01432..653523294e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
@@ -19,6 +19,9 @@ package org.apache.inlong.manager.service.schedule;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import java.util.List;
 
 /**
  * Operator for schedule. Including:
@@ -92,4 +95,12 @@ public interface ScheduleOperator {
      * @Return whether succeed
      * */
     Boolean handleGroupApprove(String groupId);
+
+    /**
+     * Start offline sync job when the schedule instance callback.
+     * @param groupId groupId to start offline job
+     * @param streamInfoList stream list to start offline job
+     * @Return whether succeed
+     * */
+    Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList);
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index 411a80766a..a20fa0cb10 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -18,14 +18,16 @@
 package org.apache.inlong.manager.service.schedule;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
-import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.schedule.ScheduleClientFactory;
 import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +35,9 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED;
 import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED;
 import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED;
@@ -48,12 +53,11 @@ public class ScheduleOperatorImpl implements 
ScheduleOperator {
     @Autowired
     private InlongGroupExtEntityMapper groupExtMapper;
 
-    @Autowired
-    private ScheduleEntityMapper scheduleMapper;
-
     @Autowired
     private ScheduleClientFactory scheduleClientFactory;
 
+    private OfflineJobOperator offlineJobOperator;
+
     private ScheduleEngineClient scheduleEngineClient;
 
     @Override
@@ -75,7 +79,7 @@ public class ScheduleOperatorImpl implements ScheduleOperator 
{
         String groupId = scheduleInfo.getInlongGroupId();
         InlongGroupExtEntity scheduleStatusExt =
                 groupExtMapper.selectByUniqueKey(groupId, 
InlongConstants.REGISTER_SCHEDULE_STATUS);
-        if 
(InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) {
+        if (scheduleStatusExt != null && 
InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) {
             // change schedule state to approved
             scheduleService.updateStatus(scheduleInfo.getInlongGroupId(), 
APPROVED, operator);
             registerToScheduleEngine(scheduleInfo, operator, false);
@@ -124,8 +128,10 @@ public class ScheduleOperatorImpl implements 
ScheduleOperator {
     @Override
     @Transactional(rollbackFor = Throwable.class)
     public Boolean updateAndRegister(ScheduleInfoRequest request, String 
operator) {
-        updateOpt(request, operator);
-        return 
registerToScheduleEngine(CommonBeanUtils.copyProperties(request, 
ScheduleInfo::new), operator, true);
+        if (updateOpt(request, operator)) {
+            return 
registerToScheduleEngine(CommonBeanUtils.copyProperties(request, 
ScheduleInfo::new), operator, true);
+        }
+        return false;
     }
 
     /**
@@ -172,4 +178,21 @@ public class ScheduleOperatorImpl implements 
ScheduleOperator {
         return registerToScheduleEngine(scheduleInfo, null, false);
     }
 
+    @Override
+    public Boolean submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList) {
+        if (offlineJobOperator == null) {
+            offlineJobOperator = 
OfflineJobOperatorFactory.getOfflineJobOperator();
+        }
+        try {
+            offlineJobOperator.submitOfflineJob(groupId, streamInfoList);
+            LOGGER.info("Submit offline job for group {} and stream list {} 
success.", groupId,
+                    
streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList()));
+        } catch (Exception e) {
+            String errorMsg = String.format("Submit offline job failed for 
groupId=%s", groupId);
+            LOGGER.error(errorMsg, e);
+            throw new BusinessException(errorMsg);
+        }
+        return true;
+    }
+
 }
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 04d89d332c..3a9389c297 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -252,4 +252,10 @@ public class InlongGroupController {
         return Response.success(groupService.finishTagSwitch(groupId));
     }
 
+    @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method = 
RequestMethod.POST)
+    @ApiOperation(value = "Submitting inlong offline job process")
+    @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class)
+    public Response<Boolean> submitOfflineJob(@PathVariable String groupId) {
+        return Response.success(groupService.submitOfflineJob(groupId));
+    }
 }
\ No newline at end of file
diff --git 
a/inlong-manager/manager-web/src/main/resources/application.properties 
b/inlong-manager/manager-web/src/main/resources/application.properties
index a6eec820ad..a498ac4d15 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -65,8 +65,4 @@ audit.admin.ids=3,4,5,6
 audit.user.ids=3,4,5,6
 
 # tencent cloud log service endpoint, The Operator cls resource by it
-cls.manager.endpoint=127.0.0.1
-
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+cls.manager.endpoint=127.0.0.1
\ No newline at end of file
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
similarity index 55%
copy from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
copy to 
inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
index a122235de3..f6d395dc7e 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
+++ 
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
@@ -15,32 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.workflow.processor;
 
-import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 
-import org.springframework.stereotype.Service;
+import java.util.List;
 
-@Service
-public class NoopScheduleClient implements ScheduleEngineClient {
+public interface OfflineJobOperator {
 
-    @Override
-    public boolean accept(String engineType) {
-        return ScheduleEngineType.NONE.getType().equals(engineType);
-    }
-
-    @Override
-    public boolean register(ScheduleInfo scheduleInfo) {
-        return true;
-    }
-
-    @Override
-    public boolean unregister(String groupId) {
-        return true;
-    }
-
-    @Override
-    public boolean update(ScheduleInfo scheduleInfo) {
-        return true;
-    }
+    void submitOfflineJob(String groupId, List<InlongStreamInfo> 
streamInfoList) throws Exception;
 }

Reply via email to