This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/dev-offline-sync by this push:
new 9e96c4e37b [INLONG-9781][Manager] Add offline sync task type
definition (#9787)
9e96c4e37b is described below
commit 9e96c4e37bcc2f4c20c47879e0a6b2cd8f8b2e75
Author: AloysZhang <[email protected]>
AuthorDate: Thu Mar 7 11:44:20 2024 +0800
[INLONG-9781][Manager] Add offline sync task type definition (#9787)
---
.../java/org/apache/inlong/manager/client/ut/BaseTest.java | 2 +-
.../inlong/manager/common/consts/InlongConstants.java | 3 ++-
.../org/apache/inlong/manager/common/enums/GroupMode.java | 14 +++++++++++---
.../manager/plugin/listener/StartupSortListener.java | 2 +-
.../apache/inlong/manager/pojo/group/InlongGroupInfo.java | 3 ++-
.../inlong/manager/pojo/group/InlongGroupPageRequest.java | 3 ++-
.../inlong/manager/pojo/group/InlongGroupRequest.java | 5 +++--
.../inlong/manager/service/core/impl/AuditServiceImpl.java | 4 ++--
.../manager/service/group/InlongGroupServiceImpl.java | 2 +-
.../service/listener/group/InitGroupCompleteListener.java | 2 +-
.../listener/group/UpdateGroupCompleteListener.java | 2 +-
.../service/listener/queue/QueueResourceListener.java | 2 +-
.../listener/stream/InitStreamCompleteListener.java | 2 +-
.../manager/service/source/AbstractSourceOperator.java | 2 +-
.../manager/service/source/StreamSourceServiceImpl.java | 2 +-
15 files changed, 31 insertions(+), 19 deletions(-)
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 4d218918f5..9961c16ef8 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -110,7 +110,7 @@ public class BaseTest {
// set enable zk, create resource, group mode, and cluster tag
pulsarInfo.setEnableZookeeper(InlongConstants.DISABLE_ZK);
pulsarInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
- pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_MODE);
+ pulsarInfo.setInlongGroupMode(InlongConstants.DATASYNC_REALTIME_MODE);
pulsarInfo.setInlongClusterTag("default_cluster");
pulsarInfo.setDailyRecords(10000000);
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c3085972fd..581ebb3098 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -87,7 +87,8 @@ public class InlongConstants {
public static final Integer DELETED_STATUS = 10;
public static final Integer STANDARD_MODE = 0;
- public static final Integer DATASYNC_MODE = 1;
+ public static final Integer DATASYNC_REALTIME_MODE = 1;
+ public static final Integer DATASYNC_OFFLINE_MODE = 2;
public static final Integer DISABLE_ZK = 0;
public static final Integer ENABLE_ZK = 1;
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
index f0db2353b6..d4b417f39a 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
@@ -31,10 +31,18 @@ public enum GroupMode {
STANDARD("standard"),
/**
- * DataSync mode(only Data Synchronization): group init only with sort in
InLong Cluster
- * StreamSource -> Sort -> StreamSink
+ * DataSync mode(only Data Synchronization): real-time data sync in stream
way, group init only with
+ * sort in InLong Cluster.
+ * StreamSource -> Sort -> Sink
*/
- DATASYNC("datasync");
+ DATASYNC("datasync"),
+
+ /**
+ * DataSync mode(only Data Synchronization): offline data sync in batch
way, group init only with sort
+ * in InLong Cluster.
+ * BatchSource -> Sort -> Sink
+ */
+ DATASYNC_BATCH("datasync_offline");
@Getter
private final String mode;
diff --git
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 0b0e55e369..ab9e9b55d8 100644
---
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -73,7 +73,7 @@ public class StartupSortListener implements
SortOperateListener {
}
log.info("add startup group listener for groupId [{}]", groupId);
- return
InlongConstants.DATASYNC_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
+ return
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupProcessForm.getGroupInfo().getInlongGroupMode());
}
@Override
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
index 7c272d0911..272583778e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupInfo.java
@@ -72,7 +72,8 @@ public abstract class InlongGroupInfo extends BaseInlongGroup
{
@ApiModelProperty(value = "Whether to enable create resource? 0: disable,
1: enable")
private Integer enableCreateResource;
- @ApiModelProperty(value = "Standard mode(include Data Ingestion and
Synchronization): 0, DataSync mode(only Data Synchronization): 1")
+ @ApiModelProperty(value = "Standard mode(include Data Ingestion and
Synchronization): 0, DataSync mode(only Data Synchronization, real-time data
sync in stream way): 1,"
+ + " DataSync mode(only Data Synchronization, offline data sync in
batch way): 2")
private Integer inlongGroupMode;
@ApiModelProperty(value = "Data report type, default is 0.\n"
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
index 5d7c24b295..4dc2140166 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupPageRequest.java
@@ -58,7 +58,8 @@ public class InlongGroupPageRequest extends PageRequest {
@ApiModelProperty(value = "The inlong cluster tag list")
private List<String> clusterTagList;
- @ApiModelProperty(value = "Standard mode(include Data Ingestion and
Synchronization): 0, DataSync mode(only Data Synchronization): 1")
+ @ApiModelProperty(value = "Standard mode(include Data Ingestion and
Synchronization): 0, DataSync mode(only Data Synchronization, real-time data
sync in stream way): 1,"
+ + " DataSync mode(only Data Synchronization, offline data sync in
batch way): 2")
private Integer inlongGroupMode;
@ApiModelProperty(value = "Current user", hidden = true)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index 30893b7a09..6140bddad5 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -86,8 +86,9 @@ public abstract class InlongGroupRequest extends
BaseInlongGroup {
@Range(min = 0, max = 1, message = "default is 1, only supports [0:
disable, 1: enable]")
private Integer enableCreateResource;
- @ApiModelProperty(value = "Standard mode(include Data Ingestion and
Synchronization): 0, DataSync mode(only Data Synchronization): 1")
- @Range(min = 0, max = 1, message = "default is 0, only supports [0:
Standard, 1: DataSync]")
+ @ApiModelProperty(value = "Standard mode(include Data Ingestion and
Synchronization): 0, DataSync mode(only Data Synchronization, real-time data
sync in stream way): 1,"
+ + " DataSync mode(only Data Synchronization, offline data sync in
batch way): 2")
+ @Range(min = 0, max = 2, message = "default is 0, only supports [0:
Standard, 1: DataSync, 2: DataSyncOffline]")
private Integer inlongGroupMode;
@ApiModelProperty(value = "Data report type, default is 0.\n"
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index a2163ce9ac..1df2d014ba 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -276,7 +276,7 @@ public class AuditServiceImpl implements AuditService {
if (CollectionUtils.isEmpty(request.getAuditIds())) {
// properly overwrite audit ids by role and stream config
- if
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode()))
{
auditIdMap.put(getAuditId(sourceNodeType, false),
sourceNodeType);
request.setAuditIds(getAuditIds(groupId, streamId,
sourceNodeType, sinkNodeType));
} else {
@@ -436,7 +436,7 @@ public class AuditServiceImpl implements AuditService {
} else {
auditSet.add(getAuditId(sinkNodeType, true));
InlongGroupEntity inlongGroup =
inlongGroupMapper.selectByGroupId(groupId);
- if
(InlongConstants.DATASYNC_MODE.equals(inlongGroup.getInlongGroupMode())) {
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode()))
{
auditSet.add(getAuditId(sourceNodeType, false));
} else {
auditSet.add(getAuditId(sinkNodeType, false));
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 2d87b74995..6dfc2259d3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -706,7 +706,7 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
InlongGroupInfo groupInfo = this.get(groupId);
// check if the group mode is data sync mode
- if
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()))
{
String errMSg = String.format("no need to switch sync mode group =
{}", groupId);
LOGGER.error(errMSg);
throw new BusinessException(errMSg);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index db8cffbbe1..c96c12c9ff 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -98,7 +98,7 @@ public class InitGroupCompleteListener implements
ProcessEventListener {
// update status of other related configs
if
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource()))
{
- if
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()))
{
sourceService.updateStatus(groupId, null,
SourceStatus.SOURCE_NORMAL.getCode(), operator);
} else {
sourceService.updateStatus(groupId, null,
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
index d21ca83c92..55290b948f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java
@@ -82,7 +82,7 @@ public class UpdateGroupCompleteListener implements
ProcessEventListener {
}
// if the inlong group is dataSync mode, the stream source needs to be
processed.
- if
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()))
{
changeSource4DataSync(groupId, operateType, operator);
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
index 79eb6638cc..f566c29728 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java
@@ -115,7 +115,7 @@ public class QueueResourceListener implements
QueueOperateListener {
String operator = context.getOperator();
GroupOperateType operateType = groupProcessForm.getGroupOperateType();
- if
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()))
{
log.warn("skip to execute QueueResourceListener as sync mode for
groupId={}", groupId);
if (GroupOperateType.INIT.equals(operateType)) {
this.createQueueForStreams(groupInfo,
groupProcessForm.getStreamInfos(), operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index a739c5da02..df41823b84 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -66,7 +66,7 @@ public class InitStreamCompleteListener implements
ProcessEventListener {
// Update status of other related configs
streamService.updateStatus(groupId, streamId,
StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
streamService.updateWithoutCheck(streamInfo.genRequest(),
operator);
- if
(InlongConstants.DATASYNC_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
{
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(form.getGroupInfo().getInlongGroupMode()))
{
sourceService.updateStatus(groupId, streamId,
SourceStatus.SOURCE_NORMAL.getCode(), operator);
} else {
sourceService.updateStatus(groupId, streamId,
SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 00f85052fd..0fd1800ea4 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -137,7 +137,7 @@ public abstract class AbstractSourceOperator implements
StreamSourceOperator {
updateFieldOpt(entity, request.getFieldList());
return;
}
- boolean allowUpdate = InlongConstants.DATASYNC_MODE.equals(groupMode)
+ boolean allowUpdate =
InlongConstants.DATASYNC_REALTIME_MODE.equals(groupMode)
|| SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus());
if (!allowUpdate) {
throw new BusinessException(ErrorCodeEnum.SOURCE_OPT_NOT_ALLOWED,
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index a8a01224b8..08015a4086 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -222,7 +222,7 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
// if the group mode is DATASYNC, just get all related stream sources
List<StreamSource> streamSources = this.listSource(groupId, null);
- if
(InlongConstants.DATASYNC_MODE.equals(groupInfo.getInlongGroupMode())) {
+ if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupInfo.getInlongGroupMode()))
{
result = streamSources.stream()
.collect(Collectors.groupingBy(StreamSource::getInlongStreamId, HashMap::new,
Collectors.toCollection(ArrayList::new)));