This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch dev-offline-sync in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 50b770583ab29a60ad95a55c88aea0ac0e3156ce Author: AloysZhang <[email protected]> AuthorDate: Tue Jun 18 12:28:57 2024 +0800 [INLONG-10436][Manager] Move schedule configuration from stream to group (#10437) --- .../plugin/listener/StartupSortListener.java | 8 ++++ .../manager/pojo/sort/util/StreamParseUtils.java | 22 ---------- .../{ => manager}/schedule/ScheduleEngine.java | 2 +- .../schedule/ScheduleEngineClient.java | 2 +- .../{ => manager}/schedule/ScheduleType.java | 2 +- .../{ => manager}/schedule/ScheduleUnit.java | 2 +- .../exception/QuartzScheduleException.java | 2 +- .../schedule/quartz/QuartzOfflineSyncJob.java | 2 +- .../schedule/quartz/QuartzScheduleClient.java | 4 +- .../schedule/quartz/QuartzScheduleEngine.java | 10 ++--- .../schedule/quartz/QuartzSchedulerListener.java | 2 +- .../{ => manager}/schedule/util/ScheduleUtils.java | 10 ++--- .../{ => manager}/schedule/BaseScheduleTest.java | 6 +-- .../{ => manager}/schedule/quartz/MockJob.java | 2 +- .../schedule/quartz/QuartzScheduleEngineTest.java | 6 +-- .../schedule/util/ScheduleUtilsTest.java | 22 +++++----- .../service/listener/GroupTaskListenerFactory.java | 20 +++++++++ .../listener/StreamTaskListenerFactory.java | 29 ------------- ...ner.java => GroupScheduleResourceListener.java} | 48 ++++++++++------------ .../service/listener/sort/SortConfigListener.java | 10 ++--- .../manager/service/plugin/PluginClassLoader.java | 2 +- .../group/CreateGroupWorkflowDefinition.java | 11 ++++- .../stream/CreateStreamWorkflowDefinition.java | 11 +---- .../group/CreateGroupWorkflowDefinitionTest.java | 3 +- 24 files changed, 105 insertions(+), 133 deletions(-) diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index ae2de1c86f..b0624fbe66 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.plugin.util.FlinkUtils; +import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; @@ -76,6 +77,13 @@ public class StartupSortListener implements SortOperateListener { } GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm; + InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo(); + // do not build sort config if the group mode is offline + if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { + log.info("no need to launching sort job for groupId={} as the mode is offline", + groupId); + return ListenerResult.success(); + } List<InlongStreamInfo> streamInfos = groupResourceForm.getStreamInfos(); int sinkCount = streamInfos.stream() .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size()) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java index 203bcc6778..005529c299 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/StreamParseUtils.java @@ -17,14 +17,11 @@ package org.apache.inlong.manager.pojo.sort.util; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.enums.TransformType; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.source.StreamSource; -import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; -import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamNode; import org.apache.inlong.manager.pojo.stream.StreamPipeline; import org.apache.inlong.manager.pojo.stream.StreamTransform; @@ -41,8 +38,6 @@ import org.apache.inlong.manager.pojo.transform.splitter.SplitterDefinition; import com.google.gson.Gson; import com.google.gson.JsonObject; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.StringUtils; /** * Utils of stream parse. @@ -158,21 +153,4 @@ public class StreamParseUtils { String.format(" should not be null for streamId=%s", inlongStreamId)); return GSON.fromJson(tempView, StreamPipeline.class); } - - public static String getStreamExtProperty(String key, InlongStreamInfo streamInfo) { - if (StringUtils.isNotBlank(key) && streamInfo != null && CollectionUtils.isNotEmpty(streamInfo.getExtList())) { - for (InlongStreamExtInfo ext : streamInfo.getExtList()) { - if (key.equalsIgnoreCase(ext.getKeyName())) { - return ext.getKeyValue(); - } - } - } - return null; - } - - public static boolean isRegisterScheduleSuccess(InlongStreamInfo streamInfo) { - return InlongConstants.REGISTERED - .equalsIgnoreCase(getStreamExtProperty(InlongConstants.REGISTER_SCHEDULE_STATUS, streamInfo)); - } - } diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java similarity index 97% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngine.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java index bfdb9a88ad..1f52e280de 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngine.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule; +package org.apache.inlong.manager.schedule; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngineClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java similarity index 97% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngineClient.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java index 1e5ce3460c..dee5cbb2db 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleEngineClient.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleEngineClient.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule; +package org.apache.inlong.manager.schedule; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleType.java similarity index 96% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleType.java index 0f296fec42..6c64d46e4f 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleType.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleType.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule; +package org.apache.inlong.manager.schedule; import lombok.Getter; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleUnit.java similarity index 96% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleUnit.java index 7b4d60779f..f4f0a766d8 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/ScheduleUnit.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/ScheduleUnit.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule; +package org.apache.inlong.manager.schedule; import lombok.Getter; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/QuartzScheduleException.java similarity index 95% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/QuartzScheduleException.java index 2d0ff005bd..94d466acdf 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/exception/QuartzScheduleException.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/exception/QuartzScheduleException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule.exception; +package org.apache.inlong.manager.schedule.exception; /** * Exceptions occur in the schedule procedure. diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java similarity index 96% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java index 27628b3320..8ac9d3b6fb 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzOfflineSyncJob.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule.quartz; +package org.apache.inlong.manager.schedule.quartz; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java similarity index 93% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java index 41fc8814a1..05aa6c01ae 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleClient.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleClient.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.inlong.schedule.quartz; +package org.apache.inlong.manager.schedule.quartz; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; -import org.apache.inlong.schedule.ScheduleEngineClient; +import org.apache.inlong.manager.schedule.ScheduleEngineClient; /** * Built-in implementation of schedule engine client corresponding with {@link QuartzScheduleEngine}. diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java similarity index 93% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java index 31736f1887..d9d2620211 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngine.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package org.apache.inlong.schedule.quartz; +package org.apache.inlong.manager.schedule.quartz; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; -import org.apache.inlong.schedule.ScheduleEngine; -import org.apache.inlong.schedule.exception.QuartzScheduleException; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.QuartzScheduleException; import com.google.common.annotations.VisibleForTesting; import lombok.Getter; @@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.Set; -import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzJobDetail; -import static org.apache.inlong.schedule.util.ScheduleUtils.genQuartzTrigger; +import static org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzJobDetail; +import static org.apache.inlong.manager.schedule.util.ScheduleUtils.genQuartzTrigger; /** * The default implementation of schedule engine based on Quartz scheduler. Response for processing diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java similarity index 98% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java index ca4f9f1b03..49a9583eaf 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/quartz/QuartzSchedulerListener.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzSchedulerListener.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule.quartz; +package org.apache.inlong.manager.schedule.quartz; import org.quartz.JobDetail; import org.quartz.JobKey; diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java similarity index 94% rename from inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java rename to inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java index 73114fcb6c..65475a0b98 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/schedule/util/ScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.schedule.util; +package org.apache.inlong.manager.schedule.util; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; -import org.apache.inlong.schedule.ScheduleType; -import org.apache.inlong.schedule.ScheduleUnit; -import org.apache.inlong.schedule.exception.QuartzScheduleException; -import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob; +import org.apache.inlong.manager.schedule.ScheduleType; +import org.apache.inlong.manager.schedule.ScheduleUnit; +import org.apache.inlong.manager.schedule.exception.QuartzScheduleException; +import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob; import org.apache.commons.lang3.StringUtils; import org.quartz.CronScheduleBuilder; diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java similarity index 96% rename from inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java rename to inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java index c0ccc4b14c..a6eb129d5a 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/BaseScheduleTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.inlong.schedule; +package org.apache.inlong.manager.schedule; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; -import org.apache.inlong.schedule.exception.QuartzScheduleException; +import org.apache.inlong.manager.schedule.exception.QuartzScheduleException; import java.sql.Timestamp; -import static org.apache.inlong.schedule.ScheduleUnit.SECOND; +import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND; public class BaseScheduleTest { diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java similarity index 97% rename from inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java rename to inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java index 47ce19ad09..9202ea5b40 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/MockJob.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/MockJob.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.schedule.quartz; +package org.apache.inlong.manager.schedule.quartz; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java similarity index 97% rename from inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java rename to inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java index fc5ca9d9fd..008a7e42f8 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/quartz/QuartzScheduleEngineTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/quartz/QuartzScheduleEngineTest.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package org.apache.inlong.schedule.quartz; +package org.apache.inlong.manager.schedule.quartz; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; -import org.apache.inlong.schedule.BaseScheduleTest; +import org.apache.inlong.manager.schedule.BaseScheduleTest; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -27,7 +27,7 @@ import org.quartz.JobKey; import java.util.concurrent.TimeUnit; -import static org.apache.inlong.schedule.ScheduleUnit.SECOND; +import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java similarity index 89% rename from inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java rename to inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java index 77b84f3064..415331be3d 100644 --- a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/schedule/util/ScheduleUtilsTest.java +++ b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/util/ScheduleUtilsTest.java @@ -15,12 +15,12 @@ * limitations under the License. */ -package org.apache.inlong.schedule.util; +package org.apache.inlong.manager.schedule.util; import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; -import org.apache.inlong.schedule.BaseScheduleTest; -import org.apache.inlong.schedule.exception.QuartzScheduleException; -import org.apache.inlong.schedule.quartz.QuartzOfflineSyncJob; +import org.apache.inlong.manager.schedule.BaseScheduleTest; +import org.apache.inlong.manager.schedule.exception.QuartzScheduleException; +import org.apache.inlong.manager.schedule.quartz.QuartzOfflineSyncJob; import org.junit.jupiter.api.Test; import org.quartz.CronScheduleBuilder; @@ -35,13 +35,13 @@ import org.quartz.TriggerKey; import java.util.Date; -import static org.apache.inlong.schedule.ScheduleUnit.DAY; -import static org.apache.inlong.schedule.ScheduleUnit.HOUR; -import static org.apache.inlong.schedule.ScheduleUnit.MINUTE; -import static org.apache.inlong.schedule.ScheduleUnit.MONTH; -import static org.apache.inlong.schedule.ScheduleUnit.ONE_WAY; -import static org.apache.inlong.schedule.ScheduleUnit.WEEK; -import static org.apache.inlong.schedule.ScheduleUnit.YEAR; +import static org.apache.inlong.manager.schedule.ScheduleUnit.DAY; +import static org.apache.inlong.manager.schedule.ScheduleUnit.HOUR; +import static org.apache.inlong.manager.schedule.ScheduleUnit.MINUTE; +import static org.apache.inlong.manager.schedule.ScheduleUnit.MONTH; +import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_WAY; +import static org.apache.inlong.manager.schedule.ScheduleUnit.WEEK; +import static org.apache.inlong.manager.schedule.ScheduleUnit.YEAR; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java index a690472304..3ff9942c5d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/GroupTaskListenerFactory.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.listener; import org.apache.inlong.manager.common.plugin.Plugin; import org.apache.inlong.manager.common.plugin.PluginBinder; import org.apache.inlong.manager.service.listener.queue.QueueResourceListener; +import org.apache.inlong.manager.service.listener.schedule.GroupScheduleResourceListener; import org.apache.inlong.manager.service.listener.sink.SinkResourceListener; import org.apache.inlong.manager.service.listener.sort.SortConfigListener; import org.apache.inlong.manager.service.listener.source.SourceDeleteListener; @@ -29,6 +30,7 @@ import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.definition.ServiceTaskType; import org.apache.inlong.manager.workflow.definition.TaskListenerFactory; import org.apache.inlong.manager.workflow.event.task.QueueOperateListener; +import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener; import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import org.apache.inlong.manager.workflow.event.task.SourceOperateListener; import org.apache.inlong.manager.workflow.event.task.TaskEventListener; @@ -57,6 +59,7 @@ public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFacto private List<SourceOperateListener> sourceOperateListeners; private List<QueueOperateListener> queueOperateListeners; private List<SortOperateListener> sortOperateListeners; + private List<ScheduleOperateListener> scheduleOperateListeners; @Autowired private SourceStopListener sourceStopListener; @@ -70,6 +73,8 @@ public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFacto private SinkResourceListener sinkResourceListener; @Autowired private SortConfigListener sortConfigListener; + @Autowired + private GroupScheduleResourceListener groupScheduleResourceListener; @PostConstruct public void init() { @@ -81,6 +86,8 @@ public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFacto queueOperateListeners.add(queueResourceListener); sortOperateListeners = new LinkedList<>(); sortOperateListeners.add(sortConfigListener); + scheduleOperateListeners = new LinkedList<>(); + scheduleOperateListeners.add(groupScheduleResourceListener); } @Override @@ -124,6 +131,9 @@ public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFacto return Lists.newArrayList(sourceOperateListeners); case INIT_SINK: return Collections.singletonList(sinkResourceListener); + case INIT_SCHEDULE: + List<ScheduleOperateListener> scheduleOperateListeners = getScheduleOperateListener(workflowContext); + return Lists.newArrayList(scheduleOperateListeners); default: throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType)); } @@ -177,4 +187,14 @@ public class GroupTaskListenerFactory implements PluginBinder, TaskListenerFacto return listeners; } + public List<ScheduleOperateListener> getScheduleOperateListener(WorkflowContext context) { + List<ScheduleOperateListener> listeners = new ArrayList<>(); + for (ScheduleOperateListener listener : scheduleOperateListeners) { + if (listener != null && listener.accept(context)) { + listeners.add(listener); + } + } + return listeners; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java index 87c3a867c2..5069423c13 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/StreamTaskListenerFactory.java @@ -20,14 +20,12 @@ package org.apache.inlong.manager.service.listener; import org.apache.inlong.manager.common.plugin.Plugin; import org.apache.inlong.manager.common.plugin.PluginBinder; import org.apache.inlong.manager.service.listener.queue.StreamQueueResourceListener; -import org.apache.inlong.manager.service.listener.schedule.StreamScheduleResourceListener; import org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener; import org.apache.inlong.manager.service.listener.sort.StreamSortConfigListener; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.definition.ServiceTaskType; import org.apache.inlong.manager.workflow.definition.TaskListenerFactory; import org.apache.inlong.manager.workflow.event.task.QueueOperateListener; -import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener; import org.apache.inlong.manager.workflow.event.task.SinkOperateListener; import org.apache.inlong.manager.workflow.event.task.SortOperateListener; import org.apache.inlong.manager.workflow.event.task.SourceOperateListener; @@ -55,7 +53,6 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact private List<QueueOperateListener> queueOperateListeners; private List<SortOperateListener> sortOperateListeners; private List<SinkOperateListener> sinkOperateListeners; - private List<ScheduleOperateListener> scheduleOperateListeners; @Autowired private StreamQueueResourceListener queueResourceListener; @@ -63,8 +60,6 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact private StreamSortConfigListener streamSortConfigListener; @Autowired private StreamSinkResourceListener sinkResourceListener; - @Autowired - private StreamScheduleResourceListener scheduleResourceListener; @PostConstruct public void init() { @@ -75,8 +70,6 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact sortOperateListeners.add(streamSortConfigListener); sinkOperateListeners = new LinkedList<>(); sinkOperateListeners.add(sinkResourceListener); - scheduleOperateListeners = new LinkedList<>(); - scheduleOperateListeners.add(scheduleResourceListener); } @Override @@ -101,10 +94,6 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact if (CollectionUtils.isNotEmpty(pluginSinkOperateListeners)) { sinkOperateListeners.addAll(pluginSinkOperateListeners); } - List<ScheduleOperateListener> pluginScheduleOperateListeners = processPlugin.createScheduleOperateListeners(); - if (CollectionUtils.isNotEmpty(pluginScheduleOperateListeners)) { - scheduleOperateListeners.addAll(pluginScheduleOperateListeners); - } } @Override @@ -129,9 +118,6 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact case INIT_SINK: List<SinkOperateListener> sinkOperateListeners = getSinkOperateListener(workflowContext); return Lists.newArrayList(sinkOperateListeners); - case INIT_SCHEDULE: - List<ScheduleOperateListener> scheduleOperateListeners = getScheduleOperateListener(workflowContext); - return Lists.newArrayList(scheduleOperateListeners); default: throw new IllegalArgumentException(String.format("Unsupported ServiceTaskType %s", taskType)); } @@ -145,7 +131,6 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact queueOperateListeners = new LinkedList<>(); sortOperateListeners = new LinkedList<>(); sinkOperateListeners = new LinkedList<>(); - scheduleOperateListeners = new LinkedList<>(); } /** @@ -199,18 +184,4 @@ public class StreamTaskListenerFactory implements PluginBinder, TaskListenerFact } return listeners; } - - /** - * Get schedule operate listener list. - */ - private List<ScheduleOperateListener> getScheduleOperateListener(WorkflowContext context) { - List<ScheduleOperateListener> listeners = new ArrayList<>(); - for (ScheduleOperateListener listener : scheduleOperateListeners) { - if (listener != null && listener.accept(context)) { - listeners.add(listener); - } - } - return listeners; - } - } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java similarity index 54% rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java index 61179dc343..e27dab9e89 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/StreamScheduleResourceListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/schedule/GroupScheduleResourceListener.java @@ -20,10 +20,10 @@ package org.apache.inlong.manager.service.listener.schedule; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.enums.GroupOperateType; import org.apache.inlong.manager.common.enums.TaskEvent; -import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo; -import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; +import org.apache.inlong.manager.pojo.group.InlongGroupInfo; +import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; -import org.apache.inlong.manager.pojo.workflow.form.process.StreamResourceProcessForm; import org.apache.inlong.manager.workflow.WorkflowContext; import org.apache.inlong.manager.workflow.event.ListenerResult; import org.apache.inlong.manager.workflow.event.task.ScheduleOperateListener; @@ -35,7 +35,7 @@ import java.util.List; @Service @Slf4j -public class StreamScheduleResourceListener implements ScheduleOperateListener { +public class GroupScheduleResourceListener implements ScheduleOperateListener { @Override public TaskEvent event() { @@ -46,48 +46,44 @@ public class StreamScheduleResourceListener implements ScheduleOperateListener { public boolean accept(WorkflowContext context) { ProcessForm processForm = context.getProcessForm(); String groupId = processForm.getInlongGroupId(); - if (!(processForm instanceof StreamResourceProcessForm)) { - log.info("not add schedule stream listener, not StreamResourceProcessForm for groupId [{}]", groupId); + if (!(processForm instanceof GroupResourceProcessForm)) { + log.info("not add schedule group listener, not groupResourceProcessForm for groupId [{}]", groupId); return false; } - StreamResourceProcessForm streamProcessForm = (StreamResourceProcessForm) processForm; - String streamId = streamProcessForm.getStreamInfo().getInlongStreamId(); - if (streamProcessForm.getGroupOperateType() != GroupOperateType.INIT) { - log.info("not add schedule stream listener, as the operate was not INIT for groupId [{}] streamId [{}]", - groupId, streamId); + GroupResourceProcessForm groupProcessForm = (GroupResourceProcessForm) processForm; + if (groupProcessForm.getGroupOperateType() != GroupOperateType.INIT) { + log.info("not add schedule group listener, as the operate was not INIT for groupId [{}]", groupId); return false; } - log.info("add startup stream listener for groupId [{}] streamId [{}]", groupId, streamId); - return InlongConstants.DATASYNC_OFFLINE_MODE.equals(streamProcessForm.getGroupInfo().getInlongGroupMode()); + log.info("add startup group listener for groupId [{}]", groupId); + return InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()); } @Override public ListenerResult listen(WorkflowContext context) throws Exception { - StreamResourceProcessForm form = (StreamResourceProcessForm) context.getProcessForm(); - InlongStreamInfo streamInfo = form.getStreamInfo(); - final String groupId = streamInfo.getInlongGroupId(); - final String streamId = streamInfo.getInlongStreamId(); - log.info("begin to register schedule info for groupId={}, streamId={}", groupId, streamId); + GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm(); + InlongGroupInfo groupInfo = form.getGroupInfo(); + final String groupId = groupInfo.getInlongGroupId(); + log.info("begin to register schedule info for groupId={}", groupId); // todo: register schedule info to schedule service - // after register schedule info successfully, add ext property to stream info - saveInfo(streamInfo, InlongConstants.REGISTER_SCHEDULE_STATUS, - InlongConstants.REGISTERED, streamInfo.getExtList()); - log.info("success to register schedule info for group [" + groupId + "] and stream [" + streamId + "]"); + // after register schedule info successfully, add ext property to group ext info + saveInfo(groupInfo, InlongConstants.REGISTER_SCHEDULE_STATUS, + InlongConstants.REGISTERED, groupInfo.getExtList()); + log.info("success to register schedule info for group={}", groupId); return ListenerResult.success(); } /** * Save stream ext info into list. */ - private void saveInfo(InlongStreamInfo streamInfo, String keyName, String keyValue, - List<InlongStreamExtInfo> extInfoList) { - InlongStreamExtInfo extInfo = new InlongStreamExtInfo(); + private void saveInfo(InlongGroupInfo streamInfo, String keyName, String keyValue, + List<InlongGroupExtInfo> extInfoList) { + InlongGroupExtInfo extInfo = new InlongGroupExtInfo(); extInfo.setInlongGroupId(streamInfo.getInlongGroupId()); - extInfo.setInlongStreamId(streamInfo.getInlongStreamId()); extInfo.setKeyName(keyName); extInfo.setKeyValue(keyValue); extInfoList.add(extInfo); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java index f8981bf09a..067a320a49 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java @@ -24,7 +24,6 @@ import org.apache.inlong.manager.common.enums.TaskEvent; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; -import org.apache.inlong.manager.pojo.sort.util.StreamParseUtils; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm; import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm; @@ -126,11 +125,10 @@ public class SortConfigListener implements SortOperateListener { try { for (InlongStreamInfo streamInfo : streamInfos) { - // do not build sort config if the group mode is offline and the stream is not config successfully - if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode()) - && !StreamParseUtils.isRegisterScheduleSuccess(streamInfo)) { - LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline " - + "and the stream is not config successfully yet", groupId, streamInfo.getInlongStreamId()); + // do not build sort config if the group mode is offline + if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) { + LOGGER.info("no need to build sort config for groupId={} streamId={} as the mode is offline", + groupId, streamInfo.getInlongStreamId()); continue; } List<StreamSink> sinkList = streamInfo.getSinkList(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java index 08d04b58ee..1b30839308 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java @@ -88,7 +88,7 @@ public class PluginClassLoader extends URLClassLoader { * Get pluginClassLoader by plugin url. */ public static PluginClassLoader getFromPluginUrl(String url, ClassLoader parent) { - log.info("ClassLoaderPath:{}", url); + log.info("ClassLoaderPath : {}", url); checkClassLoader(parent); checkUrl(url); return AccessController.doPrivileged(new PrivilegedAction<PluginClassLoader>() { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java index bc8f0c2546..b9cacdffc4 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java @@ -86,6 +86,14 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition { initSortTask.setListenerFactory(groupTaskListenerFactory); process.addTask(initSortTask); + // Init Schedule + ServiceTask initScheduleTask = new ServiceTask(); + initScheduleTask.setName("InitSchedule"); + initScheduleTask.setDisplayName("Group-InitSchedule"); + initScheduleTask.setServiceTaskType(ServiceTaskType.INIT_SCHEDULE); + initScheduleTask.setListenerFactory(groupTaskListenerFactory); + process.addTask(initScheduleTask); + // End node EndEvent endEvent = new EndEvent(); process.setEndEvent(endEvent); @@ -94,7 +102,8 @@ public class CreateGroupWorkflowDefinition implements WorkflowDefinition { // To ensure that after some tasks fail, data will not start to be collected by source or consumed by sort startEvent.addNext(initMQTask); initMQTask.addNext(initSortTask); - initSortTask.addNext(endEvent); + initSortTask.addNext(initScheduleTask); + initScheduleTask.addNext(endEvent); return process; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java index 5505ed0694..2ec6e9052e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java @@ -102,14 +102,6 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition { initSourceTask.setListenerFactory(streamTaskListenerFactory); process.addTask(initSourceTask); - // Init Schedule info - ServiceTask initScheduleTask = new ServiceTask(); - initScheduleTask.setName("InitSchedule"); - initScheduleTask.setDisplayName("Stream-InitSchedule"); - initScheduleTask.setServiceTaskType(ServiceTaskType.INIT_SCHEDULE); - initScheduleTask.setListenerFactory(streamTaskListenerFactory); - process.addTask(initScheduleTask); - // End node EndEvent endEvent = new EndEvent(); process.setEndEvent(endEvent); @@ -120,8 +112,7 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition { initMQTask.addNext(initSinkTask); initSinkTask.addNext(initSortTask); initSortTask.addNext(initSourceTask); - initSourceTask.addNext(initScheduleTask); - initScheduleTask.addNext(endEvent); + initSourceTask.addNext(endEvent); return process; } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java index abae87fef4..e56b013d57 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java @@ -40,7 +40,8 @@ public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest { Assertions.assertNotSame(cloneProcess2, cloneProcess1); Assertions.assertNotNull(process.getTaskByName("InitMQ")); Assertions.assertNotNull(process.getTaskByName("InitSort")); - Assertions.assertEquals(2, process.getNameToTaskMap().size()); + Assertions.assertNotNull(process.getTaskByName("InitSchedule")); + Assertions.assertEquals(3, process.getNameToTaskMap().size()); } }
