This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 50b770583ab29a60ad95a55c88aea0ac0e3156ce
Author: AloysZhang <[email protected]>
AuthorDate: Tue Jun 18 12:28:57 2024 +0800

    [INLONG-10436][Manager] Move schedule configuration from stream to group 
(#10437)
---
 .../plugin/listener/StartupSortListener.java       |  8 ++++
 .../manager/pojo/sort/util/StreamParseUtils.java   | 22 ----------
 .../{ => manager}/schedule/ScheduleEngine.java     |  2 +-
 .../schedule/ScheduleEngineClient.java             |  2 +-
 .../{ => manager}/schedule/ScheduleType.java       |  2 +-
 .../{ => manager}/schedule/ScheduleUnit.java       |  2 +-
 .../exception/QuartzScheduleException.java         |  2 +-
 .../schedule/quartz/QuartzOfflineSyncJob.java      |  2 +-
 .../schedule/quartz/QuartzScheduleClient.java      |  4 +-
 .../schedule/quartz/QuartzScheduleEngine.java      | 10 ++---
 .../schedule/quartz/QuartzSchedulerListener.java   |  2 +-
 .../{ => manager}/schedule/util/ScheduleUtils.java | 10 ++---
 .../{ => manager}/schedule/BaseScheduleTest.java   |  6 +--
 .../{ => manager}/schedule/quartz/MockJob.java     |  2 +-
 .../schedule/quartz/QuartzScheduleEngineTest.java  |  6 +--
 .../schedule/util/ScheduleUtilsTest.java           | 22 +++++-----
 .../service/listener/GroupTaskListenerFactory.java | 20 +++++++++
 .../listener/StreamTaskListenerFactory.java        | 29 -------------
 ...ner.java => GroupScheduleResourceListener.java} | 48 ++++++++++------------
 .../service/listener/sort/SortConfigListener.java  | 10 ++---
 .../manager/service/plugin/PluginClassLoader.java  |  2 +-
 .../group/CreateGroupWorkflowDefinition.java       | 11 ++++-
 .../stream/CreateStreamWorkflowDefinition.java     | 11 +----
 .../group/CreateGroupWorkflowDefinitionTest.java   |  3 +-
 24 files changed, 105 insertions(+), 133 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index ae2de1c86f..b0624fbe66 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.plugin.util.FlinkUtils;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -76,6 +77,13 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         GroupResourceProcessForm groupResourceForm = 
(GroupResourceProcessForm) processForm;
+        InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
+        // do not build sort config if the group mode is offline
+        if 
(InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
+            log.info("no need to launching sort job for groupId={} as the mode 
is offline",
+                    groupId);
+            return ListenerResult.success();
+        }
         List<InlongStreamInfo> streamInfos = 
groupResourceForm.getStreamInfos();
         int sinkCount = streamInfos.stream()
                 .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
index 203bcc6778..005529c299 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java
@@ -17,14 +17,11 @@
 
 package org.apache.inlong.manager.pojo.sort.util;
 
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.TransformType;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamNode;
 import org.apache.inlong.manager.pojo.stream.StreamPipeline;
 import org.apache.inlong.manager.pojo.stream.StreamTransform;
@@ -41,8 +38,6 @@ import 
org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition;
 
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 
 /**
  * Utils of stream parse.
@@ -158,21 +153,4 @@ public class StreamParseUtils {
                 String.format(" should not be null for streamId=%s", 
inlongStreamId));
         return GSON.fromJson(tempView, StreamPipeline.class);
     }
-
-    public static String getStreamExtProperty(String key, InlongStreamInfo 
streamInfo) {
-        if (StringUtils.isNotBlank(key) && streamInfo != null && 
CollectionUtils.isNotEmpty(streamInfo.getExtList())) {
-            for (InlongStreamExtInfo ext : streamInfo.getExtList()) {
-                if (key.equalsIgnoreCase(ext.getKeyName())) {
-                    return ext.getKeyValue();
-                }
-            }
-        }
-        return null;
-    }
-
-    public static boolean isRegisterScheduleSuccess(InlongStreamInfo 
streamInfo) {
-        return InlongConstants.REGISTERED
-                
.equalsIgnoreCase(getStreamExtProperty(InlongConstants.REGISTER_SCHEDULE_STATUS,
 streamInfo));
-    }
-
 }
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java
similarity index 97%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngine.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java
index bfdb9a88ad..1f52e280de 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngine.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule;
+package org.apache.inlong.manager.schedule;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngineClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
similarity index 97%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngineClient.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
index 1e5ce3460c..dee5cbb2db 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngineClient.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule;
+package org.apache.inlong.manager.schedule;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleType.java
similarity index 96%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleType.java
index 0f296fec42..6c64d46e4f 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleType.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule;
+package org.apache.inlong.manager.schedule;
 
 import lombok.Getter;
 
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleUnit.java
similarity index 96%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleUnit.java
index 7b4d60779f..f4f0a766d8 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleUnit.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule;
+package org.apache.inlong.manager.schedule;
 
 import lombok.Getter;
 
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/QuartzScheduleException.java
similarity index 95%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/QuartzScheduleException.java
index 2d0ff005bd..94d466acdf 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/QuartzScheduleException.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.exception;
+package org.apache.inlong.manager.schedule.exception;
 
 /**
  * Exceptions occur in the schedule procedure.
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
similarity index 96%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
index 27628b3320..8ac9d3b6fb 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.quartz;
+package org.apache.inlong.manager.schedule.quartz;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
 
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java
similarity index 93%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java
index 41fc8814a1..05aa6c01ae 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.quartz;
+package org.apache.inlong.manager.schedule.quartz;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
-import org.apache.inlong.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.schedule.ScheduleEngineClient;
 
 /**
  * Built-in implementation of schedule engine client corresponding with {@link 
QuartzScheduleEngine}.
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
similarity index 93%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
index 31736f1887..d9d2620211 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.quartz;
+package org.apache.inlong.manager.schedule.quartz;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
-import org.apache.inlong.schedule.ScheduleEngine;
-import org.apache.inlong.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.manager.schedule.ScheduleEngine;
+import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
 
 import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
@@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory;
 import java.util.HashSet;
 import java.util.Set;
 
-import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzJobDetail;
-import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzTrigger;
+import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzJobDetail;
+import static 
org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzTrigger;
 
 /**
  * The default implementation of schedule engine based on Quartz scheduler. 
Response for processing
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
similarity index 98%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
index ca4f9f1b03..49a9583eaf 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.quartz;
+package org.apache.inlong.manager.schedule.quartz;
 
 import org.quartz.JobDetail;
 import org.quartz.JobKey;
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
similarity index 94%
rename from 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
rename to 
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
index 73114fcb6c..65475a0b98 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.util;
+package org.apache.inlong.manager.schedule.util;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
-import org.apache.inlong.schedule.ScheduleType;
-import org.apache.inlong.schedule.ScheduleUnit;
-import org.apache.inlong.schedule.exception.QuartzScheduleException;
-import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
+import org.apache.inlong.manager.schedule.ScheduleType;
+import org.apache.inlong.manager.schedule.ScheduleUnit;
+import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob;
 
 import org.apache.commons.lang3.StringUtils;
 import org.quartz.CronScheduleBuilder;
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
similarity index 96%
rename from 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
rename to 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
index c0ccc4b14c..a6eb129d5a 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule;
+package org.apache.inlong.manager.schedule;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
-import org.apache.inlong.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
 
 import java.sql.Timestamp;
 
-import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND;
 
 public class BaseScheduleTest {
 
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java
similarity index 97%
rename from 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
rename to 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java
index 47ce19ad09..9202ea5b40 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.quartz;
+package org.apache.inlong.manager.schedule.quartz;
 
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java
similarity index 97%
rename from 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
rename to 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java
index fc5ca9d9fd..008a7e42f8 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java
@@ -15,10 +15,10 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.quartz;
+package org.apache.inlong.manager.schedule.quartz;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
-import org.apache.inlong.schedule.BaseScheduleTest;
+import org.apache.inlong.manager.schedule.BaseScheduleTest;
 
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -27,7 +27,7 @@ import org.quartz.JobKey;
 
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.inlong.schedule.ScheduleUnit.SECOND;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND;
 import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
diff --git 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
similarity index 89%
rename from 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
rename to 
inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
index 77b84f3064..415331be3d 100644
--- 
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java
+++ 
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.schedule.util;
+package org.apache.inlong.manager.schedule.util;
 
 import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
-import org.apache.inlong.schedule.BaseScheduleTest;
-import org.apache.inlong.schedule.exception.QuartzScheduleException;
-import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob;
+import org.apache.inlong.manager.schedule.BaseScheduleTest;
+import org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
+import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob;
 
 import org.junit.jupiter.api.Test;
 import org.quartz.CronScheduleBuilder;
@@ -35,13 +35,13 @@ import org.quartz.TriggerKey;
 
 import java.util.Date;
 
-import static org.apache.inlong.schedule.ScheduleUnit.DAY;
-import static org.apache.inlong.schedule.ScheduleUnit.HOUR;
-import static org.apache.inlong.schedule.ScheduleUnit.MINUTE;
-import static org.apache.inlong.schedule.ScheduleUnit.MONTH;
-import static org.apache.inlong.schedule.ScheduleUnit.ONE_WAY;
-import static org.apache.inlong.schedule.ScheduleUnit.WEEK;
-import static org.apache.inlong.schedule.ScheduleUnit.YEAR;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.DAY;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.HOUR;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.MINUTE;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.MONTH;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_WAY;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.WEEK;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.YEAR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
index a690472304..3ff9942c5d 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.listener;
 import org.apache.inlong.manager.common.plugin.Plugin;
 import org.apache.inlong.manager.common.plugin.PluginBinder;
 import org.apache.inlong.manager.service.listener.queue.QueueResourceListener;
+import 
org.apache.inlong.manager.service.listener.schedule.GroupScheduleResourceListener;
 import org.apache.inlong.manager.service.listener.sink.SinkResourceListener;
 import org.apache.inlong.manager.service.listener.sort.SortConfigListener;
 import org.apache.inlong.manager.service.listener.source.SourceDeleteListener;
@@ -29,6 +30,7 @@ import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
 import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
+import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
@@ -57,6 +59,7 @@ public class GroupTaskListenerFactory implements 
PluginBinder, TaskListenerFacto
     private List<SourceOperateListener> sourceOperateListeners;
     private List<QueueOperateListener> queueOperateListeners;
     private List<SortOperateListener> sortOperateListeners;
+    private List<ScheduleOperateListener> scheduleOperateListeners;
 
     @Autowired
     private SourceStopListener sourceStopListener;
@@ -70,6 +73,8 @@ public class GroupTaskListenerFactory implements 
PluginBinder, TaskListenerFacto
     private SinkResourceListener sinkResourceListener;
     @Autowired
     private SortConfigListener sortConfigListener;
+    @Autowired
+    private GroupScheduleResourceListener groupScheduleResourceListener;
 
     @PostConstruct
     public void init() {
@@ -81,6 +86,8 @@ public class GroupTaskListenerFactory implements 
PluginBinder, TaskListenerFacto
         queueOperateListeners.add(queueResourceListener);
         sortOperateListeners = new LinkedList<>();
         sortOperateListeners.add(sortConfigListener);
+        scheduleOperateListeners = new LinkedList<>();
+        scheduleOperateListeners.add(groupScheduleResourceListener);
     }
 
     @Override
@@ -124,6 +131,9 @@ public class GroupTaskListenerFactory implements 
PluginBinder, TaskListenerFacto
                 return Lists.newArrayList(sourceOperateListeners);
             case INIT_SINK:
                 return Collections.singletonList(sinkResourceListener);
+            case INIT_SCHEDULE:
+                List<ScheduleOperateListener> scheduleOperateListeners = 
getScheduleOperateListener(workflowContext);
+                return Lists.newArrayList(scheduleOperateListeners);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
ServiceTaskType %s", taskType));
         }
@@ -177,4 +187,14 @@ public class GroupTaskListenerFactory implements 
PluginBinder, TaskListenerFacto
         return listeners;
     }
 
+    public List<ScheduleOperateListener> 
getScheduleOperateListener(WorkflowContext context) {
+        List<ScheduleOperateListener> listeners = new ArrayList<>();
+        for (ScheduleOperateListener listener : scheduleOperateListeners) {
+            if (listener != null && listener.accept(context)) {
+                listeners.add(listener);
+            }
+        }
+        return listeners;
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
index 87c3a867c2..5069423c13 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java
@@ -20,14 +20,12 @@ package org.apache.inlong.manager.service.listener;
 import org.apache.inlong.manager.common.plugin.Plugin;
 import org.apache.inlong.manager.common.plugin.PluginBinder;
 import 
org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener;
-import 
org.apache.inlong.manager.service.listener.schedule.StreamScheduleResourceListener;
 import 
org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener;
 import 
org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
 import org.apache.inlong.manager.workflow.definition.TaskListenerFactory;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
-import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SinkOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SortOperateListener;
 import org.apache.inlong.manager.workflow.event.task.SourceOperateListener;
@@ -55,7 +53,6 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
     private List<QueueOperateListener> queueOperateListeners;
     private List<SortOperateListener> sortOperateListeners;
     private List<SinkOperateListener> sinkOperateListeners;
-    private List<ScheduleOperateListener> scheduleOperateListeners;
 
     @Autowired
     private StreamQueueResourceListener queueResourceListener;
@@ -63,8 +60,6 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
     private StreamSortConfigListener streamSortConfigListener;
     @Autowired
     private StreamSinkResourceListener sinkResourceListener;
-    @Autowired
-    private StreamScheduleResourceListener scheduleResourceListener;
 
     @PostConstruct
     public void init() {
@@ -75,8 +70,6 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
         sortOperateListeners.add(streamSortConfigListener);
         sinkOperateListeners = new LinkedList<>();
         sinkOperateListeners.add(sinkResourceListener);
-        scheduleOperateListeners = new LinkedList<>();
-        scheduleOperateListeners.add(scheduleResourceListener);
     }
 
     @Override
@@ -101,10 +94,6 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
         if (CollectionUtils.isNotEmpty(pluginSinkOperateListeners)) {
             sinkOperateListeners.addAll(pluginSinkOperateListeners);
         }
-        List<ScheduleOperateListener> pluginScheduleOperateListeners = 
processPlugin.createScheduleOperateListeners();
-        if (CollectionUtils.isNotEmpty(pluginScheduleOperateListeners)) {
-            scheduleOperateListeners.addAll(pluginScheduleOperateListeners);
-        }
     }
 
     @Override
@@ -129,9 +118,6 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
             case INIT_SINK:
                 List<SinkOperateListener> sinkOperateListeners = 
getSinkOperateListener(workflowContext);
                 return Lists.newArrayList(sinkOperateListeners);
-            case INIT_SCHEDULE:
-                List<ScheduleOperateListener> scheduleOperateListeners = 
getScheduleOperateListener(workflowContext);
-                return Lists.newArrayList(scheduleOperateListeners);
             default:
                 throw new IllegalArgumentException(String.format("Unsupported 
ServiceTaskType %s", taskType));
         }
@@ -145,7 +131,6 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
         queueOperateListeners = new LinkedList<>();
         sortOperateListeners = new LinkedList<>();
         sinkOperateListeners = new LinkedList<>();
-        scheduleOperateListeners = new LinkedList<>();
     }
 
     /**
@@ -199,18 +184,4 @@ public class StreamTaskListenerFactory implements 
PluginBinder, TaskListenerFact
         }
         return listeners;
     }
-
-    /**
-     * Get schedule operate listener list.
-     */
-    private List<ScheduleOperateListener> 
getScheduleOperateListener(WorkflowContext context) {
-        List<ScheduleOperateListener> listeners = new ArrayList<>();
-        for (ScheduleOperateListener listener : scheduleOperateListeners) {
-            if (listener != null && listener.accept(context)) {
-                listeners.add(listener);
-            }
-        }
-        return listeners;
-    }
-
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
similarity index 54%
rename from 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java
rename to 
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
index 61179dc343..e27dab9e89 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java
@@ -20,10 +20,10 @@ package org.apache.inlong.manager.service.listener.schedule;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
-import 
org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener;
@@ -35,7 +35,7 @@ import java.util.List;
 
 @Service
 @Slf4j
-public class StreamScheduleResourceListener implements ScheduleOperateListener 
{
+public class GroupScheduleResourceListener implements ScheduleOperateListener {
 
     @Override
     public TaskEvent event() {
@@ -46,48 +46,44 @@ public class StreamScheduleResourceListener implements 
ScheduleOperateListener {
     public boolean accept(WorkflowContext context) {
         ProcessForm processForm = context.getProcessForm();
         String groupId = processForm.getInlongGroupId();
-        if (!(processForm instanceof StreamResourceProcessForm)) {
-            log.info("not add schedule stream listener, not 
StreamResourceProcessForm for groupId [{}]", groupId);
+        if (!(processForm instanceof GroupResourceProcessForm)) {
+            log.info("not add schedule group listener, not 
groupResourceProcessForm for groupId [{}]", groupId);
             return false;
         }
 
-        StreamResourceProcessForm streamProcessForm = 
(StreamResourceProcessForm) processForm;
-        String streamId = 
streamProcessForm.getStreamInfo().getInlongStreamId();
-        if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
-            log.info("not add schedule stream listener, as the operate was not 
INIT for groupId [{}] streamId [{}]",
-                    groupId, streamId);
+        GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) 
processForm;
+        if (groupProcessForm.getGroupOperateType() != GroupOperateType.INIT) {
+            log.info("not add schedule group listener, as the operate was not 
INIT for groupId [{}]", groupId);
             return false;
         }
 
-        log.info("add startup stream listener for groupId [{}] streamId [{}]", 
groupId, streamId);
-        return 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode());
+        log.info("add startup group listener for groupId [{}]", groupId);
+        return 
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
     }
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        StreamResourceProcessForm form = (StreamResourceProcessForm) 
context.getProcessForm();
-        InlongStreamInfo streamInfo = form.getStreamInfo();
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
-        log.info("begin to register schedule info for groupId={}, 
streamId={}", groupId, streamId);
+        GroupResourceProcessForm form = (GroupResourceProcessForm) 
context.getProcessForm();
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        final String groupId = groupInfo.getInlongGroupId();
+        log.info("begin to register schedule info for groupId={}", groupId);
 
         // todo: register schedule info to schedule service
 
-        // after register schedule info successfully, add ext property to 
stream info
-        saveInfo(streamInfo, InlongConstants.REGISTER_SCHEDULE_STATUS,
-                InlongConstants.REGISTERED, streamInfo.getExtList());
-        log.info("success to register schedule info for group [" + groupId + 
"] and stream [" + streamId + "]");
+        // after register schedule info successfully, add ext property to 
group ext info
+        saveInfo(groupInfo, InlongConstants.REGISTER_SCHEDULE_STATUS,
+                InlongConstants.REGISTERED, groupInfo.getExtList());
+        log.info("success to register schedule info for group={}", groupId);
         return ListenerResult.success();
     }
 
     /**
      * Save stream ext info into list.
      */
-    private void saveInfo(InlongStreamInfo streamInfo, String keyName, String 
keyValue,
-            List<InlongStreamExtInfo> extInfoList) {
-        InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
+    private void saveInfo(InlongGroupInfo streamInfo, String keyName, String 
keyValue,
+            List<InlongGroupExtInfo> extInfoList) {
+        InlongGroupExtInfo extInfo = new InlongGroupExtInfo();
         extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
-        extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
         extInfo.setKeyName(keyName);
         extInfo.setKeyValue(keyValue);
         extInfoList.add(extInfo);
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index f8981bf09a..067a320a49 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
@@ -126,11 +125,10 @@ public class SortConfigListener implements 
SortOperateListener {
 
         try {
             for (InlongStreamInfo streamInfo : streamInfos) {
-                // do not build sort config if the group mode is offline and 
the stream is not config successfully
-                if 
(InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())
-                        && 
!StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) {
-                    LOGGER.info("no need to build sort config for groupId={} 
streamId={} as the mode is offline "
-                            + "and the stream is not config successfully yet", 
groupId, streamInfo.getInlongStreamId());
+                // do not build sort config if the group mode is offline
+                if 
(InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
+                    LOGGER.info("no need to build sort config for groupId={} 
streamId={} as the mode is offline",
+                            groupId, streamInfo.getInlongStreamId());
                     continue;
                 }
                 List<StreamSink> sinkList = streamInfo.getSinkList();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
index 08d04b58ee..1b30839308 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
@@ -88,7 +88,7 @@ public class PluginClassLoader extends URLClassLoader {
      * Get pluginClassLoader by plugin url.
      */
     public static PluginClassLoader getFromPluginUrl(String url, ClassLoader 
parent) {
-        log.info("ClassLoaderPath:{}", url);
+        log.info("ClassLoaderPath : {}", url);
         checkClassLoader(parent);
         checkUrl(url);
         return AccessController.doPrivileged(new 
PrivilegedAction<PluginClassLoader>() {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index bc8f0c2546..b9cacdffc4 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -86,6 +86,14 @@ public class CreateGroupWorkflowDefinition implements 
WorkflowDefinition {
         initSortTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(initSortTask);
 
+        // Init Schedule
+        ServiceTask initScheduleTask = new ServiceTask();
+        initScheduleTask.setName("InitSchedule");
+        initScheduleTask.setDisplayName("Group-InitSchedule");
+        initScheduleTask.setServiceTaskType(ServiceTaskType.INIT_SCHEDULE);
+        initScheduleTask.setListenerFactory(groupTaskListenerFactory);
+        process.addTask(initScheduleTask);
+
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
@@ -94,7 +102,8 @@ public class CreateGroupWorkflowDefinition implements 
WorkflowDefinition {
         // To ensure that after some tasks fail, data will not start to be 
collected by source or consumed by sort
         startEvent.addNext(initMQTask);
         initMQTask.addNext(initSortTask);
-        initSortTask.addNext(endEvent);
+        initSortTask.addNext(initScheduleTask);
+        initScheduleTask.addNext(endEvent);
 
         return process;
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 5505ed0694..2ec6e9052e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -102,14 +102,6 @@ public class CreateStreamWorkflowDefinition implements 
WorkflowDefinition {
         initSourceTask.setListenerFactory(streamTaskListenerFactory);
         process.addTask(initSourceTask);
 
-        // Init Schedule info
-        ServiceTask initScheduleTask = new ServiceTask();
-        initScheduleTask.setName("InitSchedule");
-        initScheduleTask.setDisplayName("Stream-InitSchedule");
-        initScheduleTask.setServiceTaskType(ServiceTaskType.INIT_SCHEDULE);
-        initScheduleTask.setListenerFactory(streamTaskListenerFactory);
-        process.addTask(initScheduleTask);
-
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
@@ -120,8 +112,7 @@ public class CreateStreamWorkflowDefinition implements 
WorkflowDefinition {
         initMQTask.addNext(initSinkTask);
         initSinkTask.addNext(initSortTask);
         initSortTask.addNext(initSourceTask);
-        initSourceTask.addNext(initScheduleTask);
-        initScheduleTask.addNext(endEvent);
+        initSourceTask.addNext(endEvent);
 
         return process;
     }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index abae87fef4..e56b013d57 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -40,7 +40,8 @@ public class CreateGroupWorkflowDefinitionTest extends 
ServiceBaseTest {
         Assertions.assertNotSame(cloneProcess2, cloneProcess1);
         Assertions.assertNotNull(process.getTaskByName("InitMQ"));
         Assertions.assertNotNull(process.getTaskByName("InitSort"));
-        Assertions.assertEquals(2, process.getNameToTaskMap().size());
+        Assertions.assertNotNull(process.getTaskByName("InitSchedule"));
+        Assertions.assertEquals(3, process.getNameToTaskMap().size());
     }
 
 }


Reply via email to