This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 95352390c1 [INLONG-9995][Manager] Support batch saving of group
information and other operations (#9996)
95352390c1 is described below
commit 95352390c1ea3defebd20adbf03d74dcbc6f55ea
Author: fuweng11 <[email protected]>
AuthorDate: Thu Apr 18 19:42:33 2024 +0800
[INLONG-9995][Manager] Support batch saving of group information and other
operations (#9996)
---
.../manager/client/api/impl/InlongGroupImpl.java | 2 +-
.../client/api/inner/client/InlongGroupClient.java | 25 +++++++++-
.../api/inner/client/InlongStreamClient.java | 11 +++++
.../client/api/inner/client/StreamSinkClient.java | 7 +++
.../api/inner/client/StreamSourceClient.java | 11 +++++
.../manager/client/api/service/InlongGroupApi.java | 9 +++-
.../client/api/service/InlongStreamApi.java | 4 ++
.../manager/client/api/service/StreamSinkApi.java | 4 ++
.../client/api/service/StreamSourceApi.java | 6 +++
.../inlong/manager/common/enums/OperationType.java | 15 +++++-
.../inlong/manager/pojo/common/BatchResult.java} | 32 ++++++-------
.../manager/pojo/workflow/ProcessResponse.java | 2 +-
.../inlong/manager/pojo/workflow/TaskResponse.java | 2 +-
.../form/process/ApplyConsumeProcessForm.java | 8 +++-
.../form/process/ApplyGroupProcessForm.java | 53 ++++++++++++++++++++--
.../form/process/GroupResourceProcessForm.java | 7 ++-
.../pojo/workflow/form/process/ProcessForm.java | 3 +-
.../workflow/form/task/InlongGroupApproveForm.java | 38 +++++++++++++++-
.../service/datatype/CsvDataTypeOperator.java | 7 ++-
.../service/group/InlongGroupProcessService.java | 35 ++++++++++++++
.../manager/service/group/InlongGroupService.java | 11 +++++
.../service/group/InlongGroupServiceImpl.java | 24 ++++++++++
.../group/apply/AfterApprovedTaskListener.java | 20 +++++---
.../group/apply/ApproveApplyProcessListener.java | 29 ++++++------
.../group/apply/CancelApplyProcessListener.java | 33 ++++++++------
.../group/apply/RejectApplyProcessListener.java | 31 +++++++------
.../manager/service/sink/StreamSinkService.java | 10 ++++
.../service/sink/StreamSinkServiceImpl.java | 25 ++++++++++
.../service/source/StreamSourceService.java | 10 ++++
.../service/source/StreamSourceServiceImpl.java | 25 ++++++++++
.../service/stream/InlongStreamService.java | 10 ++++
.../service/stream/InlongStreamServiceImpl.java | 24 ++++++++++
.../service/workflow/WorkflowServiceImpl.java | 4 +-
.../web/controller/InlongGroupController.java | 21 +++++++++
.../web/controller/InlongStreamController.java | 10 ++++
.../web/controller/StreamSinkController.java | 8 ++++
.../web/controller/StreamSourceController.java | 11 +++++
.../openapi/OpenInLongGroupController.java | 10 ++++
.../openapi/OpenInLongStreamController.java | 9 ++++
.../openapi/OpenStreamSinkController.java | 8 ++++
.../openapi/OpenStreamSourceController.java | 11 +++++
41 files changed, 537 insertions(+), 88 deletions(-)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 9316760874..7a991803df 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -117,7 +117,7 @@ public class InlongGroupImpl implements InlongGroup {
@Override
public InlongGroupContext init() throws Exception {
InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
- WorkflowResult initWorkflowResult =
groupClient.initInlongGroup(groupInfo.genRequest());
+ WorkflowResult initWorkflowResult =
groupClient.startProcess(groupInfo.genRequest());
List<TaskResponse> taskViews = initWorkflowResult.getNewTasks();
Preconditions.expectNotEmpty(taskViews, "init inlong group info
failed");
TaskResponse taskView = taskViews.get(0);
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index c7462fd0b2..54b61f91c2 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -45,6 +46,7 @@ import
org.springframework.boot.configurationprocessor.json.JSONObject;
import retrofit2.Call;
import java.util.List;
+import java.util.stream.Collectors;
import static
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
import static
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
@@ -167,6 +169,16 @@ public class InlongGroupClient {
return response.getData();
}
+ /**
+ * Batch create inlong group
+ */
+ public List<BatchResult> batchCreateGroup(List<InlongGroupRequest>
groupRequestList) {
+ Response<List<BatchResult>> response =
+
ClientUtils.executeHttpCall(inlongGroupApi.batchCreateGroup(groupRequestList));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Update inlong group info
*
@@ -186,9 +198,18 @@ public class InlongGroupClient {
return response.getData();
}
- public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) {
+ public WorkflowResult startProcess(InlongGroupRequest groupInfo) {
+ Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
+ inlongGroupApi.startProcess(groupInfo.getInlongGroupId()));
+ ClientUtils.assertRespSuccess(responseBody);
+ return responseBody.getData();
+ }
+
+ public WorkflowResult batchStartProcess(List<InlongGroupRequest>
groupRequestList) {
+ List<String> groupIdList =
groupRequestList.stream().map(InlongGroupRequest::getInlongGroupId).collect(
+ Collectors.toList());
Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
- inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId()));
+ inlongGroupApi.batchStartProcess(groupIdList));
ClientUtils.assertRespSuccess(responseBody);
return responseBody.getData();
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index a71d7ac2b3..89ca7eef68 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.manager.client.api.service.InlongStreamApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -58,6 +59,16 @@ public class InlongStreamClient {
return response.getData();
}
+ /**
+ * Batch create inlong stream.
+ */
+ public List<BatchResult> batchCreateStreamInfo(List<InlongStreamInfo>
streamInfos) {
+ Response<List<BatchResult>> response =
+
ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Query whether the inlong stream ID exists
*
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index bb493f5966..c350900f06 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.manager.client.api.ClientConfiguration;
import org.apache.inlong.manager.client.api.service.StreamSinkApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -54,6 +55,12 @@ public class StreamSinkClient {
return response.getData();
}
+ public List<BatchResult> batchCreateSink(List<SinkRequest> sinkRequests) {
+ Response<List<BatchResult>> response =
ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* Delete stream sink info by ID.
*/
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
index 88a1ed7cc3..baa6a601ba 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.manager.client.api.service.StreamSourceApi;
import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
@@ -52,6 +53,16 @@ public class StreamSourceClient {
return response.getData();
}
+ /**
+ * Batch create inlong stream source.
+ */
+ public List<BatchResult> batchCreateSource(List<SourceRequest>
requestList) {
+ Response<List<BatchResult>> response =
+
ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList));
+ ClientUtils.assertRespSuccess(response);
+ return response.getData();
+ }
+
/**
* List stream sources by the given groupId and streamId.
*/
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 9f9df342b2..819d9802d9 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.client.api.service;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -50,11 +51,17 @@ public interface InlongGroupApi {
@POST("group/save")
Call<Response<String>> createGroup(@Body InlongGroupRequest request);
+ @POST("group/batchSave")
+ Call<Response<List<BatchResult>>> batchCreateGroup(@Body
List<InlongGroupRequest> requestList);
+
@POST("group/update")
Call<Response<String>> updateGroup(@Body InlongGroupRequest request);
@POST("group/startProcess/{id}")
- Call<Response<WorkflowResult>> initInlongGroup(@Path("id") String id);
+ Call<Response<WorkflowResult>> startProcess(@Path("id") String id);
+
+ @POST("group/batchStartProcess/{id}")
+ Call<Response<WorkflowResult>> batchStartProcess(@Body List<String>
groupIdList);
@POST("group/suspendProcessAsync/{id}")
Call<Response<String>> suspendProcessAsync(@Path("id") String id);
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 273c50fee9..6b47d8aeb2 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.client.api.service;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -41,6 +42,9 @@ public interface InlongStreamApi {
@POST("stream/save")
Call<Response<Integer>> createStream(@Body InlongStreamInfo stream);
+ @POST("stream/batchSave")
+ Call<Response<List<BatchResult>>> batchCreateStream(@Body
List<InlongStreamInfo> streamInfos);
+
@GET("stream/exist/{groupId}/{streamId}")
Call<Response<Boolean>> isStreamExists(@Path("groupId") String groupId,
@Path("streamId") String streamId);
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 2443db5aab..75088a5eee 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.client.api.service;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -41,6 +42,9 @@ public interface StreamSinkApi {
@POST("sink/save")
Call<Response<Integer>> save(@Body SinkRequest request);
+ @POST("sink/batchSave")
+ Call<Response<List<BatchResult>>> batchSave(@Body List<SinkRequest>
requestList);
+
@POST("sink/update")
Call<Response<Boolean>> updateById(@Body SinkRequest request);
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
index d383e30d86..31fb815aff 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.client.api.service;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
@@ -31,11 +32,16 @@ import retrofit2.http.POST;
import retrofit2.http.Path;
import retrofit2.http.Query;
+import java.util.List;
+
public interface StreamSourceApi {
@POST("source/save")
Call<Response<Integer>> createSource(@Body SourceRequest request);
+ @POST("source/batchSave")
+ Call<Response<List<BatchResult>>> batchCreateSource(@Body
List<SourceRequest> requestList);
+
@POST("source/update")
Call<Response<Boolean>> updateSource(@Body SourceRequest request);
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
index 0215a694e9..ab39ad311d 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
@@ -35,5 +35,18 @@ public enum OperationType {
GET,
- LIST
+ LIST,
+
+ SUSPEND,
+
+ START;
+
+ public static OperationType forOperationType(String type) {
+ for (OperationType operationType : values()) {
+ if (operationType.name().equalsIgnoreCase(type)) {
+ return operationType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("Unsupported
operation type for %s", type));
+ }
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java
similarity index 62%
copy from
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
copy to
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java
index 0215a694e9..140dfb09bc 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java
@@ -15,25 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.enums;
+package org.apache.inlong.manager.pojo.common;
-/**
- * Operation type
- */
-public enum OperationType {
-
- CREATE,
-
- UPDATE,
+import org.apache.inlong.manager.common.enums.OperationTarget;
- /**
- * Insert or update
- */
- UPSERT,
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- DELETE,
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class BatchResult {
- GET,
+ // Unique identification of operation target
+ private String uniqueKey;
+ private boolean success;
+ private OperationTarget operationTarget;
+ private String errMsg;
- LIST
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
index 215fbb3051..059e8edb82 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
@@ -77,6 +77,6 @@ public class ProcessResponse {
private List<TaskResponse> currentTasks;
@ApiModelProperty(value = "Extra information shown in the list")
- private Map<String, Object> showInList;
+ private List<Map<String, Object>> showInList;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
index 519d6a7c16..06cae85145 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
@@ -89,6 +89,6 @@ public class TaskResponse {
private Object extParams;
@ApiModelProperty(value = "Extra information shown in the list")
- private Map<String, Object> showInList;
+ private List<Map<String, Object>> showInList;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
index 4b6792604c..10db1197fc 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
@@ -26,6 +26,8 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
@@ -56,12 +58,14 @@ public class ApplyConsumeProcessForm extends
BaseProcessForm {
}
@Override
- public Map<String, Object> showInList() {
+ public List<Map<String, Object>> showInList() {
+ List<Map<String, Object>> showInList = new ArrayList<>();
Map<String, Object> show = Maps.newHashMap();
if (consumeInfo != null) {
show.put("inlongGroupId", consumeInfo.getInlongGroupId());
show.put("consumerGroup", consumeInfo.getConsumerGroup());
}
- return show;
+ showInList.add(show);
+ return showInList;
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
index c3b7005dd9..272052f514 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
@@ -22,13 +22,20 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
+import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Apply inlong group process form
@@ -39,15 +46,19 @@ public class ApplyGroupProcessForm extends BaseProcessForm {
public static final String FORM_NAME = "ApplyGroupProcessForm";
- @ApiModelProperty(value = "Inlong group info", required = true)
+ @ApiModelProperty(value = "Inlong group info")
private InlongGroupInfo groupInfo;
@ApiModelProperty(value = "All inlong stream info under the inlong group,
including the sink info")
private List<InlongStreamBriefInfo> streamInfoList;
+ @ApiModelProperty(value = "Inlong group full info list")
+ private List<GroupFullInfo> groupFullInfoList;
+
@Override
public void validate() throws FormValidateException {
- Preconditions.expectNotNull(groupInfo, "inlong group info is empty");
+ Preconditions.expectTrue(groupInfo != null ||
CollectionUtils.isNotEmpty(groupFullInfoList),
+ "inlong group info is empty");
}
@Override
@@ -57,14 +68,46 @@ public class ApplyGroupProcessForm extends BaseProcessForm {
@Override
public String getInlongGroupId() {
- return groupInfo.getInlongGroupId();
+ if (groupInfo != null) {
+ return groupInfo.getInlongGroupId();
+ }
+ List<String> groupIdList = groupFullInfoList.stream().map(v -> {
+ InlongGroupInfo groupInfo = v.getGroupInfo();
+ return groupInfo.getInlongGroupId();
+ }).collect(Collectors.toList());
+ return Joiner.on(",").join(groupIdList);
}
@Override
- public Map<String, Object> showInList() {
+ public List<Map<String, Object>> showInList() {
+ List<Map<String, Object>> showList = new ArrayList<>();
+ if (groupInfo != null) {
+ addShowInfo(groupInfo, showList);
+ }
+ if (CollectionUtils.isNotEmpty(groupFullInfoList)) {
+ groupFullInfoList.forEach(groupFullInfo -> {
+ addShowInfo(groupFullInfo.getGroupInfo(), showList);
+ });
+ }
+ return showList;
+ }
+
+ private void addShowInfo(InlongGroupInfo groupInfo, List<Map<String,
Object>> showList) {
Map<String, Object> show = Maps.newHashMap();
show.put("inlongGroupId", groupInfo.getInlongGroupId());
show.put("inlongGroupMode", groupInfo.getInlongGroupMode());
- return show;
+ showList.add(show);
+ }
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class GroupFullInfo {
+
+ private InlongGroupInfo groupInfo;
+
+ private List<InlongStreamBriefInfo> streamInfoList;
+
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
index d955a2c5a6..1dd1294653 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -61,11 +62,13 @@ public class GroupResourceProcessForm extends
BaseProcessForm {
}
@Override
- public Map<String, Object> showInList() {
+ public List<Map<String, Object>> showInList() {
+ List<Map<String, Object>> showInList = new ArrayList<>();
Map<String, Object> show = new HashMap<>();
show.put("inlongGroupId", groupInfo.getInlongGroupId());
show.put("groupOperateType", this.groupOperateType);
- return show;
+ showInList.add(show);
+ return showInList;
}
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
index 95f4d1451c..bcf1278e97 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.pojo.workflow.form.Form;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.List;
import java.util.Map;
/**
@@ -47,7 +48,7 @@ public interface ProcessForm extends Form {
/**
* Field data displayed in the process list.
*/
- default Map<String, Object> showInList() {
+ default List<Map<String, Object>> showInList() {
return null;
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
index bcbcc571c0..e7fbbc37ff 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
@@ -22,10 +22,16 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -37,15 +43,19 @@ public class InlongGroupApproveForm extends BaseTaskForm {
public static final String FORM_NAME = "InlongGroupApproveForm";
- @ApiModelProperty(value = "Inlong group approve info", required = true)
+ @ApiModelProperty(value = "Inlong group approve info")
private InlongGroupApproveRequest groupApproveInfo;
@ApiModelProperty(value = "All inlong stream info under the inlong group,
including the sink info")
private List<InlongStreamApproveRequest> streamApproveInfoList;
+ @ApiModelProperty(value = "Inlong group approve full info list")
+ private List<GroupApproveFullRequest> groupApproveFullInfoList;
+
@Override
public void validate() throws FormValidateException {
- Preconditions.expectNotNull(groupApproveInfo, "inlong group approve
info is empty");
+ Preconditions.expectTrue(groupApproveInfo != null ||
CollectionUtils.isNotEmpty(groupApproveFullInfoList),
+ "inlong group approve info is empty");
}
@Override
@@ -53,4 +63,28 @@ public class InlongGroupApproveForm extends BaseTaskForm {
return FORM_NAME;
}
+ @JsonIgnore
+ public List<GroupApproveFullRequest> getApproveFullRequest() {
+ List<GroupApproveFullRequest> result = new ArrayList<>();
+ if (groupApproveInfo != null) {
+ result.add(new GroupApproveFullRequest(groupApproveInfo,
streamApproveInfoList));
+ }
+ if (CollectionUtils.isNotEmpty(groupApproveFullInfoList)) {
+ result.addAll(groupApproveFullInfoList);
+ }
+ return result;
+ }
+
+ @Data
+ @Builder
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class GroupApproveFullRequest {
+
+ private InlongGroupApproveRequest groupApproveInfo;
+
+ private List<InlongStreamApproveRequest> streamApproveInfoList;
+
+ }
+
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
index cb1cededdb..0a1118706c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
@@ -46,11 +46,10 @@ public class CsvDataTypeOperator implements
DataTypeOperator {
separator = (char)
Integer.parseInt(streamInfo.getDataSeparator());
}
String[] bodys = StringUtils.split(str, separator);
- if (bodys.length != fields.size()) {
- return fields;
- }
for (int i = 0; i < bodys.length; i++) {
- fields.get(i).setFieldValue(bodys[i]);
+ if (i < fields.size()) {
+ fields.get(i).setFieldValue(bodys[i]);
+ }
}
} catch (Exception e) {
log.warn("parse fields failed for groupId = {}, streamId = {}",
streamInfo.getInlongGroupId(),
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index dbb78584b7..8052315d1a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -41,6 +41,7 @@ import org.apache.inlong.manager.pojo.workflow.ProcessRequest;
import org.apache.inlong.manager.pojo.workflow.TaskResponse;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
import
org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm;
+import
org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm.GroupFullInfo;
import
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.service.workflow.WorkflowService;
@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
@@ -116,6 +118,24 @@ public class InlongGroupProcessService {
return result;
}
+ public WorkflowResult batchStartProcess(List<String> groupIdList, String
operator) {
+ for (String groupId : groupIdList) {
+ LOGGER.info("begin to start approve process for groupId={} by
operator={}", groupId, operator);
+
+ groupService.updateStatus(groupId,
GroupStatus.TO_BE_APPROVAL.getCode(), operator);
+ }
+ ApplyGroupProcessForm form = genApplyGroupProcessForm(groupIdList);
+ WorkflowResult result =
workflowService.start(ProcessName.APPLY_GROUP_PROCESS, operator, form);
+ List<TaskResponse> tasks = result.getNewTasks();
+ if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+ throw new
BusinessException(ErrorCodeEnum.WORKFLOW_START_RECORD_FAILED,
+ String.format("failed to start inlong group for
groupId=%s", groupIdList));
+ }
+
+ LOGGER.info("success to start approve process for groupId={} by
operator={}", groupIdList, operator);
+ return result;
+ }
+
/**
* Suspend InlongGroup in an asynchronous way.
*
@@ -368,6 +388,21 @@ public class InlongGroupProcessService {
return form;
}
+ private ApplyGroupProcessForm genApplyGroupProcessForm(List<String>
groupIdList) {
+ ApplyGroupProcessForm form = new ApplyGroupProcessForm();
+ List<GroupFullInfo> groupFullInfoList = new ArrayList<>();
+ for (String groupId : groupIdList) {
+ InlongGroupInfo groupInfo = groupService.get(groupId);
+ List<InlongStreamBriefInfo> infoList =
streamService.listBriefWithSink(groupInfo.getInlongGroupId());
+ GroupFullInfo groupFullInfo = new GroupFullInfo();
+ groupFullInfo.setGroupInfo(groupInfo);
+ groupFullInfo.setStreamInfoList(infoList);
+ groupFullInfoList.add(groupFullInfo);
+ }
+ form.setGroupFullInfoList(groupFullInfoList);
+ return form;
+ }
+
/**
* Generate the form of [Group Resource Workflow]
*/
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index d2f8b09048..959f46edf1 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.group;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -60,6 +61,16 @@ public interface InlongGroupService {
String save(@Valid @NotNull(message = "inlong group request cannot be
null") InlongGroupRequest groupInfo,
UserInfo opInfo);
+ /**
+ * Batch save inlong group info.
+ *
+ * @param groupRequestList group request list need to save
+ * @param operator name of operator
+ * @return batch result list, one entry per group request, marking whether that save succeeded
+ */
+ List<BatchResult> batchSave(
+ @Valid @NotNull(message = "inlong group request list cannot be
null") List<InlongGroupRequest> groupRequestList,
+ String operator);
/**
* Query whether the specified group id exists
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 2d87b74995..9c4f85baac 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -23,6 +23,7 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -42,6 +43,7 @@ import
org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -216,6 +218,28 @@ public class InlongGroupServiceImpl implements
InlongGroupService {
return groupId;
}
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public List<BatchResult> batchSave(List<InlongGroupRequest>
groupRequestList, String operator) {
+ List<BatchResult> resultList = new ArrayList<>();
+ for (InlongGroupRequest groupRequest : groupRequestList) {
+ BatchResult result = BatchResult.builder()
+ .uniqueKey(groupRequest.getInlongGroupId())
+ .operationTarget(OperationTarget.GROUP)
+ .build();
+ try {
+ this.save(groupRequest, operator);
+ result.setSuccess(true);
+ } catch (Exception e) {
+ LOGGER.error("failed to save inlong group for groupId={}",
groupRequest.getInlongGroupId(), e);
+ result.setSuccess(false);
+ result.setErrMsg(e.getMessage());
+ }
+ resultList.add(result);
+ }
+ return resultList;
+ }
+
@Override
public Boolean exist(String groupId) {
Preconditions.expectNotNull(groupId,
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
index 819f455e59..63e671f33c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
import
org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm;
+import
org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm.GroupApproveFullRequest;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -32,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.List;
+
/**
* The listener for modifying InlongGroup info after the application
InlongGroup process is approved.
*/
@@ -57,15 +60,18 @@ public class AfterApprovedTaskListener implements
TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws
WorkflowListenerException {
InlongGroupApproveForm form = (InlongGroupApproveForm)
context.getActionContext().getForm();
- InlongGroupApproveRequest approveInfo = form.getGroupApproveInfo();
- String groupId = approveInfo.getInlongGroupId();
- log.info("begin to execute AfterApprovedTaskListener for groupId={}",
groupId);
+ List<GroupApproveFullRequest> approveRequestList =
form.getApproveFullRequest();
+ for (GroupApproveFullRequest request : approveRequestList) {
+ InlongGroupApproveRequest approveInfo =
request.getGroupApproveInfo();
+ String groupId = approveInfo.getInlongGroupId();
+ log.info("begin to execute AfterApprovedTaskListener for
groupId={}", groupId);
- // save the inlong group and other info after approval
- groupService.updateAfterApprove(approveInfo, context.getOperator());
- streamService.updateAfterApprove(form.getStreamApproveInfoList(),
context.getOperator());
+ // save the inlong group and other info after approval
+ groupService.updateAfterApprove(approveInfo,
context.getOperator());
+
streamService.updateAfterApprove(request.getStreamApproveInfoList(),
context.getOperator());
- log.info("success to execute AfterApprovedTaskListener for
groupId={}", groupId);
+ log.info("success to execute AfterApprovedTaskListener for
groupId={}", groupId);
+ }
return ListenerResult.success();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
index 4d285488f6..f74153f6a8 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.listener.group.apply;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -37,6 +38,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Arrays;
import java.util.List;
/**
@@ -63,21 +65,22 @@ public class ApproveApplyProcessListener implements
ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws
WorkflowListenerException {
ApplyGroupProcessForm form = (ApplyGroupProcessForm)
context.getProcessForm();
- String groupId = form.getInlongGroupId();
- log.info("begin to execute ApproveApplyProcessListener for
groupId={}", groupId);
+ List<String> groupList =
Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA));
+ for (String groupId : groupList) {
+ log.info("begin to execute ApproveApplyProcessListener for
groupId={}", groupId);
- InlongGroupInfo groupInfo = groupService.get(groupId);
- GroupResourceProcessForm processForm = new GroupResourceProcessForm();
- processForm.setGroupInfo(groupInfo);
- String username = context.getOperator();
- List<InlongStreamInfo> streamList = streamService.list(groupId);
- processForm.setStreamInfos(streamList);
+ InlongGroupInfo groupInfo = groupService.get(groupId);
+ GroupResourceProcessForm processForm = new
GroupResourceProcessForm();
+ processForm.setGroupInfo(groupInfo);
+ List<InlongStreamInfo> streamList = streamService.list(groupId);
+ processForm.setStreamInfos(streamList);
- // may run for long time, make it async processing
- UserInfo userInfo = LoginUserUtils.getLoginUser();
- EXECUTOR_SERVICE.execute(
- () ->
workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo,
processForm));
- log.info("success to execute ApproveApplyProcessListener for
groupId={}", groupId);
+ // may run for long time, make it async processing
+ UserInfo userInfo = LoginUserUtils.getLoginUser();
+ EXECUTOR_SERVICE.execute(
+ () ->
workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo,
processForm));
+ log.info("success to execute ApproveApplyProcessListener for
groupId={}", groupId);
+ }
return ListenerResult.success();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
index c6b2871ba8..5f03ebbf0f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.listener.group.apply;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -31,6 +32,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
/**
@@ -54,22 +57,24 @@ public class CancelApplyProcessListener implements
ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws
WorkflowListenerException {
ApplyGroupProcessForm form = (ApplyGroupProcessForm)
context.getProcessForm();
- String groupId = form.getInlongGroupId();
- log.info("begin to execute CancelApplyProcessListener for groupId={}",
groupId);
+ List<String> groupList =
Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA));
+ for (String groupId : groupList) {
+ log.info("begin to execute CancelApplyProcessListener for
groupId={}", groupId);
- // only the [ToBeApproval] status allowed the canceling operation
- InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
- if (entity == null) {
- throw new WorkflowListenerException("InlongGroup not found with
groupId=" + groupId);
- }
- if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(),
entity.getStatus())) {
- throw new WorkflowListenerException(String.format("Current status
[%s] was not allowed to cancel",
- GroupStatus.forCode(entity.getStatus())));
- }
- String operator = context.getOperator();
- groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(),
operator);
+ // only the [ToBeApproval] status allowed the canceling operation
+ InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+ if (entity == null) {
+ throw new WorkflowListenerException("InlongGroup not found
with groupId=" + groupId);
+ }
+ if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(),
entity.getStatus())) {
+ throw new WorkflowListenerException(String.format("Current
status [%s] was not allowed to cancel",
+ GroupStatus.forCode(entity.getStatus())));
+ }
+ String operator = context.getOperator();
+ groupMapper.updateStatus(groupId,
GroupStatus.TO_BE_SUBMIT.getCode(), operator);
- log.info("success to execute CancelApplyProcessListener for
groupId={}", groupId);
+ log.info("success to execute CancelApplyProcessListener for
groupId={}", groupId);
+ }
return ListenerResult.success();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
index 4b387e6da9..8ad8bd766a 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.listener.group.apply;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -32,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
/**
@@ -54,21 +57,23 @@ public class RejectApplyProcessListener implements
ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws
WorkflowListenerException {
ApplyGroupProcessForm form = (ApplyGroupProcessForm)
context.getProcessForm();
- String groupId = form.getInlongGroupId();
- log.info("begin to execute RejectApplyProcessListener for groupId={}",
groupId);
+ List<String> groupList =
Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA));
+ for (String groupId : groupList) {
+ log.info("begin to execute RejectApplyProcessListener for
groupId={}", groupId);
- // only the [TO_BE_APPROVAL] status allowed the rejecting operation
- InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
- if (entity == null) {
- throw new WorkflowListenerException("inlong group not found with
groupId=" + groupId);
- }
- if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(),
entity.getStatus())) {
- throw new WorkflowListenerException("current status was not
allowed to reject inlong group");
- }
+ // only the [TO_BE_APPROVAL] status allowed the rejecting operation
+ InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+ if (entity == null) {
+ throw new WorkflowListenerException("inlong group not found
with groupId=" + groupId);
+ }
+ if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(),
entity.getStatus())) {
+ throw new WorkflowListenerException("current status was not
allowed to reject inlong group");
+ }
- // after reject, update InlongGroup status to [APPROVE_REJECTED]
- String username = context.getOperator();
- groupService.updateStatus(groupId,
GroupStatus.APPROVE_REJECTED.getCode(), username);
+ // after reject, update InlongGroup status to [APPROVE_REJECTED]
+ String username = context.getOperator();
+ groupService.updateStatus(groupId,
GroupStatus.APPROVE_REJECTED.getCode(), username);
+ }
return ListenerResult.success();
}
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 06f97bb762..79da89d512 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -59,6 +60,15 @@ public interface StreamSinkService {
*/
Integer save(SinkRequest request, UserInfo opInfo);
+ /**
+ * Batch save the sink info.
+ *
+ * @param requestList sink request list need to save
+ * @param operator name of operator
+ * @return batch result list, one entry per sink request, marking whether that save succeeded
+ */
+ List<BatchResult> batchSave(List<SinkRequest> requestList, String
operator);
+
/**
* Get stream sink info based on id.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 31092bdb0b..4d7fd556ac 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sink;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
@@ -34,6 +35,7 @@ import
org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -234,6 +236,29 @@ public class StreamSinkServiceImpl implements
StreamSinkService {
return id;
}
+ @Override
+ public List<BatchResult> batchSave(List<SinkRequest> requestList, String
operator) {
+ List<BatchResult> resultList = new ArrayList<>();
+ for (SinkRequest request : requestList) {
+ BatchResult result = BatchResult.builder()
+ .uniqueKey(request.getInlongGroupId() + "-" +
request.getInlongStreamId() + "-"
+ + request.getSinkName())
+ .operationTarget(OperationTarget.SINK)
+ .build();
+ try {
+ this.save(request, operator);
+ result.setSuccess(true);
+ } catch (Exception e) {
+ LOGGER.error("failed to save save source info for sinkName={},
groupId={}, streamId={}",
+ request.getSinkName(), request.getInlongGroupId(),
request.getInlongStreamId(), e);
+ result.setSuccess(false);
+ result.setErrMsg(e.getMessage());
+ }
+ resultList.add(result);
+ }
+ return resultList;
+ }
+
@Override
public StreamSink get(Integer id) {
if (id == null) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 0dd4decd28..879ca5e012 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.source;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
@@ -52,6 +53,15 @@ public interface StreamSourceService {
*/
Integer save(SourceRequest request, UserInfo opInfo);
+ /**
+ * Batch save the source information
+ *
+ * @param requestList Source request list.
+ * @param operator Operator's name.
+ * @return Batch result list, one entry per source request, marking whether that save succeeded.
+ */
+ List<BatchResult> batchSave(List<SourceRequest> requestList, String
operator);
+
/**
* Query source information based on id
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 2d3855b05e..c252a6a00c 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -33,6 +34,7 @@ import
org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -162,6 +164,29 @@ public class StreamSourceServiceImpl implements
StreamSourceService {
return sourceOperator.saveOpt(request, groupEntity.getStatus(),
opInfo.getName());
}
+ @Override
+ public List<BatchResult> batchSave(List<SourceRequest> requestList, String
operator) {
+ List<BatchResult> resultList = new ArrayList<>();
+ for (SourceRequest request : requestList) {
+ BatchResult result = BatchResult.builder()
+ .uniqueKey(request.getInlongGroupId() + "-" +
request.getInlongStreamId() + "-"
+ + request.getSourceName())
+ .operationTarget(OperationTarget.SOURCE)
+ .build();
+ try {
+ this.save(request, operator);
+ result.setSuccess(true);
+ } catch (Exception e) {
+ LOGGER.error("failed to save save source info for
sourceName={}, groupId={}, streamId={}",
+ request.getSourceName(), request.getInlongGroupId(),
request.getInlongStreamId(), e);
+ result.setSuccess(false);
+ result.setErrMsg(e.getMessage());
+ }
+ resultList.add(result);
+ }
+ return resultList;
+ }
+
@Override
public StreamSource get(Integer id) {
if (id == null) {
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 328e9ae73c..e54ff475b5 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -17,6 +17,7 @@
package org.apache.inlong.manager.service.stream;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
@@ -50,6 +51,15 @@ public interface InlongStreamService {
*/
Integer save(InlongStreamRequest request, String operator);
+ /**
+ * Batch save inlong stream information.
+ *
+ * @param requestList Inlong stream information list.
+ * @param operator The name of operator.
+ * @return Id list after successful save.
+ */
+ List<BatchResult> batchSave(List<InlongStreamRequest> requestList, String
operator);
+
/**
* Save inlong stream information.
*
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index b9d8ef8d36..21df28228f 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.stream;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.tool.excel.ExcelTool;
@@ -35,6 +36,7 @@ import
org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
import org.apache.inlong.manager.pojo.common.PageResult;
@@ -184,6 +186,28 @@ public class InlongStreamServiceImpl implements
InlongStreamService {
return streamEntity.getId();
}
+ @Override
+ public List<BatchResult> batchSave(List<InlongStreamRequest> requestList,
String operator) {
+ List<BatchResult> resultList = new ArrayList<>();
+ for (InlongStreamRequest request : requestList) {
+ BatchResult result = BatchResult.builder()
+ .uniqueKey(request.getInlongGroupId() + "-" +
request.getInlongStreamId())
+ .operationTarget(OperationTarget.STREAM)
+ .build();
+ try {
+ this.save(request, operator);
+ result.setSuccess(true);
+ } catch (Exception e) {
+ LOGGER.error("failed to save inlong stream for groupId={},
streamId={}", request.getInlongGroupId(),
+ request.getInlongStreamId(), e);
+ result.setSuccess(false);
+ result.setErrMsg(e.getMessage());
+ }
+ resultList.add(result);
+ }
+ return resultList;
+ }
+
@Override
public Integer save(InlongStreamRequest request, UserInfo opInfo) {
InlongGroupEntity entity =
groupMapper.selectByGroupId(request.getInlongGroupId());
diff --git
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index 0863b8e666..739b68fcab 100644
---
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -244,7 +244,7 @@ public class WorkflowServiceImpl implements WorkflowService
{
};
}
- private Map<String, Object> getShowInList(WorkflowProcessEntity
processEntity) {
+ private List<Map<String, Object>> getShowInList(WorkflowProcessEntity
processEntity) {
WorkflowProcess process =
processDefService.getByName(processEntity.getName());
if (process == null || process.getFormClass() == null) {
return null;
@@ -270,7 +270,7 @@ public class WorkflowServiceImpl implements WorkflowService
{
query.setIdList(list);
List<WorkflowProcessEntity> processEntities =
queryService.listProcessEntity(query);
- Map<Integer, Map<String, Object>> processShowInListMap =
Maps.newHashMap();
+ Map<Integer, List<Map<String, Object>>> processShowInListMap =
Maps.newHashMap();
processEntities.forEach(entity ->
processShowInListMap.put(entity.getId(), getShowInList(entity)));
taskList.forEach(task ->
task.setShowInList(processShowInListMap.get(task.getProcessId())));
}
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 0ad34ae506..da69535631 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -77,6 +78,15 @@ public class InlongGroupController {
return Response.success(groupService.save(groupRequest, operator));
}
+ @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.GROUP)
+ @ApiOperation(value = "Batch Save inlong group")
+ public Response<List<BatchResult>> batchSave(
+ @Validated(SaveValidation.class) @RequestBody
List<InlongGroupRequest> groupRequestList) {
+ String operator = LoginUserUtils.getLoginUser().getName();
+ return Response.success(groupService.batchSave(groupRequestList,
operator));
+ }
+
@RequestMapping(value = "/group/exist/{groupId}", method =
RequestMethod.GET)
@ApiOperation(value = "Is the inlong group id exists")
@ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class, required = true)
@@ -155,14 +165,24 @@ public class InlongGroupController {
@RequestMapping(value = "/group/startProcess/{groupId}", method =
RequestMethod.POST)
@ApiOperation(value = "Start inlong approval process")
+ @OperationLog(operation = OperationType.START, operationTarget =
OperationTarget.GROUP)
@ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class)
public Response<WorkflowResult> startProcess(@PathVariable String groupId)
{
String operator = LoginUserUtils.getLoginUser().getName();
return Response.success(groupProcessOperation.startProcess(groupId,
operator));
}
+ @RequestMapping(value = "/group/batchStartProcess", method =
RequestMethod.POST)
+ @ApiOperation(value = "Batch start inlong approval process")
+ @OperationLog(operation = OperationType.START, operationTarget =
OperationTarget.GROUP)
+ public Response<WorkflowResult> batchStartProcess(@RequestBody
List<String> groupIdList) {
+ String operator = LoginUserUtils.getLoginUser().getName();
+ return
Response.success(groupProcessOperation.batchStartProcess(groupIdList,
operator));
+ }
+
@RequestMapping(value = "/group/suspendProcess/{groupId}", method =
RequestMethod.POST)
@ApiOperation(value = "Suspend inlong group process")
+ @OperationLog(operation = OperationType.SUSPEND, operationTarget =
OperationTarget.GROUP)
@ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class)
public Response<WorkflowResult> suspendProcess(@PathVariable String
groupId) {
String operator = LoginUserUtils.getLoginUser().getName();
@@ -178,6 +198,7 @@ public class InlongGroupController {
}
@RequestMapping(value = "/group/suspendProcessAsync/{groupId}", method =
RequestMethod.POST)
+ @OperationLog(operation = OperationType.SUSPEND, operationTarget =
OperationTarget.GROUP)
@ApiOperation(value = "Suspend inlong group process")
@ApiImplicitParam(name = "groupId", value = "Inlong group id",
dataTypeClass = String.class)
public Response<String> suspendProcessAsync(@PathVariable String groupId) {
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index da747818ce..c8c6c51186 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.tool.excel.ExcelTool;
import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -86,6 +87,14 @@ public class InlongStreamController {
return Response.success(result);
}
+ @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.STREAM)
+ @ApiOperation(value = "Batch save inlong stream")
+ public Response<List<BatchResult>> batchSave(@RequestBody
List<InlongStreamRequest> requestList) {
+ List<BatchResult> result = streamService.batchSave(requestList,
LoginUserUtils.getLoginUser().getName());
+ return Response.success(result);
+ }
+
@RequestMapping(value = "/stream/exist/{groupId}/{streamId}", method =
RequestMethod.GET)
@ApiOperation(value = "Is the inlong stream exists")
@ApiImplicitParams({
@@ -154,6 +163,7 @@ public class InlongStreamController {
}
@RequestMapping(value = "/stream/suspendProcess/{groupId}/{streamId}",
method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.SUSPEND, operationTarget =
OperationTarget.STREAM)
@ApiOperation(value = "Suspend inlong stream process")
@ApiImplicitParams({
@ApiImplicitParam(name = "groupId", dataTypeClass = String.class,
required = true),
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index e8d0ef93ab..7869758cdb 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -66,6 +67,13 @@ public class StreamSinkController {
return Response.success(sinkService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
+ @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.SINK)
+ @ApiOperation(value = "Batch save stream sink")
+ public Response<List<BatchResult>> batchSave(@Validated @RequestBody
List<SinkRequest> requestList) {
+ return Response.success(sinkService.batchSave(requestList,
LoginUserUtils.getLoginUser().getName()));
+ }
+
@RequestMapping(value = "/sink/get/{id}", method = RequestMethod.GET)
@ApiOperation(value = "Get stream sink")
@OperationLog(operation = OperationType.GET, operationTarget =
OperationTarget.SINK)
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index 8e7645f993..e410af5c1e 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
@@ -44,6 +45,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import java.util.List;
+
/**
* Stream source control layer
*/
@@ -62,6 +65,14 @@ public class StreamSourceController {
return Response.success(sourceService.save(request,
LoginUserUtils.getLoginUser().getName()));
}
+ @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.SOURCE)
+ @ApiOperation(value = "Batch save stream source")
+ public Response<List<BatchResult>> batchSave(
+ @Validated(SaveValidation.class) @RequestBody List<SourceRequest>
requestList) {
+ return Response.success(sourceService.batchSave(requestList,
LoginUserUtils.getLoginUser().getName()));
+ }
+
@RequestMapping(value = "/source/get/{id}", method = RequestMethod.GET)
@ApiOperation(value = "Get stream source")
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required =
true)
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
index 8dca1ad748..3ee366910d 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -88,6 +89,15 @@ public class OpenInLongGroupController {
return Response.success(groupService.save(groupRequest,
LoginUserUtils.getLoginUser()));
}
+ @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.GROUP)
+ @ApiOperation(value = "Batch Save inlong group")
+ public Response<List<BatchResult>> batchSave(
+ @Validated(SaveValidation.class) @RequestBody
List<InlongGroupRequest> groupRequestList) {
+ String operator = LoginUserUtils.getLoginUser().getName();
+ return Response.success(groupService.batchSave(groupRequestList,
operator));
+ }
+
@RequestMapping(value = "/group/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE, operationTarget =
OperationTarget.GROUP)
@ApiOperation(value = "Update inlong group")
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index 893e0e589b..903e730280 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -105,6 +106,14 @@ public class OpenInLongStreamController {
return Response.success(streamService.save(request,
LoginUserUtils.getLoginUser()));
}
+ @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.STREAM)
+ @ApiOperation(value = "Batch save inlong stream")
+ public Response<List<BatchResult>> batchSave(@RequestBody
List<InlongStreamRequest> requestList) {
+ List<BatchResult> result = streamService.batchSave(requestList,
LoginUserUtils.getLoginUser().getName());
+ return Response.success(result);
+ }
+
@RequestMapping(value = "/stream/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE, operationTarget =
OperationTarget.STREAM)
@ApiOperation(value = "Update inlong stream")
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index d69f0895f4..1d570033df 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -83,6 +84,13 @@ public class OpenStreamSinkController {
return Response.success(sinkService.save(request,
LoginUserUtils.getLoginUser()));
}
+ @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.SINK)
+ @ApiOperation(value = "Batch save stream sink")
+ public Response<List<BatchResult>> batchSave(@Validated @RequestBody
List<SinkRequest> requestList) {
+ return Response.success(sinkService.batchSave(requestList,
LoginUserUtils.getLoginUser().getName()));
+ }
+
@RequestMapping(value = "/sink/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE, operationTarget =
OperationTarget.SINK)
@ApiOperation(value = "Update stream sink")
diff --git
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 6bd7ab19dd..64e98a58bc 100644
---
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.common.validation.SaveValidation;
import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.pojo.source.SourcePageRequest;
@@ -43,6 +44,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
+import java.util.List;
+
/**
* Open InLong Stream Source controller
*/
@@ -80,6 +83,14 @@ public class OpenStreamSourceController {
return Response.success(sourceService.save(request,
LoginUserUtils.getLoginUser()));
}
+ @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST)
+ @OperationLog(operation = OperationType.CREATE, operationTarget =
OperationTarget.SOURCE)
+ @ApiOperation(value = "Batch save stream source")
+ public Response<List<BatchResult>> batchSave(
+ @Validated(SaveValidation.class) @RequestBody List<SourceRequest>
requestList) {
+ return Response.success(sourceService.batchSave(requestList,
LoginUserUtils.getLoginUser().getName()));
+ }
+
@RequestMapping(value = "/source/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE, operationTarget =
OperationTarget.SOURCE)
@ApiOperation(value = "Update stream source")