This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new 8819bfc166 [INLONG-10459][Manager] Support schedule instance callback
to submit Flink batch job (#10510)
8819bfc166 is described below
commit 8819bfc1666902c19f46ab995eba0ab64990fa4f
Author: AloysZhang <[email protected]>
AuthorDate: Wed Jun 26 13:11:11 2024 +0800
[INLONG-10459][Manager] Support schedule instance callback to submit Flink
batch job (#10510)
* [INLONG-10459][Manager] Support schedule instance callback to submit
Flink batch job
---
.../plugin/listener/StartupSortListener.java | 38 ++--------------
.../plugin/offline/FlinkOfflineJobOperator.java} | 30 +++++--------
.../inlong/manager/plugin/util/FlinkUtils.java | 32 ++++++++++++++
.../inlong/manager/pojo/schedule/ScheduleInfo.java | 16 +++----
.../manager/pojo/schedule/ScheduleInfoRequest.java | 16 +++----
.../manager/schedule/NoopScheduleClient.java | 2 +-
.../manager/service/group/InlongGroupService.java | 7 +++
.../service/group/InlongGroupServiceImpl.java | 18 ++++++++
.../service/listener/sort/SortConfigListener.java | 10 +----
.../schedule/OfflineJobOperatorFactory.java | 51 ++++++++++++++++++++++
.../manager/service/schedule/ScheduleOperator.java | 11 +++++
.../service/schedule/ScheduleOperatorImpl.java | 37 +++++++++++++---
.../web/controller/InlongGroupController.java | 6 +++
.../src/main/resources/application.properties | 6 +--
.../workflow/processor/OfflineJobOperator.java} | 29 +++---------
15 files changed, 194 insertions(+), 115 deletions(-)
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index b0624fbe66..5d1dd82578 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -20,8 +20,6 @@ package org.apache.inlong.manager.plugin.listener;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.TaskEvent;
-import org.apache.inlong.manager.plugin.util.FlinkUtils;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -32,9 +30,9 @@ import
org.apache.inlong.manager.workflow.event.task.SortOperateListener;
import lombok.extern.slf4j.Slf4j;
-import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
+
+import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
/**
* Listener of startup sort.
@@ -62,8 +60,7 @@ public class StartupSortListener implements
SortOperateListener {
}
log.info("add startup group listener for groupId [{}]", groupId);
- return
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode())
- ||
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
+ return
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode()));
}
@Override
@@ -77,36 +74,9 @@ public class StartupSortListener implements
SortOperateListener {
}
GroupResourceProcessForm groupResourceForm =
(GroupResourceProcessForm) processForm;
- InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
- // do not build sort config if the group mode is offline
- if
(InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
- log.info("no need to launching sort job for groupId={} as the mode
is offline",
- groupId);
- return ListenerResult.success();
- }
List<InlongStreamInfo> streamInfos =
groupResourceForm.getStreamInfos();
- int sinkCount = streamInfos.stream()
- .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
- .reduce(0, Integer::sum);
- if (sinkCount == 0) {
- log.warn("not any sink configured for group {}, skip launching
sort job", groupId);
- return ListenerResult.success();
- }
- List<ListenerResult> listenerResults = new ArrayList<>();
- for (InlongStreamInfo streamInfo : streamInfos) {
- listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo,
- FlinkUtils.genFlinkJobName(processForm, streamInfo)));
- }
-
- // only one stream in group for now
- // we can return the list of ListenerResult if support multi-stream in
the future
- List<ListenerResult> failedStreams = listenerResults.stream()
- .filter(t -> !t.isSuccess()).collect(Collectors.toList());
- if (failedStreams.isEmpty()) {
- return ListenerResult.success();
- }
- return ListenerResult.fail(failedStreams.get(0).getRemark());
+ return submitFlinkJobs(groupId, streamInfos);
}
/**
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
similarity index 56%
copy from
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
copy to
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
index a122235de3..09740c53f3 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
@@ -15,32 +15,22 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.plugin.offline;
-import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
-import org.springframework.stereotype.Service;
+import lombok.NoArgsConstructor;
-@Service
-public class NoopScheduleClient implements ScheduleEngineClient {
+import java.util.List;
- @Override
- public boolean accept(String engineType) {
- return ScheduleEngineType.NONE.getType().equals(engineType);
- }
+import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
- @Override
- public boolean register(ScheduleInfo scheduleInfo) {
- return true;
- }
-
- @Override
- public boolean unregister(String groupId) {
- return true;
- }
+@NoArgsConstructor
+public class FlinkOfflineJobOperator implements OfflineJobOperator {
@Override
- public boolean update(ScheduleInfo scheduleInfo) {
- return true;
+ public void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) throws Exception {
+ submitFlinkJobs(groupId, streamInfoList);
}
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index d4c7863371..82a0d628c1 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -218,6 +218,33 @@ public class FlinkUtils {
return flinkConfig;
}
+ public static ListenerResult submitFlinkJobs(String groupId,
List<InlongStreamInfo> streamInfoList)
+ throws Exception {
+ int sinkCount = streamInfoList.stream()
+ .map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
+ .reduce(0, Integer::sum);
+ if (sinkCount == 0) {
+ log.warn("Not any sink configured for group {} and stream list {},
skip launching sort job", groupId,
+ streamInfoList.stream()
+ .map(s -> s.getInlongGroupId() + ":" +
s.getName()).collect(Collectors.toList()));
+ return ListenerResult.success();
+ }
+
+ List<ListenerResult> listenerResults = new ArrayList<>();
+ for (InlongStreamInfo streamInfo : streamInfoList) {
+ listenerResults.add(FlinkUtils.submitFlinkJob(streamInfo,
FlinkUtils.genFlinkJobName(streamInfo)));
+ }
+
+ // only one stream in group for now
+ // we can return the list of ListenerResult if support multi-stream in
the future
+ List<ListenerResult> failedStreams = listenerResults.stream()
+ .filter(t -> !t.isSuccess()).collect(Collectors.toList());
+ if (failedStreams.isEmpty()) {
+ return ListenerResult.success();
+ }
+ return ListenerResult.fail(failedStreams.get(0).getRemark());
+ }
+
public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo,
String jobName) throws Exception {
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes =
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
@@ -295,4 +322,9 @@ public class FlinkUtils {
return Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) +
InlongConstants.HYPHEN
+ streamInfo.getInlongStreamId();
}
+
+ public static String genFlinkJobName(InlongStreamInfo streamInfo) {
+ return String.format(Constants.SORT_JOB_NAME_TEMPLATE,
streamInfo.getInlongGroupId()) + InlongConstants.HYPHEN
+ + streamInfo.getInlongStreamId();
+ }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
index 24f2e6196e..0bc8d2e6db 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfo.java
@@ -89,16 +89,16 @@ public class ScheduleInfo {
return false;
}
ScheduleInfo that = (ScheduleInfo) o;
- return Objects.equals(id, that.id) && Objects.equals(inlongGroupId,
that.inlongGroupId)
- && Objects.equals(scheduleType, that.scheduleType) &&
Objects.equals(scheduleUnit,
- that.scheduleUnit)
+ return Objects.equals(inlongGroupId, that.inlongGroupId)
+ && Objects.equals(scheduleType, that.scheduleType)
+ && Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
- && Objects.equals(startTime, that.startTime) &&
Objects.equals(endTime, that.endTime)
- && Objects.equals(delayTime, that.delayTime) &&
Objects.equals(selfDepend,
- that.selfDepend)
+ && Objects.equals(startTime, that.startTime)
+ && Objects.equals(endTime, that.endTime)
+ && Objects.equals(delayTime, that.delayTime)
+ && Objects.equals(selfDepend, that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
- && Objects.equals(crontabExpression, that.crontabExpression)
&& Objects.equals(version,
- that.version);
+ && Objects.equals(crontabExpression, that.crontabExpression);
}
@Override
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
index a324cee7f5..7e81f7533f 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/ScheduleInfoRequest.java
@@ -83,16 +83,16 @@ public class ScheduleInfoRequest {
return false;
}
ScheduleInfoRequest that = (ScheduleInfoRequest) o;
- return Objects.equals(id, that.id) && Objects.equals(inlongGroupId,
that.inlongGroupId)
- && Objects.equals(scheduleType, that.scheduleType) &&
Objects.equals(scheduleUnit,
- that.scheduleUnit)
+ return Objects.equals(inlongGroupId, that.inlongGroupId)
+ && Objects.equals(scheduleType, that.scheduleType)
+ && Objects.equals(scheduleUnit, that.scheduleUnit)
&& Objects.equals(scheduleInterval, that.scheduleInterval)
- && Objects.equals(startTime, that.startTime) &&
Objects.equals(endTime, that.endTime)
- && Objects.equals(delayTime, that.delayTime) &&
Objects.equals(selfDepend,
- that.selfDepend)
+ && Objects.equals(startTime, that.startTime)
+ && Objects.equals(endTime, that.endTime)
+ && Objects.equals(delayTime, that.delayTime)
+ && Objects.equals(selfDepend, that.selfDepend)
&& Objects.equals(taskParallelism, that.taskParallelism)
- && Objects.equals(crontabExpression, that.crontabExpression)
&& Objects.equals(version,
- that.version);
+ && Objects.equals(crontabExpression, that.crontabExpression);
}
@Override
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
index a122235de3..7b7ddf19b0 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
@@ -26,7 +26,7 @@ public class NoopScheduleClient implements
ScheduleEngineClient {
@Override
public boolean accept(String engineType) {
- return ScheduleEngineType.NONE.getType().equals(engineType);
+ return ScheduleEngineType.NONE.getType().equalsIgnoreCase(engineType);
}
@Override
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index 4906e91aa7..c6643c3259 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -218,4 +218,11 @@ public interface InlongGroupService {
*/
List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag);
+ /**
+ * Submitting offline job for the given group.
+ * @param groupId the inlong group to submit offline job
+ *
+ * */
+ Boolean submitOfflineJob(String groupId);
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d42a61dbfb..d2f3c0c948 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -931,4 +931,22 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
return groupInfoList;
}
+ @Override
+ public Boolean submitOfflineJob(String groupId) {
+ // 1. get stream info list
+ InlongGroupInfo groupInfo = get(groupId);
+ if (groupInfo == null) {
+ String msg = String.format("InLong group not found for group=%s",
groupId);
+ LOGGER.error(msg);
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ List<InlongStreamInfo> streamInfoList = streamService.list(groupId);
+ if (CollectionUtils.isEmpty(streamInfoList)) {
+ LOGGER.warn("No stream info found for group {}, skip submit
offline job", groupId);
+ return false;
+ }
+ return scheduleOperator.submitOfflineJob(groupId, streamInfoList);
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
index 067a320a49..556a4c41fa 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java
@@ -125,12 +125,6 @@ public class SortConfigListener implements
SortOperateListener {
try {
for (InlongStreamInfo streamInfo : streamInfos) {
- // do not build sort config if the group mode is offline
- if
(InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupInfo.getInlongGroupMode())) {
- LOGGER.info("no need to build sort config for groupId={}
streamId={} as the mode is offline",
- groupId, streamInfo.getInlongStreamId());
- continue;
- }
List<StreamSink> sinkList = streamInfo.getSinkList();
if (CollectionUtils.isEmpty(sinkList)) {
continue;
@@ -143,8 +137,8 @@ public class SortConfigListener implements
SortOperateListener {
}
}
} catch (Exception e) {
- String msg = String.format("failed to build sort config for
groupId=%s, ", groupId);
- LOGGER.error(msg + "streamInfos=" + streamInfos, e);
+ String msg = String.format("Failed to build sort config for
group=%s, ", groupId);
+ LOGGER.error("{} streamInfos={}", msg, streamInfos, e);
throw new WorkflowListenerException(msg + e.getMessage());
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java
new file mode 100644
index 0000000000..001f303a7b
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/OfflineJobOperatorFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.schedule;
+
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OfflineJobOperatorFactory {
+
+ private static final Logger log =
LoggerFactory.getLogger(OfflineJobOperatorFactory.class);
+ public static final String DEFAULT_OPERATOR_CLASS_NAME =
+ "org.apache.inlong.manager.plugin.offline.FlinkOfflineJobOperator";
+
+ public static OfflineJobOperator getOfflineJobOperator() {
+ return getOfflineJobOperator(DEFAULT_OPERATOR_CLASS_NAME);
+ }
+
+ public static OfflineJobOperator getOfflineJobOperator(String
operatorClassName) {
+ return getOfflineJobOperator(operatorClassName,
Thread.currentThread().getContextClassLoader());
+ }
+
+ public static OfflineJobOperator getOfflineJobOperator(String
operatorClassName, ClassLoader classLoader) {
+ try {
+ Class<?> operatorClass = classLoader.loadClass(operatorClassName);
+ Object operator =
operatorClass.getDeclaredConstructor().newInstance();
+ return (OfflineJobOperator) operator;
+ } catch (Throwable e) {
+ log.error("Failed to get offline job operator: ", e);
+ throw new BusinessException("Failed to get offline job operator: "
+ e.getMessage());
+ }
+ }
+
+}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
index 6bf9c01432..653523294e 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
@@ -19,6 +19,9 @@ package org.apache.inlong.manager.service.schedule;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+
+import java.util.List;
/**
* Operator for schedule. Including:
@@ -92,4 +95,12 @@ public interface ScheduleOperator {
* @Return whether succeed
* */
Boolean handleGroupApprove(String groupId);
+
+ /**
+ * Start offline sync job when the schedule instance callback.
+ * @param groupId groupId to start offline job
+ * @param streamInfoList stream list to start offline job
+ * @Return whether succeed
+ * */
+ Boolean submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index 411a80766a..a20fa0cb10 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -18,14 +18,16 @@
package org.apache.inlong.manager.service.schedule;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupExtEntityMapper;
-import org.apache.inlong.manager.dao.mapper.ScheduleEntityMapper;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.schedule.ScheduleClientFactory;
import org.apache.inlong.manager.schedule.ScheduleEngineClient;
+import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +35,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
+import java.util.List;
+import java.util.stream.Collectors;
+
import static org.apache.inlong.manager.common.enums.ScheduleStatus.APPROVED;
import static org.apache.inlong.manager.common.enums.ScheduleStatus.REGISTERED;
import static org.apache.inlong.manager.common.enums.ScheduleStatus.UPDATED;
@@ -48,12 +53,11 @@ public class ScheduleOperatorImpl implements
ScheduleOperator {
@Autowired
private InlongGroupExtEntityMapper groupExtMapper;
- @Autowired
- private ScheduleEntityMapper scheduleMapper;
-
@Autowired
private ScheduleClientFactory scheduleClientFactory;
+ private OfflineJobOperator offlineJobOperator;
+
private ScheduleEngineClient scheduleEngineClient;
@Override
@@ -75,7 +79,7 @@ public class ScheduleOperatorImpl implements ScheduleOperator
{
String groupId = scheduleInfo.getInlongGroupId();
InlongGroupExtEntity scheduleStatusExt =
groupExtMapper.selectByUniqueKey(groupId,
InlongConstants.REGISTER_SCHEDULE_STATUS);
- if
(InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) {
+ if (scheduleStatusExt != null &&
InlongConstants.REGISTERED.equalsIgnoreCase(scheduleStatusExt.getKeyValue())) {
// change schedule state to approved
scheduleService.updateStatus(scheduleInfo.getInlongGroupId(),
APPROVED, operator);
registerToScheduleEngine(scheduleInfo, operator, false);
@@ -124,8 +128,10 @@ public class ScheduleOperatorImpl implements
ScheduleOperator {
@Override
@Transactional(rollbackFor = Throwable.class)
public Boolean updateAndRegister(ScheduleInfoRequest request, String
operator) {
- updateOpt(request, operator);
- return
registerToScheduleEngine(CommonBeanUtils.copyProperties(request,
ScheduleInfo::new), operator, true);
+ if (updateOpt(request, operator)) {
+ return
registerToScheduleEngine(CommonBeanUtils.copyProperties(request,
ScheduleInfo::new), operator, true);
+ }
+ return false;
}
/**
@@ -172,4 +178,21 @@ public class ScheduleOperatorImpl implements
ScheduleOperator {
return registerToScheduleEngine(scheduleInfo, null, false);
}
+ @Override
+ public Boolean submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) {
+ if (offlineJobOperator == null) {
+ offlineJobOperator =
OfflineJobOperatorFactory.getOfflineJobOperator();
+ }
+ try {
+ offlineJobOperator.submitOfflineJob(groupId, streamInfoList);
+ LOGGER.info("Submit offline job for group {} and stream list {}
success.", groupId,
+
streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList()));
+ } catch (Exception e) {
+ String errorMsg = String.format("Submit offline job failed for
groupId=%s", groupId);
+ LOGGER.error(errorMsg, e);
+ throw new BusinessException(errorMsg);
+ }
+ return true;
+ }
+
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 04d89d332c..3a9389c297 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -252,4 +252,10 @@ public class InlongGroupController {
return Response.success(groupService.finishTagSwitch(groupId));
}
+ @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method =
RequestMethod.POST)
+ @ApiOperation(value = "Submitting inlong offline job process")
+ @ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class)
+ public Response<Boolean> submitOfflineJob(@PathVariable String groupId) {
+ return Response.success(groupService.submitOfflineJob(groupId));
+ }
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-web/src/main/resources/application.properties
b/inlong-manager/manager-web/src/main/resources/application.properties
index a6eec820ad..a498ac4d15 100644
--- a/inlong-manager/manager-web/src/main/resources/application.properties
+++ b/inlong-manager/manager-web/src/main/resources/application.properties
@@ -65,8 +65,4 @@ audit.admin.ids=3,4,5,6
audit.user.ids=3,4,5,6
# tencent cloud log service endpoint, The Operator cls resource by it
-cls.manager.endpoint=127.0.0.1
-
-# schedule engine type
-# support none(no scheduler) and quartz(quartz scheduler), default is none
-inlong.schedule.engine=none
\ No newline at end of file
+cls.manager.endpoint=127.0.0.1
\ No newline at end of file
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
similarity index 55%
copy from
inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
copy to
inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
index a122235de3..f6d395dc7e 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/NoopScheduleClient.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
@@ -15,32 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.schedule;
+package org.apache.inlong.manager.workflow.processor;
-import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.springframework.stereotype.Service;
+import java.util.List;
-@Service
-public class NoopScheduleClient implements ScheduleEngineClient {
+public interface OfflineJobOperator {
- @Override
- public boolean accept(String engineType) {
- return ScheduleEngineType.NONE.getType().equals(engineType);
- }
-
- @Override
- public boolean register(ScheduleInfo scheduleInfo) {
- return true;
- }
-
- @Override
- public boolean unregister(String groupId) {
- return true;
- }
-
- @Override
- public boolean update(ScheduleInfo scheduleInfo) {
- return true;
- }
+ void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) throws Exception;
}