This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new 9c714530a1 [INLONG-10561][Manager] Support configrations for bounded
source (#10571)
9c714530a1 is described below
commit 9c714530a1cfcd3f5c21be7a472d18db78459176
Author: AloysZhang <[email protected]>
AuthorDate: Mon Jul 8 10:07:42 2024 +0800
[INLONG-10561][Manager] Support configrations for bounded source (#10571)
Co-authored-by: Aloys Zhang <[email protected]>
---
.../client/api/inner/client/InlongGroupClient.java | 5 +-
.../manager/client/api/service/InlongGroupApi.java | 5 +-
.../manager/common/consts/InlongConstants.java | 4 ++
.../inlong/manager/common/enums/ErrorCodeEnum.java | 4 ++
.../inlong/manager/plugin/flink/FlinkService.java | 13 +++++-
.../inlong/manager/plugin/flink/dto/FlinkInfo.java | 6 +++
.../plugin/offline/FlinkOfflineJobOperator.java | 6 ++-
.../inlong/manager/plugin/util/FlinkUtils.java | 17 ++++---
.../pojo/schedule/OfflineJobSubmitRequest.java} | 31 ++++++++-----
.../schedule/quartz/QuartzOfflineSyncJob.java | 45 +++++++++++++-----
.../manager/schedule/util/ScheduleUtils.java | 13 ++++--
.../inlong/manager/schedule/BaseScheduleTest.java | 5 ++
.../manager/service/group/InlongGroupService.java | 7 +--
.../service/group/InlongGroupServiceImpl.java | 42 ++++++++++++++++-
.../manager/service/schedule/ScheduleOperator.java | 4 +-
.../service/schedule/ScheduleOperatorImpl.java | 5 +-
.../service/source/bounded/BoundedSourceType.java | 54 ++++++++++++++++++++++
.../web/controller/InlongGroupController.java | 12 +++--
.../workflow/processor/OfflineJobOperator.java | 4 +-
19 files changed, 226 insertions(+), 56 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index fcc907396e..d053ea37a5 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -36,6 +36,7 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
@@ -320,8 +321,8 @@ public class InlongGroupClient {
return response.getData();
}
- public Boolean submitOfflineJob(String groupId) {
- Response<Boolean> responseBody =
ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(groupId));
+ public Boolean submitOfflineJob(OfflineJobSubmitRequest request) {
+ Response<Boolean> responseBody =
ClientUtils.executeHttpCall(inlongGroupApi.submitOfflineJob(request));
ClientUtils.assertRespSuccess(responseBody);
return responseBody.getData();
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 930a3a8144..1761193644 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -26,6 +26,7 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import retrofit2.Call;
@@ -99,6 +100,6 @@ public interface InlongGroupApi {
@GET("group/switch/finish/{groupId}")
Call<Response<Boolean>> finishTagSwitch(@Path("groupId") String groupId);
- @POST("group/submitOfflineJob/{groupId}")
- Call<Response<Boolean>> submitOfflineJob(@Path("groupId") String groupId);
+ @POST("group/submitOfflineJob")
+ Call<Response<Boolean>> submitOfflineJob(@Body OfflineJobSubmitRequest
request);
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 4abcc0f359..d9957e81f5 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -93,6 +93,10 @@ public class InlongConstants {
public static final String RUNTIME_EXECUTION_MODE_STREAM = "stream";
public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
+ public static final String BOUNDARY_TYPE = "BOUNDARY_TYPE";
+ public static final String LOWER_BOUNDARY = "LOWER_BOUNDARY";
+ public static final String UPPER_BOUNDARY = "UPPER_BOUNDARY";
+
public static final Integer DISABLE_ZK = 0;
public static final Integer ENABLE_ZK = 1;
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index 2a82325ad9..9364fda8e7 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -130,6 +130,10 @@ public enum ErrorCodeEnum {
SCHEDULE_ENGINE_NOT_SUPPORTED(1702, "Schedule engine type not supported"),
SCHEDULE_STATUS_TRANSITION_NOT_ALLOWED(1703, "Schedule status transition
is not allowed"),
+ BOUNDED_SOURCE_TYPE_NOT_SUPPORTED(1801, "Bounded source type %s not
supported"),
+ BOUNDARY_TYPE_NOT_SUPPORTED(1802, "Boundary type %s not supported"),
+ BOUNDARIES_NOT_FOUND(1803, "Boundaries not found"),
+
WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
WORKFLOW_APPROVER_NOT_FOUND(4001, "Workflow approver does not exist/no
operation authority"),
WORKFLOW_DELETE_RECORD_FAILED(4002, "Workflow delete record failure"),
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 9188c6b73f..f3809ce8ae 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.plugin.flink;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
@@ -258,8 +259,16 @@ public class FlinkService {
list.add(flinkInfo.getLocalConfPath());
list.add("-checkpoint.interval");
list.add("60000");
- list.add("-runtime.execution.mode");
- list.add(flinkInfo.getRuntimeExecutionMode());
+ if
(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH.equalsIgnoreCase(flinkInfo.getRuntimeExecutionMode()))
{
+ list.add("-runtime.execution.mode");
+ list.add(flinkInfo.getRuntimeExecutionMode());
+ list.add("-source.boundary.type");
+ list.add(flinkInfo.getBoundaryType());
+ list.add("-source.lower.boundary");
+ list.add(flinkInfo.getLowerBoundary());
+ list.add("-source.upper.boundary");
+ list.add(flinkInfo.getUpperBoundary());
+ }
return list.toArray(new String[0]);
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index 4c3c75f855..e8b924f138 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -54,4 +54,10 @@ public class FlinkInfo {
private String exceptionMsg;
private String runtimeExecutionMode;
+
+ private String boundaryType;
+
+ private String lowerBoundary;
+
+ private String upperBoundary;
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
index 1c93e7636a..9a6eca45ce 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.plugin.offline;
+import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
@@ -30,7 +31,8 @@ import static
org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
public class FlinkOfflineJobOperator implements OfflineJobOperator {
@Override
- public void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) throws Exception {
- submitFlinkJobs(groupId, streamInfoList, true);
+ public void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList, Boundaries boundaries)
+ throws Exception {
+ submitFlinkJobs(groupId, streamInfoList, true, boundaries);
}
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
index fd0c2c8bf2..1110ef45d8 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.plugin.util;
+import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.util.JsonUtils;
@@ -222,11 +223,11 @@ public class FlinkUtils {
public static ListenerResult submitFlinkJobs(String groupId,
List<InlongStreamInfo> streamInfoList)
throws Exception {
- return submitFlinkJobs(groupId, streamInfoList, false);
+ return submitFlinkJobs(groupId, streamInfoList, false, null);
}
public static ListenerResult submitFlinkJobs(String groupId,
List<InlongStreamInfo> streamInfoList,
- boolean isBatchJob) throws Exception {
+ boolean isBatchJob, Boundaries boundaries) throws Exception {
int sinkCount = streamInfoList.stream()
.map(s -> s.getSinkList() == null ? 0 : s.getSinkList().size())
.reduce(0, Integer::sum);
@@ -240,7 +241,8 @@ public class FlinkUtils {
List<ListenerResult> listenerResults = new ArrayList<>();
for (InlongStreamInfo streamInfo : streamInfoList) {
listenerResults.add(
- FlinkUtils.submitFlinkJob(streamInfo,
FlinkUtils.genFlinkJobName(streamInfo), isBatchJob));
+ FlinkUtils.submitFlinkJob(
+ streamInfo,
FlinkUtils.genFlinkJobName(streamInfo), isBatchJob, boundaries));
}
// only one stream in group for now
@@ -254,11 +256,11 @@ public class FlinkUtils {
}
public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo,
String jobName) throws Exception {
- return submitFlinkJob(streamInfo, jobName, false);
+ return submitFlinkJob(streamInfo, jobName, false, null);
}
- public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo,
String jobName, boolean isBatchJob)
- throws Exception {
+ public static ListenerResult submitFlinkJob(InlongStreamInfo streamInfo,
String jobName,
+ boolean isBatchJob, Boundaries boundaries) throws Exception {
List<StreamSink> sinkList = streamInfo.getSinkList();
List<String> sinkTypes =
sinkList.stream().map(StreamSink::getSinkType).collect(Collectors.toList());
if (CollectionUtils.isEmpty(sinkList) ||
!SinkType.containSortFlinkSink(sinkTypes)) {
@@ -298,6 +300,9 @@ public class FlinkUtils {
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
if (isBatchJob) {
flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_BATCH);
+ flinkInfo.setBoundaryType(boundaries.getBoundaryType().getType());
+ flinkInfo.setLowerBoundary(boundaries.getLowerBound());
+ flinkInfo.setUpperBoundary(boundaries.getUpperBound());
} else {
flinkInfo.setRuntimeExecutionMode(RUNTIME_EXECUTION_MODE_STREAM);
}
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java
similarity index 56%
copy from
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java
index 1c93e7636a..6e9dcc91e8 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/offline/FlinkOfflineJobOperator.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/schedule/OfflineJobSubmitRequest.java
@@ -15,22 +15,29 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.plugin.offline;
+package org.apache.inlong.manager.pojo.schedule;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.workflow.processor.OfflineJobOperator;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
-import lombok.NoArgsConstructor;
+import javax.validation.constraints.NotNull;
-import java.util.List;
+@Data
+public class OfflineJobSubmitRequest {
-import static org.apache.inlong.manager.plugin.util.FlinkUtils.submitFlinkJobs;
+ @ApiModelProperty("Inlong Group ID")
+ @NotNull
+ private String groupId;
-@NoArgsConstructor
-public class FlinkOfflineJobOperator implements OfflineJobOperator {
+ @ApiModelProperty("Source boundary type, TIME and OFFSET are supported")
+ @NotNull
+ private String boundaryType;
- @Override
- public void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) throws Exception {
- submitFlinkJobs(groupId, streamInfoList, true);
- }
+ @ApiModelProperty("The lower bound for bounded source")
+ @NotNull
+ private String lowerBoundary;
+
+ @ApiModelProperty("The upper bound for bounded source")
+ @NotNull
+ private String upperBoundary;
}
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
index 2b66766474..6e369c4c41 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/quartz/QuartzOfflineSyncJob.java
@@ -17,12 +17,14 @@
package org.apache.inlong.manager.schedule.quartz;
+import org.apache.inlong.common.bounded.BoundaryType;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -35,10 +37,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.END_TIME;
import static
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_HOST;
import static
org.apache.inlong.manager.schedule.util.ScheduleUtils.MANAGER_PORT;
-import static org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_ID;
-import static
org.apache.inlong.manager.schedule.util.ScheduleUtils.SECRETE_KEY;
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.PASSWORD;
+import static org.apache.inlong.manager.schedule.util.ScheduleUtils.USERNAME;
@Data
@NoArgsConstructor
@@ -49,31 +52,49 @@ public class QuartzOfflineSyncJob implements Job {
private static final Logger LOGGER =
LoggerFactory.getLogger(QuartzOfflineSyncJob.class);
private volatile InlongGroupClient groupClient;
+ private long endTime;
@Override
public void execute(JobExecutionContext context) throws
JobExecutionException {
LOGGER.info("QuartzOfflineSyncJob run once");
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
initGroupClientIfNeeded(jobDataMap);
+
String inlongGroupId = context.getJobDetail().getKey().getName();
- LOGGER.info("Starting submit offline job for group {}", inlongGroupId);
- if (groupClient.submitOfflineJob(inlongGroupId)) {
- LOGGER.info("Successfully submitting offline job for group {}",
inlongGroupId);
- } else {
- LOGGER.warn("Failed to submit offline job for group {}",
inlongGroupId);
+ long lowerBoundary = context.getScheduledFireTime().getTime();
+ long upperBoundary = context.getNextFireTime() == null ? endTime :
context.getNextFireTime().getTime();
+
+ OfflineJobSubmitRequest request = new OfflineJobSubmitRequest();
+ request.setGroupId(inlongGroupId);
+ request.setBoundaryType(BoundaryType.TIME.getType());
+ request.setLowerBoundary(String.valueOf(lowerBoundary));
+ request.setUpperBoundary(String.valueOf(upperBoundary));
+ LOGGER.info("Starting submit offline job for group: {}, with lower
boundary: {} and upper boundary: {}",
+ inlongGroupId, lowerBoundary, upperBoundary);
+
+ try {
+ if (groupClient.submitOfflineJob(request)) {
+ LOGGER.info("Successfully submitting offline job for group
{}", inlongGroupId);
+ } else {
+ LOGGER.warn("Failed to submit offline job for group {}",
inlongGroupId);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Exception to submit offline job for group {}, error
msg: {}", inlongGroupId, e.getMessage());
}
+
}
private void initGroupClientIfNeeded(JobDataMap jobDataMap) {
if (groupClient == null) {
String host = (String) jobDataMap.get(MANAGER_HOST);
int port = (int) jobDataMap.get(MANAGER_PORT);
- String secreteId = (String) jobDataMap.get(SECRETE_ID);
- String secreteKey = (String) jobDataMap.get(SECRETE_KEY);
- LOGGER.info("Initializing Inlong group client, with host: {},
port: {}, userName : {}",
- host, port, secreteId);
+ String username = (String) jobDataMap.get(USERNAME);
+ String password = (String) jobDataMap.get(PASSWORD);
+ endTime = (long) jobDataMap.get(END_TIME);
+ LOGGER.info("Initializing Inlong group client, with host: {},
port: {}, userName : {}, endTime: {}",
+ host, port, username, endTime);
ClientConfiguration configuration = new ClientConfiguration();
- configuration.setAuthentication(new
DefaultAuthentication(secreteId, secreteKey));
+ configuration.setAuthentication(new
DefaultAuthentication(username, password));
String serviceUrl = host + ":" + port;
InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl,
configuration);
ClientFactory clientFactory =
ClientUtils.getClientFactory(inlongClient.getConfiguration());
diff --git
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
index 50c5d9505f..4a974f299e 100644
---
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
+++
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/util/ScheduleUtils.java
@@ -48,17 +48,20 @@ public class ScheduleUtils {
public static final String MANAGER_HOST = "HOST";
public static final String MANAGER_PORT = "PORT";
- public static final String SECRETE_ID = "SECRETE_ID";
- public static final String SECRETE_KEY = "SECRETE_KEY";
+ public static final String USERNAME = "USERNAME";
+ public static final String PASSWORD = "PASSWORD";
+ public static final String END_TIME = "END_TIME";
public static JobDetail genQuartzJobDetail(ScheduleInfo scheduleInfo,
Class<? extends Job> clz,
- String host, Integer port, String secreteId, String secreteKey) {
+ String host, Integer port, String username, String password) {
+
return JobBuilder.newJob(clz)
.withIdentity(scheduleInfo.getInlongGroupId())
.usingJobData(MANAGER_HOST, host)
.usingJobData(MANAGER_PORT, port)
- .usingJobData(SECRETE_ID, secreteId)
- .usingJobData(SECRETE_KEY, secreteKey)
+ .usingJobData(USERNAME, username)
+ .usingJobData(PASSWORD, password)
+ .usingJobData(END_TIME, scheduleInfo.getEndTime().getTime())
.build();
}
diff --git
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
index 1f0329a318..af54fbcc55 100644
---
a/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
+++
b/inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/BaseScheduleTest.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.manager.schedule.exception.QuartzScheduleException;
import java.sql.Timestamp;
+import static org.apache.inlong.manager.schedule.ScheduleUnit.ONE_ROUND;
import static org.apache.inlong.manager.schedule.ScheduleUnit.SECOND;
public class BaseScheduleTest {
@@ -42,6 +43,10 @@ public class BaseScheduleTest {
return genNormalScheduleInfo(GROUP_ID, SECOND.getUnit(),
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
}
+ public ScheduleInfo genOneroundScheduleInfo() {
+ return genNormalScheduleInfo(GROUP_ID, ONE_ROUND.getUnit(),
DEFAULT_INTERVAL, DEFAULT_SPAN_IN_MS);
+ }
+
public ScheduleInfo genNormalScheduleInfo(String groupId, String
scheduleUnit, int scheduleInterval,
long timeSpanInMs) {
ScheduleInfo scheduleInfo = new ScheduleInfo();
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index c6643c3259..f8a943c7b4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -29,6 +29,7 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.user.UserInfo;
import javax.validation.Valid;
@@ -219,10 +220,10 @@ public interface InlongGroupService {
List<GroupFullInfo> getGroupByBackUpClusterTag(String clusterTag);
/**
- * Submitting offline job for the given group.
- * @param groupId the inlong group to submit offline job
+ * Submitting offline job.
+ * @param request request to submit offline sync job
*
* */
- Boolean submitOfflineJob(String groupId);
+ Boolean submitOfflineJob(OfflineJobSubmitRequest request);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d2f3c0c948..7ea4d001ee 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.group;
+import org.apache.inlong.common.bounded.Boundaries;
+import org.apache.inlong.common.bounded.BoundaryType;
import org.apache.inlong.manager.common.auth.Authentication.AuthType;
import org.apache.inlong.manager.common.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -59,6 +61,7 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -78,6 +81,7 @@ import
org.apache.inlong.manager.service.schedule.ScheduleOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.StreamSourceOperator;
+import org.apache.inlong.manager.service.source.bounded.BoundedSourceType;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.tenant.InlongTenantService;
import org.apache.inlong.manager.service.user.InlongRoleService;
@@ -932,8 +936,9 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
}
@Override
- public Boolean submitOfflineJob(String groupId) {
+ public Boolean submitOfflineJob(OfflineJobSubmitRequest request) {
// 1. get stream info list
+ String groupId = request.getGroupId();
InlongGroupInfo groupInfo = get(groupId);
if (groupInfo == null) {
String msg = String.format("InLong group not found for group=%s",
groupId);
@@ -946,7 +951,40 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
LOGGER.warn("No stream info found for group {}, skip submit
offline job", groupId);
return false;
}
- return scheduleOperator.submitOfflineJob(groupId, streamInfoList);
+
+ // check if source type is bounded source
+ streamInfoList.forEach(this::checkBoundedSource);
+
+ // get the source boundaries
+ checkSourceBoundaryType(request.getBoundaryType());
+ BoundaryType boundaryType =
BoundaryType.getInstance(request.getBoundaryType());
+ if (boundaryType == null) {
+ throw new
BusinessException(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED,
+
String.format(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED.getMessage(),
request.getBoundaryType()));
+ }
+ Boundaries boundaries = new Boundaries(request.getLowerBoundary(),
request.getUpperBoundary(), boundaryType);
+
+ LOGGER.info("Check bounded source success, start to submitting offline
job for group {}", groupId);
+
+ return scheduleOperator.submitOfflineJob(groupId, streamInfoList,
boundaries);
+ }
+
+ private void checkBoundedSource(InlongStreamInfo streamInfo) {
+ streamInfo.getSourceList().forEach(stream -> {
+ if (!BoundedSourceType.isBoundedSource(stream.getSourceType())) {
+ throw new
BusinessException(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED,
+
String.format(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED.getMessage(),
+ stream.getSourceType()));
+ }
+ });
+ }
+
+ private void checkSourceBoundaryType(String sourceBoundaryType) {
+ if (!BoundaryType.isSupportBoundaryType(sourceBoundaryType)) {
+ throw new
BusinessException(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED,
+
String.format(ErrorCodeEnum.BOUNDARY_TYPE_NOT_SUPPORTED.getMessage(),
+ sourceBoundaryType));
+ }
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
index 653523294e..5b1e3c2df6 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperator.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.schedule;
+import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfo;
import org.apache.inlong.manager.pojo.schedule.ScheduleInfoRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
@@ -100,7 +101,8 @@ public interface ScheduleOperator {
* Start offline sync job when the schedule instance callback.
* @param groupId groupId to start offline job
* @param streamInfoList stream list to start offline job
+ * @param boundaries source boundaries for bounded source
* @Return whether succeed
* */
- Boolean submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList);
+ Boolean submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList, Boundaries boundaries);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
index 268109e1a2..61f847f244 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/schedule/ScheduleOperatorImpl.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.schedule;
+import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -178,12 +179,12 @@ public class ScheduleOperatorImpl implements
ScheduleOperator {
}
@Override
- public Boolean submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) {
+ public Boolean submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList, Boundaries boundaries) {
if (offlineJobOperator == null) {
offlineJobOperator =
OfflineJobOperatorFactory.getOfflineJobOperator();
}
try {
- offlineJobOperator.submitOfflineJob(groupId, streamInfoList);
+ offlineJobOperator.submitOfflineJob(groupId, streamInfoList,
boundaries);
LOGGER.info("Submit offline job for group {} and stream list {}
success.", groupId,
streamInfoList.stream().map(InlongStreamInfo::getName).collect(Collectors.toList()));
} catch (Exception e) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java
new file mode 100644
index 0000000000..df61fd2d51
--- /dev/null
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/bounded/BoundedSourceType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.bounded;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import lombok.Getter;
+
+@Getter
+public enum BoundedSourceType {
+
+ PULSAR("pulsar");
+
+ private final String sourceType;
+
+ BoundedSourceType(String name) {
+ this.sourceType = name;
+ }
+
+ public static BoundedSourceType getInstance(String name) {
+ for (BoundedSourceType source : values()) {
+ if (source.getSourceType().equalsIgnoreCase(name)) {
+ return source;
+ }
+ }
+ throw new
BusinessException(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED,
+
String.format(ErrorCodeEnum.BOUNDED_SOURCE_TYPE_NOT_SUPPORTED.getMessage(),
name));
+ }
+
+ public static boolean isBoundedSource(String name) {
+ for (BoundedSourceType source : values()) {
+ if (source.getSourceType().equalsIgnoreCase(name)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 3a9389c297..caa4aa6c83 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -33,6 +33,7 @@ import
org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest;
+import org.apache.inlong.manager.pojo.schedule.OfflineJobSubmitRequest;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.service.group.InlongGroupProcessService;
@@ -43,6 +44,8 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
@@ -65,6 +68,7 @@ import java.util.Map;
@Api(tags = "Inlong-Group-API")
public class InlongGroupController {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InlongGroupController.class);
@Autowired
private InlongGroupService groupService;
@Autowired
@@ -252,10 +256,10 @@ public class InlongGroupController {
return Response.success(groupService.finishTagSwitch(groupId));
}
- @RequestMapping(value = "/group/submitOfflineJob/{groupId}", method =
RequestMethod.POST)
+ @RequestMapping(value = "/group/submitOfflineJob", method =
RequestMethod.POST)
@ApiOperation(value = "Submitting inlong offline job process")
- @ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class)
- public Response<Boolean> submitOfflineJob(@PathVariable String groupId) {
- return Response.success(groupService.submitOfflineJob(groupId));
+ public Response<Boolean> submitOfflineJob(@RequestBody
OfflineJobSubmitRequest request) {
+ LOGGER.info("Received offline job submit request {}", request);
+ return Response.success(groupService.submitOfflineJob(request));
}
}
\ No newline at end of file
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
index f6d395dc7e..d6bd3bdf38 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/OfflineJobOperator.java
@@ -17,11 +17,13 @@
package org.apache.inlong.manager.workflow.processor;
+import org.apache.inlong.common.bounded.Boundaries;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import java.util.List;
public interface OfflineJobOperator {
- void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList) throws Exception;
+ void submitOfflineJob(String groupId, List<InlongStreamInfo>
streamInfoList, Boundaries boundaries)
+ throws Exception;
}