This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 916bef2c2 [INLONG-5148][Manager] Handle the lightweight and standard
inlong groups differently in workflow (#5153)
916bef2c2 is described below
commit 916bef2c28178d42792d9f8fe9da7a2d48108394
Author: healchow <[email protected]>
AuthorDate: Thu Jul 21 16:17:08 2022 +0800
[INLONG-5148][Manager] Handle the lightweight and standard inlong groups
differently in workflow (#5153)
---
.../inlong/manager/client/ut/Kafka2HiveTest.java | 62 +++++++++----------
.../inlong/manager/common/enums}/ProcessName.java | 22 +------
.../process/LightGroupResourceProcessForm.java | 71 ----------------------
.../operation/ConsumptionProcessOperation.java | 2 +-
.../operation/InlongGroupProcessOperation.java | 2 +-
.../operation/InlongStreamProcessOperation.java | 2 +-
.../service/resource/SinkResourceListener.java | 10 ++-
.../resource/StreamSinkResourceListener.java | 11 ++--
.../service/sort/util/FilterFunctionUtils.java | 8 +--
.../service/workflow/WorkflowDefinition.java | 1 +
.../service/workflow/WorkflowOperation.java | 1 +
.../manager/service/workflow/WorkflowService.java | 1 +
.../service/workflow/WorkflowServiceImpl.java | 1 +
.../ApplyConsumptionWorkflowDefinition.java | 2 +-
.../group/ApplyGroupWorkflowDefinition.java | 2 +-
.../group/CreateGroupWorkflowDefinition.java | 2 +-
.../group/DeleteGroupWorkflowDefinition.java | 32 +++++-----
.../group/RestartGroupWorkflowDefinition.java | 2 +-
.../group/SuspendGroupWorkflowDefinition.java | 2 +-
.../group/listener/InitGroupCompleteListener.java | 2 +-
.../listener/UpdateGroupCompleteListener.java | 34 +++++++++--
.../apply/ApproveApplyProcessListener.java | 2 +-
.../stream/CreateStreamWorkflowDefinition.java | 2 +-
.../stream/DeleteStreamWorkflowDefinition.java | 2 +-
.../stream/RestartStreamWorkflowDefinition.java | 2 +-
.../stream/SuspendStreamWorkflowDefinition.java | 2 +-
.../listener/UpdateStreamCompleteListener.java | 2 +-
.../manager/service/sort/DisableZkForSortTest.java | 2 +-
.../source/listener/StreamSourceListenerTest.java | 2 +-
.../service/workflow/WorkflowServiceImplTest.java | 1 +
.../workflow/processor/UserTaskProcessor.java | 2 +-
31 files changed, 110 insertions(+), 181 deletions(-)
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
index 20a12a072..6effd908f 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -25,6 +25,7 @@ import
org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.TaskStatus;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
@@ -336,41 +337,40 @@ class Kafka2HiveTest extends BaseTest {
)
);
+ EventLogView eventLogView1 = EventLogView.builder()
+ .id(38)
+ .processId(12)
+ .processName(ProcessName.CREATE_GROUP_RESOURCE.toString())
+
.processDisplayName(ProcessName.CREATE_GROUP_RESOURCE.getDisplayName())
+ .inlongGroupId(GROUP_ID)
+ .taskId(12)
+ .elementName("InitSort")
+ .elementDisplayName("Group-InitSort")
+ .eventType("ProcessEvent")
+ .event("FAIL")
+ .listener("InitGroupFailedListener")
+ .status(-1)
+ .ip("127.0.0.1")
+ .build();
+ EventLogView eventLogView2 = EventLogView.builder()
+ .id(39)
+ .processId(12)
+ .processName(ProcessName.CREATE_GROUP_RESOURCE.toString())
+
.processDisplayName(ProcessName.CREATE_GROUP_RESOURCE.getDisplayName())
+ .inlongGroupId(GROUP_ID)
+ .taskId(12)
+ .elementName("InitSort")
+ .elementDisplayName("Group-InitSort")
+ .eventType("TaskEvent")
+ .event("COMPLETE")
+ .listener("InitGroupListener")
+ .ip("127.0.0.1")
+ .build();
stubFor(
get(urlMatching(MANAGER_URL_PREFIX + "/workflow/event/list.*"))
.willReturn(
okJson(JsonUtils.toJsonString(Response.success(new PageInfo<>(
- Lists.newArrayList(
- EventLogView.builder()
- .id(39)
- .processId(12)
-
.processName("CREATE_LIGHT_GROUP_PROCESS")
-
.processDisplayName("Create-Light-Group")
-
.inlongGroupId(GROUP_ID)
- .taskId(12)
-
.elementName("initSort")
-
.elementDisplayName("Group-InitSort")
-
.eventType("ProcessEvent")
- .event("FAIL")
-
.listener("LightGroupFailedListener")
- .status(-1)
- .ip("127.0.0.1")
- .build(),
- EventLogView.builder()
- .id(38)
- .processId(12)
-
.processName("CREATE_LIGHT_GROUP_PROCESS")
-
.processDisplayName("Create-Light-Group")
-
.inlongGroupId(GROUP_ID)
- .taskId(12)
-
.elementName("initSort")
-
.elementDisplayName("Group-InitSort")
- .eventType("TaskEvent")
- .event("COMPLETE")
-
.listener("LightGroupSortListener")
- .ip("127.0.0.1")
- .build()
- )
+ Lists.newArrayList(eventLogView1,
eventLogView2)
))))
)
);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
similarity index 79%
rename from
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
rename to
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
index 176c6ae0c..bf604f2ef 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow;
+package org.apache.inlong.manager.common.enums;
/**
* WorkflowProcess name
@@ -47,26 +47,6 @@ public enum ProcessName {
*/
DELETE_GROUP_PROCESS("Delete-Group"),
- /**
- * Startup lightweight inlong group process
- */
- CREATE_LIGHT_GROUP_PROCESS("Create-Light-Group"),
-
- /**
- * Suspend lightweight inlong group process
- */
- SUSPEND_LIGHT_GROUP_PROCESS("Suspend-Light-Group"),
-
- /**
- * Restart lightweight inlong group process
- */
- RESTART_LIGHT_GROUP_PROCESS("Restart-Light-Group"),
-
- /**
- * Delete lightweight inlong group process
- */
- DELETE_LIGHT_GROUP_PROCESS("Delete-Light-Group"),
-
/**
* Apply consumption process
*/
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
deleted file mode 100644
index 0aef09b27..000000000
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.workflow.form.process;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.exceptions.FormValidateException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.util.Preconditions;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Form of create lightweight inlong group resource
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-public class LightGroupResourceProcessForm extends BaseProcessForm {
-
- public static final String FORM_NAME = "LightGroupResourceProcessForm";
-
- private InlongGroupInfo groupInfo;
-
- private List<InlongStreamInfo> streamInfos;
-
- private GroupOperateType groupOperateType = GroupOperateType.INIT;
-
- @Override
- public void validate() throws FormValidateException {
- Preconditions.checkNotNull(groupInfo,
ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
- }
-
- @Override
- public String getFormName() {
- return FORM_NAME;
- }
-
- @Override
- public String getInlongGroupId() {
- return groupInfo.getInlongGroupId();
- }
-
- @Override
- public Map<String, Object> showInList() {
- Map<String, Object> show = new HashMap<>();
- show.put("inlongGroupId", groupInfo.getInlongGroupId());
- show.put("groupOperateType", this.groupOperateType);
- return show;
- }
-
-}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
index 695618bfb..a51c157da 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
index 88350aec3..fa61f81fd 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.apache.inlong.manager.workflow.core.WorkflowQueryService;
import org.slf4j.Logger;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
index 302e68fb7..58f2ccea3 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongStreamProcessOperation.java
@@ -31,7 +31,7 @@ import
org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
index 8f9f4fffd..3b18acb37 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/SinkResourceListener.java
@@ -71,9 +71,8 @@ public class SinkResourceListener implements
SinkOperateListener {
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(needCreateList)) {
- String result = "sink resources have been created for group [" +
groupId + "] and stream " + streamIdList;
- log.info(result);
- return ListenerResult.success(result);
+ log.info("all sink resources have been created for group [" +
groupId + "] and stream " + streamIdList);
+ return ListenerResult.success();
}
for (SinkInfo sinkConfig : needCreateList) {
@@ -81,9 +80,8 @@ public class SinkResourceListener implements
SinkOperateListener {
SinkResourceOperator resourceOperator =
resourceOperatorFactory.getInstance(SinkType.forType(sinkType));
resourceOperator.createSinkResource(sinkConfig);
}
- String result = "success to create sink resources for group [" +
groupId + "] and stream " + streamIdList;
- log.info(result);
- return ListenerResult.success(result);
+ log.info("success to create sink resources for group [" + groupId + "]
and stream " + streamIdList);
+ return ListenerResult.success();
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
index 44ce6bdcf..242c3bf2a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/StreamSinkResourceListener.java
@@ -68,10 +68,8 @@ public class StreamSinkResourceListener implements
SinkOperateListener {
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(needCreateResources)) {
- String result =
- "sink resources have been created for group [" + groupId +
"] and stream [" + streamId + "]";
- log.info(result);
- return ListenerResult.success(result);
+ log.info("sink resources have been created for group [" + groupId
+ "] and stream [" + streamId + "]");
+ return ListenerResult.success();
}
for (SinkInfo sinkInfo : needCreateResources) {
@@ -79,9 +77,8 @@ public class StreamSinkResourceListener implements
SinkOperateListener {
SinkResourceOperator resourceOperator =
resourceOperatorFactory.getInstance(SinkType.forType(sinkType));
resourceOperator.createSinkResource(sinkInfo);
}
- String result = "success to create sink resources for group [" +
groupId + "] and stream [" + streamId + "]";
- log.info(result);
- return ListenerResult.success(result);
+ log.info("success to create sink resources for group [" + groupId + "]
and stream [" + streamId + "]");
+ return ListenerResult.success();
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
index 53c5dbfee..6b86ff2a1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/FilterFunctionUtils.java
@@ -76,8 +76,7 @@ public class FilterFunctionUtils {
case STRING_REPLACER:
return Lists.newArrayList();
default:
- throw new UnsupportedOperationException(
- String.format("Unsupported transformType=%s",
transformType));
+ throw new
UnsupportedOperationException(String.format("Unsupported transformType=%s",
transformType));
}
}
@@ -104,7 +103,7 @@ public class FilterFunctionUtils {
/**
* Parse filter strategy from TransformResponse and convert to the filter
strategy of sort protocol
*
- * @param transformResponse The transform response that may contains
filter operation
+ * @param transformResponse The transform response that may contain filter
operation
* @return The filter strategy, see {@link FilterStrategy}
*/
public static FilterStrategy parseFilterStrategy(TransformResponse
transformResponse) {
@@ -200,8 +199,7 @@ public class FilterFunctionUtils {
case not_null:
return IsNotNullOperator.getInstance();
default:
- throw new IllegalArgumentException(
- String.format("Unsupported operateType=%s for inlong",
operationType));
+ throw new IllegalArgumentException(String.format("Unsupported
operateType=%s", operationType));
}
}
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java
index 3fd8611dd..6dd0a12c2 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowDefinition.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.workflow;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowOperation.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowOperation.java
index b9503a1ac..f9f68e615 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowOperation.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowOperation.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import org.apache.inlong.manager.common.enums.ProcessName;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.BaseProcessForm;
/**
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowService.java
index fa02ea7c4..b74a55a8c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowService.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.workflow;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.pojo.workflow.ProcessDetailResponse;
import org.apache.inlong.manager.common.pojo.workflow.ProcessQuery;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index f0ae45fd0..935af5ed8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -22,6 +22,7 @@ import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Maps;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.TaskStatus;
import org.apache.inlong.manager.common.pojo.workflow.EventLogQuery;
import org.apache.inlong.manager.common.pojo.workflow.ProcessCountQuery;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
index 9e64f2886..60a513302 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
@@ -23,7 +23,7 @@ import
org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumpt
import
org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCancelProcessListener;
import
org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCompleteProcessListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
index fc5990d04..bf97c08a9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
@@ -21,7 +21,7 @@ import
org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterCont
import
org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import
org.apache.inlong.manager.common.pojo.workflow.form.task.InlongGroupApproveForm;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.group.listener.apply.AfterApprovedTaskListener;
import
org.apache.inlong.manager.service.workflow.group.listener.apply.ApproveApplyProcessListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 862c1f99e..421bca1c0 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.group.listener.InitGroupCompleteListener;
import
org.apache.inlong.manager.service.workflow.group.listener.InitGroupFailedListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 8f813f6f0..0bb526bde 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
import
org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
@@ -70,20 +70,20 @@ public class DeleteGroupWorkflowDefinition implements
WorkflowDefinition {
process.setStartEvent(startEvent);
// Delete Source
- ServiceTask deleteDataSourceTask = new ServiceTask();
- deleteDataSourceTask.setName("DeleteSource");
- deleteDataSourceTask.setDisplayName("Group-DeleteSource");
- deleteDataSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
- deleteDataSourceTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(deleteDataSourceTask);
+ ServiceTask deleteSourceTask = new ServiceTask();
+ deleteSourceTask.setName("DeleteSource");
+ deleteSourceTask.setDisplayName("Group-DeleteSource");
+ deleteSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
+ deleteSourceTask.addListenerProvider(groupTaskListenerFactory);
+ process.addTask(deleteSourceTask);
// Delete MQ
- ServiceTask deleteMqTask = new ServiceTask();
- deleteMqTask.setName("DeleteMQ");
- deleteMqTask.setDisplayName("Group-DeleteMQ");
- deleteMqTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
- deleteMqTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(deleteMqTask);
+ ServiceTask deleteMQTask = new ServiceTask();
+ deleteMQTask.setName("DeleteMQ");
+ deleteMQTask.setDisplayName("Group-DeleteMQ");
+ deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
+ deleteMQTask.addListenerProvider(groupTaskListenerFactory);
+ process.addTask(deleteMQTask);
// Delete Sort
ServiceTask deleteSortTask = new ServiceTask();
@@ -97,9 +97,9 @@ public class DeleteGroupWorkflowDefinition implements
WorkflowDefinition {
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
- startEvent.addNext(deleteDataSourceTask);
- deleteDataSourceTask.addNext(deleteMqTask);
- deleteMqTask.addNext(deleteSortTask);
+ startEvent.addNext(deleteSourceTask);
+ deleteSourceTask.addNext(deleteMQTask);
+ deleteMQTask.addNext(deleteSortTask);
deleteSortTask.addNext(endEvent);
return process;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
index ed9f22413..a8d1afb27 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
import
org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
index aabba5bf3..e59819b3f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
import
org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
index 8154f2f7e..aed5385d9 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
@@ -66,9 +66,9 @@ public class InitGroupCompleteListener implements
ProcessEventListener {
String groupId = form.getInlongGroupId();
log.info("begin to execute InitGroupCompleteListener for groupId={}",
groupId);
+ // update inlong group status and other info
InlongGroupInfo groupInfo = form.getGroupInfo();
String operator = context.getOperator();
- // update inlong group status and other info
groupService.updateStatus(groupId,
GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
groupService.update(groupInfo.genRequest(), operator);
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
index 6df9863ff..d1c5aafa8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
@@ -18,9 +18,11 @@
package org.apache.inlong.manager.service.workflow.group.listener;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.StreamSourceService;
@@ -49,35 +51,55 @@ public class UpdateGroupCompleteListener implements
ProcessEventListener {
}
@Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
+ public ListenerResult listen(WorkflowContext context) {
GroupResourceProcessForm form = (GroupResourceProcessForm)
context.getProcessForm();
String groupId = form.getInlongGroupId();
GroupOperateType operateType = form.getGroupOperateType();
log.info("begin to execute UpdateGroupCompleteListener for groupId={},
operateType={}", groupId, operateType);
+ // update inlong group status and other configs
String operator = context.getOperator();
switch (operateType) {
case SUSPEND:
groupService.updateStatus(groupId,
GroupStatus.SUSPENDED.getCode(), operator);
- sourceService.updateStatus(groupId, null,
SourceStatus.SOURCE_FROZEN.getCode(), operator);
break;
case RESTART:
groupService.updateStatus(groupId,
GroupStatus.RESTARTED.getCode(), operator);
- sourceService.updateStatus(groupId, null,
SourceStatus.SOURCE_NORMAL.getCode(), operator);
break;
case DELETE:
groupService.updateStatus(groupId,
GroupStatus.DELETED.getCode(), operator);
- sourceService.logicDeleteAll(groupId, null, operator);
break;
default:
+ log.warn("Unsupported operate={} for inlong group",
operateType);
break;
}
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ groupService.update(groupInfo.genRequest(), operator);
- // update inlong group status and other configs
- groupService.update(form.getGroupInfo().genRequest(), operator);
+ // if the inlong group is lightweight mode, the stream source needs to
be processed.
+ if
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+ changeSource4Lightweight(groupId, operateType, operator);
+ }
log.info("success to execute UpdateGroupCompleteListener for
groupId={}, operateType={}", groupId, operateType);
return ListenerResult.success();
}
+ private void changeSource4Lightweight(String groupId, GroupOperateType
operateType, String operator) {
+ switch (operateType) {
+ case SUSPEND:
+ sourceService.updateStatus(groupId, null,
SourceStatus.SOURCE_FROZEN.getCode(), operator);
+ break;
+ case RESTART:
+ sourceService.updateStatus(groupId, null,
SourceStatus.SOURCE_NORMAL.getCode(), operator);
+ break;
+ case DELETE:
+ sourceService.logicDeleteAll(groupId, null, operator);
+ break;
+ default:
+ log.warn("Unsupported operate={} for inlong group",
operateType);
+ break;
+ }
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
index 8f1890621..831faabd1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
@@ -25,7 +25,7 @@ import
org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupPro
import
org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 070d4fc76..4afeb82e1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
import
org.apache.inlong.manager.service.workflow.stream.listener.InitStreamCompleteListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index c87cc760d..b5919d907 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
import
org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
index fa0495098..8cd67a943 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
import
org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
index 9eb8591c8..4a51b57ec 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import
org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
import
org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
index 10fcc65bd..225671651 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
@@ -65,7 +65,7 @@ public class UpdateStreamCompleteListener implements
ProcessEventListener {
status = StreamStatus.DELETED;
break;
default:
- throw new RuntimeException(String.format("Unsupported
operation=%s for inlong group", operateType));
+ throw new RuntimeException(String.format("Unsupported
operate=%s for inlong group", operateType));
}
streamService.updateStatus(groupId, streamId, status.getCode(),
operator);
streamService.update(streamInfo.genRequest(), operator);
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
index e0ed84739..8b0621e22 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
@@ -36,7 +36,7 @@ import
org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.mocks.MockPlugin;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowServiceImplTest;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
index e922b9218..af8f75965 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
@@ -31,7 +31,7 @@ import
org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import
org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.core.ProcessService;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
diff --git
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 0257456f8..09c73638d 100644
---
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.workflow;
import com.google.common.collect.Lists;
import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
diff --git
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
index 9f1f739e9..f57ae3541 100644
---
a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
+++
b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/processor/UserTaskProcessor.java
@@ -78,7 +78,7 @@ public class UserTaskProcessor extends
AbstractTaskProcessor<UserTask> {
@Override
public boolean create(UserTask userTask, WorkflowContext context) {
List<String> approvers = userTask.getApproverAssign().assign(context);
- Preconditions.checkNotEmpty(approvers, "cannot assign approvers for
task: " + userTask.getDisplayName()
+ Preconditions.checkNotEmpty(approvers, "Cannot assign approvers for
task: " + userTask.getDisplayName()
+ ", as the approvers was empty");
if (!userTask.isNeedAllApprove()) {