This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 95352390c1 [INLONG-9995][Manager] Support batch saving of group 
information and other operations (#9996)
95352390c1 is described below

commit 95352390c1ea3defebd20adbf03d74dcbc6f55ea
Author: fuweng11 <[email protected]>
AuthorDate: Thu Apr 18 19:42:33 2024 +0800

    [INLONG-9995][Manager] Support batch saving of group information and other 
operations (#9996)
---
 .../manager/client/api/impl/InlongGroupImpl.java   |  2 +-
 .../client/api/inner/client/InlongGroupClient.java | 25 +++++++++-
 .../api/inner/client/InlongStreamClient.java       | 11 +++++
 .../client/api/inner/client/StreamSinkClient.java  |  7 +++
 .../api/inner/client/StreamSourceClient.java       | 11 +++++
 .../manager/client/api/service/InlongGroupApi.java |  9 +++-
 .../client/api/service/InlongStreamApi.java        |  4 ++
 .../manager/client/api/service/StreamSinkApi.java  |  4 ++
 .../client/api/service/StreamSourceApi.java        |  6 +++
 .../inlong/manager/common/enums/OperationType.java | 15 +++++-
 .../inlong/manager/pojo/common/BatchResult.java}   | 32 ++++++-------
 .../manager/pojo/workflow/ProcessResponse.java     |  2 +-
 .../inlong/manager/pojo/workflow/TaskResponse.java |  2 +-
 .../form/process/ApplyConsumeProcessForm.java      |  8 +++-
 .../form/process/ApplyGroupProcessForm.java        | 53 ++++++++++++++++++++--
 .../form/process/GroupResourceProcessForm.java     |  7 ++-
 .../pojo/workflow/form/process/ProcessForm.java    |  3 +-
 .../workflow/form/task/InlongGroupApproveForm.java | 38 +++++++++++++++-
 .../service/datatype/CsvDataTypeOperator.java      |  7 ++-
 .../service/group/InlongGroupProcessService.java   | 35 ++++++++++++++
 .../manager/service/group/InlongGroupService.java  | 11 +++++
 .../service/group/InlongGroupServiceImpl.java      | 24 ++++++++++
 .../group/apply/AfterApprovedTaskListener.java     | 20 +++++---
 .../group/apply/ApproveApplyProcessListener.java   | 29 ++++++------
 .../group/apply/CancelApplyProcessListener.java    | 33 ++++++++------
 .../group/apply/RejectApplyProcessListener.java    | 31 +++++++------
 .../manager/service/sink/StreamSinkService.java    | 10 ++++
 .../service/sink/StreamSinkServiceImpl.java        | 25 ++++++++++
 .../service/source/StreamSourceService.java        | 10 ++++
 .../service/source/StreamSourceServiceImpl.java    | 25 ++++++++++
 .../service/stream/InlongStreamService.java        | 10 ++++
 .../service/stream/InlongStreamServiceImpl.java    | 24 ++++++++++
 .../service/workflow/WorkflowServiceImpl.java      |  4 +-
 .../web/controller/InlongGroupController.java      | 21 +++++++++
 .../web/controller/InlongStreamController.java     | 10 ++++
 .../web/controller/StreamSinkController.java       |  8 ++++
 .../web/controller/StreamSourceController.java     | 11 +++++
 .../openapi/OpenInLongGroupController.java         | 10 ++++
 .../openapi/OpenInLongStreamController.java        |  9 ++++
 .../openapi/OpenStreamSinkController.java          |  8 ++++
 .../openapi/OpenStreamSourceController.java        | 11 +++++
 41 files changed, 537 insertions(+), 88 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 9316760874..7a991803df 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -117,7 +117,7 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext init() throws Exception {
         InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
-        WorkflowResult initWorkflowResult = 
groupClient.initInlongGroup(groupInfo.genRequest());
+        WorkflowResult initWorkflowResult = 
groupClient.startProcess(groupInfo.genRequest());
         List<TaskResponse> taskViews = initWorkflowResult.getNewTasks();
         Preconditions.expectNotEmpty(taskViews, "init inlong group info 
failed");
         TaskResponse taskView = taskViews.get(0);
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
index c7462fd0b2..54b61f91c2 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -25,6 +25,7 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -45,6 +46,7 @@ import 
org.springframework.boot.configurationprocessor.json.JSONObject;
 import retrofit2.Call;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
 import static 
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
@@ -167,6 +169,16 @@ public class InlongGroupClient {
         return response.getData();
     }
 
+    /**
+     * Batch create inlong group
+     */
+    public List<BatchResult> batchCreateGroup(List<InlongGroupRequest> 
groupRequestList) {
+        Response<List<BatchResult>> response =
+                
ClientUtils.executeHttpCall(inlongGroupApi.batchCreateGroup(groupRequestList));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * Update inlong group info
      *
@@ -186,9 +198,18 @@ public class InlongGroupClient {
         return response.getData();
     }
 
-    public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) {
+    public WorkflowResult startProcess(InlongGroupRequest groupInfo) {
+        Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
+                inlongGroupApi.startProcess(groupInfo.getInlongGroupId()));
+        ClientUtils.assertRespSuccess(responseBody);
+        return responseBody.getData();
+    }
+
+    public WorkflowResult batchStartProcess(List<InlongGroupRequest> 
groupRequestList) {
+        List<String> groupIdList = 
groupRequestList.stream().map(InlongGroupRequest::getInlongGroupId).collect(
+                Collectors.toList());
         Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
-                inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId()));
+                inlongGroupApi.batchStartProcess(groupIdList));
         ClientUtils.assertRespSuccess(responseBody);
         return responseBody.getData();
     }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
index a71d7ac2b3..89ca7eef68 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.client.api.service.InlongStreamApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -58,6 +59,16 @@ public class InlongStreamClient {
         return response.getData();
     }
 
+    /**
+     * Batch create inlong stream.
+     */
+    public List<BatchResult> batchCreateStreamInfo(List<InlongStreamInfo> 
streamInfos) {
+        Response<List<BatchResult>> response =
+                
ClientUtils.executeHttpCall(inlongStreamApi.batchCreateStream(streamInfos));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * Query whether the inlong stream ID exists
      *
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index bb493f5966..c350900f06 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.service.StreamSinkApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -54,6 +55,12 @@ public class StreamSinkClient {
         return response.getData();
     }
 
+    public List<BatchResult> batchCreateSink(List<SinkRequest> sinkRequests) {
+        Response<List<BatchResult>> response = 
ClientUtils.executeHttpCall(streamSinkApi.batchSave(sinkRequests));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * Delete stream sink info by ID.
      */
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
index 88a1ed7cc3..baa6a601ba 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.client.api.service.StreamSourceApi;
 import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.source.SourcePageRequest;
@@ -52,6 +53,16 @@ public class StreamSourceClient {
         return response.getData();
     }
 
+    /**
+     * Batch create inlong stream source.
+     */
+    public List<BatchResult> batchCreateSource(List<SourceRequest> 
requestList) {
+        Response<List<BatchResult>> response =
+                
ClientUtils.executeHttpCall(streamSourceApi.batchCreateSource(requestList));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
     /**
      * List stream sources by the given groupId and streamId.
      */
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
index 9f9df342b2..819d9802d9 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongGroupApi.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.api.service;
 
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -50,11 +51,17 @@ public interface InlongGroupApi {
     @POST("group/save")
     Call<Response<String>> createGroup(@Body InlongGroupRequest request);
 
+    @POST("group/batchSave")
+    Call<Response<List<BatchResult>>> batchCreateGroup(@Body 
List<InlongGroupRequest> requestList);
+
     @POST("group/update")
     Call<Response<String>> updateGroup(@Body InlongGroupRequest request);
 
     @POST("group/startProcess/{id}")
-    Call<Response<WorkflowResult>> initInlongGroup(@Path("id") String id);
+    Call<Response<WorkflowResult>> startProcess(@Path("id") String id);
+
+    @POST("group/batchStartProcess/{id}")
+    Call<Response<WorkflowResult>> batchStartProcess(@Body List<String> 
groupIdList);
 
     @POST("group/suspendProcessAsync/{id}")
     Call<Response<String>> suspendProcessAsync(@Path("id") String id);
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
index 273c50fee9..6b47d8aeb2 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongStreamApi.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.api.service;
 
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -41,6 +42,9 @@ public interface InlongStreamApi {
     @POST("stream/save")
     Call<Response<Integer>> createStream(@Body InlongStreamInfo stream);
 
+    @POST("stream/batchSave")
+    Call<Response<List<BatchResult>>> batchCreateStream(@Body 
List<InlongStreamInfo> streamInfos);
+
     @GET("stream/exist/{groupId}/{streamId}")
     Call<Response<Boolean>> isStreamExists(@Path("groupId") String groupId, 
@Path("streamId") String streamId);
 
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
index 2443db5aab..75088a5eee 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.api.service;
 
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -41,6 +42,9 @@ public interface StreamSinkApi {
     @POST("sink/save")
     Call<Response<Integer>> save(@Body SinkRequest request);
 
+    @POST("sink/batchSave")
+    Call<Response<List<BatchResult>>> batchSave(@Body List<SinkRequest> 
requestList);
+
     @POST("sink/update")
     Call<Response<Boolean>> updateById(@Body SinkRequest request);
 
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
index d383e30d86..31fb815aff 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.client.api.service;
 
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.source.SourcePageRequest;
@@ -31,11 +32,16 @@ import retrofit2.http.POST;
 import retrofit2.http.Path;
 import retrofit2.http.Query;
 
+import java.util.List;
+
 public interface StreamSourceApi {
 
     @POST("source/save")
     Call<Response<Integer>> createSource(@Body SourceRequest request);
 
+    @POST("source/batchSave")
+    Call<Response<List<BatchResult>>> batchCreateSource(@Body 
List<SourceRequest> requestList);
+
     @POST("source/update")
     Call<Response<Boolean>> updateSource(@Body SourceRequest request);
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
index 0215a694e9..ab39ad311d 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
@@ -35,5 +35,18 @@ public enum OperationType {
 
     GET,
 
-    LIST
+    LIST,
+
+    SUSPEND,
+
+    START;
+
+    public static OperationType forOperationType(String type) {
+        for (OperationType operationType : values()) {
+            if (operationType.name().equalsIgnoreCase(type)) {
+                return operationType;
+            }
+        }
+        throw new IllegalArgumentException(String.format("Unsupported 
operation type for %s", type));
+    }
 }
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java
similarity index 62%
copy from 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java
index 0215a694e9..140dfb09bc 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/OperationType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/BatchResult.java
@@ -15,25 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.enums;
+package org.apache.inlong.manager.pojo.common;
 
-/**
- * Operation type
- */
-public enum OperationType {
-
-    CREATE,
-
-    UPDATE,
+import org.apache.inlong.manager.common.enums.OperationTarget;
 
-    /**
-     * Insert or update
-     */
-    UPSERT,
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-    DELETE,
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class BatchResult {
 
-    GET,
+    // Unique identification of operation target
+    private String uniqueKey;
+    private boolean success;
+    private OperationTarget operationTarget;
+    private String errMsg;
 
-    LIST
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
index 215fbb3051..059e8edb82 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/ProcessResponse.java
@@ -77,6 +77,6 @@ public class ProcessResponse {
     private List<TaskResponse> currentTasks;
 
     @ApiModelProperty(value = "Extra information shown in the list")
-    private Map<String, Object> showInList;
+    private List<Map<String, Object>> showInList;
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
index 519d6a7c16..06cae85145 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/TaskResponse.java
@@ -89,6 +89,6 @@ public class TaskResponse {
     private Object extParams;
 
     @ApiModelProperty(value = "Extra information shown in the list")
-    private Map<String, Object> showInList;
+    private List<Map<String, Object>> showInList;
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
index 4b6792604c..10db1197fc 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
@@ -26,6 +26,8 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -56,12 +58,14 @@ public class ApplyConsumeProcessForm extends 
BaseProcessForm {
     }
 
     @Override
-    public Map<String, Object> showInList() {
+    public List<Map<String, Object>> showInList() {
+        List<Map<String, Object>> showInList = new ArrayList<>();
         Map<String, Object> show = Maps.newHashMap();
         if (consumeInfo != null) {
             show.put("inlongGroupId", consumeInfo.getInlongGroupId());
             show.put("consumerGroup", consumeInfo.getConsumerGroup());
         }
-        return show;
+        showInList.add(show);
+        return showInList;
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
index c3b7005dd9..272052f514 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyGroupProcessForm.java
@@ -22,13 +22,20 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Apply inlong group process form
@@ -39,15 +46,19 @@ public class ApplyGroupProcessForm extends BaseProcessForm {
 
     public static final String FORM_NAME = "ApplyGroupProcessForm";
 
-    @ApiModelProperty(value = "Inlong group info", required = true)
+    @ApiModelProperty(value = "Inlong group info")
     private InlongGroupInfo groupInfo;
 
     @ApiModelProperty(value = "All inlong stream info under the inlong group, 
including the sink info")
     private List<InlongStreamBriefInfo> streamInfoList;
 
+    @ApiModelProperty(value = "Inlong group full info list")
+    private List<GroupFullInfo> groupFullInfoList;
+
     @Override
     public void validate() throws FormValidateException {
-        Preconditions.expectNotNull(groupInfo, "inlong group info is empty");
+        Preconditions.expectTrue(groupInfo != null || 
CollectionUtils.isNotEmpty(groupFullInfoList),
+                "inlong group info is empty");
     }
 
     @Override
@@ -57,14 +68,46 @@ public class ApplyGroupProcessForm extends BaseProcessForm {
 
     @Override
     public String getInlongGroupId() {
-        return groupInfo.getInlongGroupId();
+        if (groupInfo != null) {
+            return groupInfo.getInlongGroupId();
+        }
+        List<String> groupIdList = groupFullInfoList.stream().map(v -> {
+            InlongGroupInfo groupInfo = v.getGroupInfo();
+            return groupInfo.getInlongGroupId();
+        }).collect(Collectors.toList());
+        return Joiner.on(",").join(groupIdList);
     }
 
     @Override
-    public Map<String, Object> showInList() {
+    public List<Map<String, Object>> showInList() {
+        List<Map<String, Object>> showList = new ArrayList<>();
+        if (groupInfo != null) {
+            addShowInfo(groupInfo, showList);
+        }
+        if (CollectionUtils.isNotEmpty(groupFullInfoList)) {
+            groupFullInfoList.forEach(groupFullInfo -> {
+                addShowInfo(groupFullInfo.getGroupInfo(), showList);
+            });
+        }
+        return showList;
+    }
+
+    private void addShowInfo(InlongGroupInfo groupInfo, List<Map<String, 
Object>> showList) {
         Map<String, Object> show = Maps.newHashMap();
         show.put("inlongGroupId", groupInfo.getInlongGroupId());
         show.put("inlongGroupMode", groupInfo.getInlongGroupMode());
-        return show;
+        showList.add(show);
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class GroupFullInfo {
+
+        private InlongGroupInfo groupInfo;
+
+        private List<InlongStreamBriefInfo> streamInfoList;
+
     }
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
index d955a2c5a6..1dd1294653 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/GroupResourceProcessForm.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,11 +62,13 @@ public class GroupResourceProcessForm extends 
BaseProcessForm {
     }
 
     @Override
-    public Map<String, Object> showInList() {
+    public List<Map<String, Object>> showInList() {
+        List<Map<String, Object>> showInList = new ArrayList<>();
         Map<String, Object> show = new HashMap<>();
         show.put("inlongGroupId", groupInfo.getInlongGroupId());
         show.put("groupOperateType", this.groupOperateType);
-        return show;
+        showInList.add(show);
+        return showInList;
     }
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
index 95f4d1451c..bcf1278e97 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ProcessForm.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.pojo.workflow.form.Form;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -47,7 +48,7 @@ public interface ProcessForm extends Form {
     /**
      * Field data displayed in the process list.
      */
-    default Map<String, Object> showInList() {
+    default List<Map<String, Object>> showInList() {
         return null;
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
index bcbcc571c0..e7fbbc37ff 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/InlongGroupApproveForm.java
@@ -22,10 +22,16 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamApproveRequest;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -37,15 +43,19 @@ public class InlongGroupApproveForm extends BaseTaskForm {
 
     public static final String FORM_NAME = "InlongGroupApproveForm";
 
-    @ApiModelProperty(value = "Inlong group approve info", required = true)
+    @ApiModelProperty(value = "Inlong group approve info")
     private InlongGroupApproveRequest groupApproveInfo;
 
     @ApiModelProperty(value = "All inlong stream info under the inlong group, 
including the sink info")
     private List<InlongStreamApproveRequest> streamApproveInfoList;
 
+    @ApiModelProperty(value = "Inlong group approve full info list")
+    private List<GroupApproveFullRequest> groupApproveFullInfoList;
+
     @Override
     public void validate() throws FormValidateException {
-        Preconditions.expectNotNull(groupApproveInfo, "inlong group approve 
info is empty");
+        Preconditions.expectTrue(groupApproveInfo != null || 
CollectionUtils.isNotEmpty(groupApproveFullInfoList),
+                "inlong group approve info is empty");
     }
 
     @Override
@@ -53,4 +63,28 @@ public class InlongGroupApproveForm extends BaseTaskForm {
         return FORM_NAME;
     }
 
+    @JsonIgnore
+    public List<GroupApproveFullRequest> getApproveFullRequest() {
+        List<GroupApproveFullRequest> result = new ArrayList<>();
+        if (groupApproveInfo != null) {
+            result.add(new GroupApproveFullRequest(groupApproveInfo, 
streamApproveInfoList));
+        }
+        if (CollectionUtils.isNotEmpty(groupApproveFullInfoList)) {
+            result.addAll(groupApproveFullInfoList);
+        }
+        return result;
+    }
+
+    @Data
+    @Builder
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class GroupApproveFullRequest {
+
+        private InlongGroupApproveRequest groupApproveInfo;
+
+        private List<InlongStreamApproveRequest> streamApproveInfoList;
+
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
index cb1cededdb..0a1118706c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/datatype/CsvDataTypeOperator.java
@@ -46,11 +46,10 @@ public class CsvDataTypeOperator implements 
DataTypeOperator {
                 separator = (char) 
Integer.parseInt(streamInfo.getDataSeparator());
             }
             String[] bodys = StringUtils.split(str, separator);
-            if (bodys.length != fields.size()) {
-                return fields;
-            }
             for (int i = 0; i < bodys.length; i++) {
-                fields.get(i).setFieldValue(bodys[i]);
+                if (i < fields.size()) {
+                    fields.get(i).setFieldValue(bodys[i]);
+                }
             }
         } catch (Exception e) {
             log.warn("parse fields failed for groupId = {}, streamId = {}", 
streamInfo.getInlongGroupId(),
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index dbb78584b7..8052315d1a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -41,6 +41,7 @@ import org.apache.inlong.manager.pojo.workflow.ProcessRequest;
 import org.apache.inlong.manager.pojo.workflow.TaskResponse;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm;
+import 
org.apache.inlong.manager.pojo.workflow.form.process.ApplyGroupProcessForm.GroupFullInfo;
 import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
@@ -52,6 +53,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
@@ -116,6 +118,24 @@ public class InlongGroupProcessService {
         return result;
     }
 
+    public WorkflowResult batchStartProcess(List<String> groupIdList, String 
operator) {
+        for (String groupId : groupIdList) {
+            LOGGER.info("begin to start approve process for groupId={} by 
operator={}", groupId, operator);
+
+            groupService.updateStatus(groupId, 
GroupStatus.TO_BE_APPROVAL.getCode(), operator);
+        }
+        ApplyGroupProcessForm form = genApplyGroupProcessForm(groupIdList);
+        WorkflowResult result = 
workflowService.start(ProcessName.APPLY_GROUP_PROCESS, operator, form);
+        List<TaskResponse> tasks = result.getNewTasks();
+        if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+            throw new 
BusinessException(ErrorCodeEnum.WORKFLOW_START_RECORD_FAILED,
+                    String.format("failed to start inlong group for 
groupId=%s", groupIdList));
+        }
+
+        LOGGER.info("success to start approve process for groupId={} by 
operator={}", groupIdList, operator);
+        return result;
+    }
+
     /**
      * Suspend InlongGroup in an asynchronous way.
      *
@@ -368,6 +388,21 @@ public class InlongGroupProcessService {
         return form;
     }
 
+    private ApplyGroupProcessForm genApplyGroupProcessForm(List<String> 
groupIdList) {
+        ApplyGroupProcessForm form = new ApplyGroupProcessForm();
+        List<GroupFullInfo> groupFullInfoList = new ArrayList<>();
+        for (String groupId : groupIdList) {
+            InlongGroupInfo groupInfo = groupService.get(groupId);
+            List<InlongStreamBriefInfo> infoList = 
streamService.listBriefWithSink(groupInfo.getInlongGroupId());
+            GroupFullInfo groupFullInfo = new GroupFullInfo();
+            groupFullInfo.setGroupInfo(groupInfo);
+            groupFullInfo.setStreamInfoList(infoList);
+            groupFullInfoList.add(groupFullInfo);
+        }
+        form.setGroupFullInfoList(groupFullInfoList);
+        return form;
+    }
+
     /**
      * Generate the form of [Group Resource Workflow]
      */
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
index d2f8b09048..959f46edf1 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.group;
 
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -60,6 +61,16 @@ public interface InlongGroupService {
     String save(@Valid @NotNull(message = "inlong group request cannot be 
null") InlongGroupRequest groupInfo,
             UserInfo opInfo);
 
+    /**
+     * Batch save inlong group info.
+     *
+     * @param groupRequestList group request list need to save
+     * @param operator name of operator
+     * @return inlong group id list after saving
+     */
+    List<BatchResult> batchSave(
+            @Valid @NotNull(message = "inlong group request list cannot be 
null") List<InlongGroupRequest> groupRequestList,
+            String operator);
     /**
      * Query whether the specified group id exists
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 2d87b74995..9c4f85baac 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -23,6 +23,7 @@ import 
org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -42,6 +43,7 @@ import 
org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.TenantClusterTagEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -216,6 +218,28 @@ public class InlongGroupServiceImpl implements 
InlongGroupService {
         return groupId;
     }
 
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public List<BatchResult> batchSave(List<InlongGroupRequest> 
groupRequestList, String operator) {
+        List<BatchResult> resultList = new ArrayList<>();
+        for (InlongGroupRequest groupRequest : groupRequestList) {
+            BatchResult result = BatchResult.builder()
+                    .uniqueKey(groupRequest.getInlongGroupId())
+                    .operationTarget(OperationTarget.GROUP)
+                    .build();
+            try {
+                this.save(groupRequest, operator);
+                result.setSuccess(true);
+            } catch (Exception e) {
+                LOGGER.error("failed to save inlong group for groupId={}", 
groupRequest.getInlongGroupId(), e);
+                result.setSuccess(false);
+                result.setErrMsg(e.getMessage());
+            }
+            resultList.add(result);
+        }
+        return resultList;
+    }
+
     @Override
     public Boolean exist(String groupId) {
         Preconditions.expectNotNull(groupId, 
ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
index 819f455e59..63e671f33c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/AfterApprovedTaskListener.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.group.InlongGroupApproveRequest;
 import 
org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm;
+import 
org.apache.inlong.manager.pojo.workflow.form.task.InlongGroupApproveForm.GroupApproveFullRequest;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -32,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * The listener for modifying InlongGroup info after the application 
InlongGroup process is approved.
  */
@@ -57,15 +60,18 @@ public class AfterApprovedTaskListener implements 
TaskEventListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws 
WorkflowListenerException {
         InlongGroupApproveForm form = (InlongGroupApproveForm) 
context.getActionContext().getForm();
-        InlongGroupApproveRequest approveInfo = form.getGroupApproveInfo();
-        String groupId = approveInfo.getInlongGroupId();
-        log.info("begin to execute AfterApprovedTaskListener for groupId={}", 
groupId);
+        List<GroupApproveFullRequest> approveRequestList = 
form.getApproveFullRequest();
+        for (GroupApproveFullRequest request : approveRequestList) {
+            InlongGroupApproveRequest approveInfo = 
request.getGroupApproveInfo();
+            String groupId = approveInfo.getInlongGroupId();
+            log.info("begin to execute AfterApprovedTaskListener for 
groupId={}", groupId);
 
-        // save the inlong group and other info after approval
-        groupService.updateAfterApprove(approveInfo, context.getOperator());
-        streamService.updateAfterApprove(form.getStreamApproveInfoList(), 
context.getOperator());
+            // save the inlong group and other info after approval
+            groupService.updateAfterApprove(approveInfo, 
context.getOperator());
+            
streamService.updateAfterApprove(request.getStreamApproveInfoList(), 
context.getOperator());
 
-        log.info("success to execute AfterApprovedTaskListener for 
groupId={}", groupId);
+            log.info("success to execute AfterApprovedTaskListener for 
groupId={}", groupId);
+        }
         return ListenerResult.success();
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
index 4d285488f6..f74153f6a8 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/ApproveApplyProcessListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.group.apply;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -37,6 +38,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Arrays;
 import java.util.List;
 
 /**
@@ -63,21 +65,22 @@ public class ApproveApplyProcessListener implements 
ProcessEventListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws 
WorkflowListenerException {
         ApplyGroupProcessForm form = (ApplyGroupProcessForm) 
context.getProcessForm();
-        String groupId = form.getInlongGroupId();
-        log.info("begin to execute ApproveApplyProcessListener for 
groupId={}", groupId);
+        List<String> groupList = 
Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA));
+        for (String groupId : groupList) {
+            log.info("begin to execute ApproveApplyProcessListener for 
groupId={}", groupId);
 
-        InlongGroupInfo groupInfo = groupService.get(groupId);
-        GroupResourceProcessForm processForm = new GroupResourceProcessForm();
-        processForm.setGroupInfo(groupInfo);
-        String username = context.getOperator();
-        List<InlongStreamInfo> streamList = streamService.list(groupId);
-        processForm.setStreamInfos(streamList);
+            InlongGroupInfo groupInfo = groupService.get(groupId);
+            GroupResourceProcessForm processForm = new 
GroupResourceProcessForm();
+            processForm.setGroupInfo(groupInfo);
+            List<InlongStreamInfo> streamList = streamService.list(groupId);
+            processForm.setStreamInfos(streamList);
 
-        // may run for long time, make it async processing
-        UserInfo userInfo = LoginUserUtils.getLoginUser();
-        EXECUTOR_SERVICE.execute(
-                () -> 
workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, 
processForm));
-        log.info("success to execute ApproveApplyProcessListener for 
groupId={}", groupId);
+            // may run for long time, make it async processing
+            UserInfo userInfo = LoginUserUtils.getLoginUser();
+            EXECUTOR_SERVICE.execute(
+                    () -> 
workflowService.startAsync(ProcessName.CREATE_GROUP_RESOURCE, userInfo, 
processForm));
+            log.info("success to execute ApproveApplyProcessListener for 
groupId={}", groupId);
+        }
         return ListenerResult.success();
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
index c6b2871ba8..5f03ebbf0f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/CancelApplyProcessListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.group.apply;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -31,6 +32,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -54,22 +57,24 @@ public class CancelApplyProcessListener implements 
ProcessEventListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws 
WorkflowListenerException {
         ApplyGroupProcessForm form = (ApplyGroupProcessForm) 
context.getProcessForm();
-        String groupId = form.getInlongGroupId();
-        log.info("begin to execute CancelApplyProcessListener for groupId={}", 
groupId);
+        List<String> groupList = 
Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA));
+        for (String groupId : groupList) {
+            log.info("begin to execute CancelApplyProcessListener for 
groupId={}", groupId);
 
-        // only the [ToBeApproval] status allowed the canceling operation
-        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
-        if (entity == null) {
-            throw new WorkflowListenerException("InlongGroup not found with 
groupId=" + groupId);
-        }
-        if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), 
entity.getStatus())) {
-            throw new WorkflowListenerException(String.format("Current status 
[%s] was not allowed to cancel",
-                    GroupStatus.forCode(entity.getStatus())));
-        }
-        String operator = context.getOperator();
-        groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), 
operator);
+            // only the [ToBeApproval] status allowed the canceling operation
+            InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+            if (entity == null) {
+                throw new WorkflowListenerException("InlongGroup not found 
with groupId=" + groupId);
+            }
+            if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), 
entity.getStatus())) {
+                throw new WorkflowListenerException(String.format("Current 
status [%s] was not allowed to cancel",
+                        GroupStatus.forCode(entity.getStatus())));
+            }
+            String operator = context.getOperator();
+            groupMapper.updateStatus(groupId, 
GroupStatus.TO_BE_SUBMIT.getCode(), operator);
 
-        log.info("success to execute CancelApplyProcessListener for 
groupId={}", groupId);
+            log.info("success to execute CancelApplyProcessListener for 
groupId={}", groupId);
+        }
         return ListenerResult.success();
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
index 4b387e6da9..8ad8bd766a 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/apply/RejectApplyProcessListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.group.apply;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -32,6 +33,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -54,21 +57,23 @@ public class RejectApplyProcessListener implements 
ProcessEventListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws 
WorkflowListenerException {
         ApplyGroupProcessForm form = (ApplyGroupProcessForm) 
context.getProcessForm();
-        String groupId = form.getInlongGroupId();
-        log.info("begin to execute RejectApplyProcessListener for groupId={}", 
groupId);
+        List<String> groupList = 
Arrays.asList(form.getInlongGroupId().split(InlongConstants.COMMA));
+        for (String groupId : groupList) {
+            log.info("begin to execute RejectApplyProcessListener for 
groupId={}", groupId);
 
-        // only the [TO_BE_APPROVAL] status allowed the rejecting operation
-        InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
-        if (entity == null) {
-            throw new WorkflowListenerException("inlong group not found with 
groupId=" + groupId);
-        }
-        if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), 
entity.getStatus())) {
-            throw new WorkflowListenerException("current status was not 
allowed to reject inlong group");
-        }
+            // only the [TO_BE_APPROVAL] status allowed the rejecting operation
+            InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
+            if (entity == null) {
+                throw new WorkflowListenerException("inlong group not found 
with groupId=" + groupId);
+            }
+            if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), 
entity.getStatus())) {
+                throw new WorkflowListenerException("current status was not 
allowed to reject inlong group");
+            }
 
-        // after reject, update InlongGroup status to [APPROVE_REJECTED]
-        String username = context.getOperator();
-        groupService.updateStatus(groupId, 
GroupStatus.APPROVE_REJECTED.getCode(), username);
+            // after reject, update InlongGroup status to [APPROVE_REJECTED]
+            String username = context.getOperator();
+            groupService.updateStatus(groupId, 
GroupStatus.APPROVE_REJECTED.getCode(), username);
+        }
         return ListenerResult.success();
     }
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index 06f97bb762..79da89d512 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.sink;
 
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -59,6 +60,15 @@ public interface StreamSinkService {
      */
     Integer save(SinkRequest request, UserInfo opInfo);
 
+    /**
+     * Batch save the sink info.
+     *
+     * @param requestList sink request list need to save
+     * @param operator name of operator
+     * @return sink id list after saving
+     */
+    List<BatchResult> batchSave(List<SinkRequest> requestList, String 
operator);
+
     /**
      * Get stream sink info based on id.
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 31092bdb0b..4d7fd556ac 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.sink;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
@@ -34,6 +35,7 @@ import 
org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -234,6 +236,29 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         return id;
     }
 
+    @Override
+    public List<BatchResult> batchSave(List<SinkRequest> requestList, String 
operator) {
+        List<BatchResult> resultList = new ArrayList<>();
+        for (SinkRequest request : requestList) {
+            BatchResult result = BatchResult.builder()
+                    .uniqueKey(request.getInlongGroupId() + "-" + 
request.getInlongStreamId() + "-"
+                            + request.getSinkName())
+                    .operationTarget(OperationTarget.SINK)
+                    .build();
+            try {
+                this.save(request, operator);
+                result.setSuccess(true);
+            } catch (Exception e) {
+                LOGGER.error("failed to save save source info for sinkName={}, 
groupId={}, streamId={}",
+                        request.getSinkName(), request.getInlongGroupId(), 
request.getInlongStreamId(), e);
+                result.setSuccess(false);
+                result.setErrMsg(e.getMessage());
+            }
+            resultList.add(result);
+        }
+        return resultList;
+    }
+
     @Override
     public StreamSink get(Integer id) {
         if (id == null) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
index 0dd4decd28..879ca5e012 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.source;
 
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
@@ -52,6 +53,15 @@ public interface StreamSourceService {
      */
     Integer save(SourceRequest request, UserInfo opInfo);
 
+    /**
+     * Batch save the source information
+     *
+     * @param requestList Source request list.
+     * @param operator Operator's name.
+     * @return source id list after saving.
+     */
+    List<BatchResult> batchSave(List<SourceRequest> requestList, String 
operator);
+
     /**
      * Query source information based on id
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 2d3855b05e..c252a6a00c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -33,6 +34,7 @@ import 
org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -162,6 +164,29 @@ public class StreamSourceServiceImpl implements 
StreamSourceService {
         return sourceOperator.saveOpt(request, groupEntity.getStatus(), 
opInfo.getName());
     }
 
+    @Override
+    public List<BatchResult> batchSave(List<SourceRequest> requestList, String 
operator) {
+        List<BatchResult> resultList = new ArrayList<>();
+        for (SourceRequest request : requestList) {
+            BatchResult result = BatchResult.builder()
+                    .uniqueKey(request.getInlongGroupId() + "-" + 
request.getInlongStreamId() + "-"
+                            + request.getSourceName())
+                    .operationTarget(OperationTarget.SOURCE)
+                    .build();
+            try {
+                this.save(request, operator);
+                result.setSuccess(true);
+            } catch (Exception e) {
+                LOGGER.error("failed to save save source info for 
sourceName={}, groupId={}, streamId={}",
+                        request.getSourceName(), request.getInlongGroupId(), 
request.getInlongStreamId(), e);
+                result.setSuccess(false);
+                result.setErrMsg(e.getMessage());
+            }
+            resultList.add(result);
+        }
+        return resultList;
+    }
+
     @Override
     public StreamSource get(Integer id) {
         if (id == null) {
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 328e9ae73c..e54ff475b5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.stream;
 
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
 import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
@@ -50,6 +51,15 @@ public interface InlongStreamService {
      */
     Integer save(InlongStreamRequest request, String operator);
 
+    /**
+     * Batch save inlong stream information.
+     *
+     * @param requestList Inlong stream information list.
+     * @param operator The name of operator.
+     * @return Id list after successful save.
+     */
+    List<BatchResult> batchSave(List<InlongStreamRequest> requestList, String 
operator);
+
     /**
      * Save inlong stream information.
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index b9d8ef8d36..21df28228f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.stream;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.tool.excel.ExcelTool;
@@ -35,6 +36,7 @@ import 
org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamExtEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamFieldEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
@@ -184,6 +186,28 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
         return streamEntity.getId();
     }
 
+    @Override
+    public List<BatchResult> batchSave(List<InlongStreamRequest> requestList, 
String operator) {
+        List<BatchResult> resultList = new ArrayList<>();
+        for (InlongStreamRequest request : requestList) {
+            BatchResult result = BatchResult.builder()
+                    .uniqueKey(request.getInlongGroupId() + "-" + 
request.getInlongStreamId())
+                    .operationTarget(OperationTarget.STREAM)
+                    .build();
+            try {
+                this.save(request, operator);
+                result.setSuccess(true);
+            } catch (Exception e) {
+                LOGGER.error("failed to save inlong stream for groupId={}, 
streamId={}", request.getInlongGroupId(),
+                        request.getInlongStreamId(), e);
+                result.setSuccess(false);
+                result.setErrMsg(e.getMessage());
+            }
+            resultList.add(result);
+        }
+        return resultList;
+    }
+
     @Override
     public Integer save(InlongStreamRequest request, UserInfo opInfo) {
         InlongGroupEntity entity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
index 0863b8e666..739b68fcab 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImpl.java
@@ -244,7 +244,7 @@ public class WorkflowServiceImpl implements WorkflowService 
{
         };
     }
 
-    private Map<String, Object> getShowInList(WorkflowProcessEntity 
processEntity) {
+    private List<Map<String, Object>> getShowInList(WorkflowProcessEntity 
processEntity) {
         WorkflowProcess process = 
processDefService.getByName(processEntity.getName());
         if (process == null || process.getFormClass() == null) {
             return null;
@@ -270,7 +270,7 @@ public class WorkflowServiceImpl implements WorkflowService 
{
         query.setIdList(list);
 
         List<WorkflowProcessEntity> processEntities = 
queryService.listProcessEntity(query);
-        Map<Integer, Map<String, Object>> processShowInListMap = 
Maps.newHashMap();
+        Map<Integer, List<Map<String, Object>>> processShowInListMap = 
Maps.newHashMap();
         processEntities.forEach(entity -> 
processShowInListMap.put(entity.getId(), getShowInList(entity)));
         taskList.forEach(task -> 
task.setShowInList(processShowInListMap.get(task.getProcessId())));
     }
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
index 0ad34ae506..da69535631 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongGroupController.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
 import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
@@ -77,6 +78,15 @@ public class InlongGroupController {
         return Response.success(groupService.save(groupRequest, operator));
     }
 
+    @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.GROUP)
+    @ApiOperation(value = "Batch Save inlong group")
+    public Response<List<BatchResult>> batchSave(
+            @Validated(SaveValidation.class) @RequestBody 
List<InlongGroupRequest> groupRequestList) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(groupService.batchSave(groupRequestList, 
operator));
+    }
+
     @RequestMapping(value = "/group/exist/{groupId}", method = 
RequestMethod.GET)
     @ApiOperation(value = "Is the inlong group id exists")
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class, required = true)
@@ -155,14 +165,24 @@ public class InlongGroupController {
 
     @RequestMapping(value = "/group/startProcess/{groupId}", method = 
RequestMethod.POST)
     @ApiOperation(value = "Start inlong approval process")
+    @OperationLog(operation = OperationType.START, operationTarget = 
OperationTarget.GROUP)
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class)
     public Response<WorkflowResult> startProcess(@PathVariable String groupId) 
{
         String operator = LoginUserUtils.getLoginUser().getName();
         return Response.success(groupProcessOperation.startProcess(groupId, 
operator));
     }
 
+    @RequestMapping(value = "/group/batchStartProcess", method = 
RequestMethod.POST)
+    @ApiOperation(value = "Batch start inlong approval process")
+    @OperationLog(operation = OperationType.START, operationTarget = 
OperationTarget.GROUP)
+    public Response<WorkflowResult> batchStartProcess(@RequestBody 
List<String> groupIdList) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return 
Response.success(groupProcessOperation.batchStartProcess(groupIdList, 
operator));
+    }
+
     @RequestMapping(value = "/group/suspendProcess/{groupId}", method = 
RequestMethod.POST)
     @ApiOperation(value = "Suspend inlong group process")
+    @OperationLog(operation = OperationType.SUSPEND, operationTarget = 
OperationTarget.GROUP)
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class)
     public Response<WorkflowResult> suspendProcess(@PathVariable String 
groupId) {
         String operator = LoginUserUtils.getLoginUser().getName();
@@ -178,6 +198,7 @@ public class InlongGroupController {
     }
 
     @RequestMapping(value = "/group/suspendProcessAsync/{groupId}", method = 
RequestMethod.POST)
+    @OperationLog(operation = OperationType.SUSPEND, operationTarget = 
OperationTarget.GROUP)
     @ApiOperation(value = "Suspend inlong group process")
     @ApiImplicitParam(name = "groupId", value = "Inlong group id", 
dataTypeClass = String.class)
     public Response<String> suspendProcessAsync(@PathVariable String groupId) {
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
index da747818ce..c8c6c51186 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongStreamController.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.tool.excel.ExcelTool;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
@@ -86,6 +87,14 @@ public class InlongStreamController {
         return Response.success(result);
     }
 
+    @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.STREAM)
+    @ApiOperation(value = "Batch save inlong stream")
+    public Response<List<BatchResult>> batchSave(@RequestBody 
List<InlongStreamRequest> requestList) {
+        List<BatchResult> result = streamService.batchSave(requestList, 
LoginUserUtils.getLoginUser().getName());
+        return Response.success(result);
+    }
+
     @RequestMapping(value = "/stream/exist/{groupId}/{streamId}", method = 
RequestMethod.GET)
     @ApiOperation(value = "Is the inlong stream exists")
     @ApiImplicitParams({
@@ -154,6 +163,7 @@ public class InlongStreamController {
     }
 
     @RequestMapping(value = "/stream/suspendProcess/{groupId}/{streamId}", 
method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.SUSPEND, operationTarget = 
OperationTarget.STREAM)
     @ApiOperation(value = "Suspend inlong stream process")
     @ApiImplicitParams({
             @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, 
required = true),
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index e8d0ef93ab..7869758cdb 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
 import org.apache.inlong.manager.common.validation.UpdateByKeyValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
@@ -66,6 +67,13 @@ public class StreamSinkController {
         return Response.success(sinkService.save(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
+    @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SINK)
+    @ApiOperation(value = "Batch save stream sink")
+    public Response<List<BatchResult>> batchSave(@Validated @RequestBody 
List<SinkRequest> requestList) {
+        return Response.success(sinkService.batchSave(requestList, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
     @RequestMapping(value = "/sink/get/{id}", method = RequestMethod.GET)
     @ApiOperation(value = "Get stream sink")
     @OperationLog(operation = OperationType.GET, operationTarget = 
OperationTarget.SINK)
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
index 8e7645f993..e410af5c1e 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java
@@ -21,6 +21,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.source.DataAddTaskRequest;
@@ -44,6 +45,8 @@ import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+
 /**
  * Stream source control layer
  */
@@ -62,6 +65,14 @@ public class StreamSourceController {
         return Response.success(sourceService.save(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
+    @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SOURCE)
+    @ApiOperation(value = "Batch save stream source")
+    public Response<List<BatchResult>> batchSave(
+            @Validated(SaveValidation.class) @RequestBody List<SourceRequest> 
requestList) {
+        return Response.success(sourceService.batchSave(requestList, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
     @RequestMapping(value = "/source/get/{id}", method = RequestMethod.GET)
     @ApiOperation(value = "Get stream source")
     @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = 
true)
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
index 8dca1ad748..3ee366910d 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongGroupController.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
@@ -88,6 +89,15 @@ public class OpenInLongGroupController {
         return Response.success(groupService.save(groupRequest, 
LoginUserUtils.getLoginUser()));
     }
 
+    @RequestMapping(value = "/group/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.GROUP)
+    @ApiOperation(value = "Batch Save inlong group")
+    public Response<List<BatchResult>> batchSave(
+            @Validated(SaveValidation.class) @RequestBody 
List<InlongGroupRequest> groupRequestList) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(groupService.batchSave(groupRequestList, 
operator));
+    }
+
     @RequestMapping(value = "/group/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.GROUP)
     @ApiOperation(value = "Update inlong group")
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
index 893e0e589b..903e730280 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenInLongStreamController.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.sink.AddFieldRequest;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
@@ -105,6 +106,14 @@ public class OpenInLongStreamController {
         return Response.success(streamService.save(request, 
LoginUserUtils.getLoginUser()));
     }
 
+    @RequestMapping(value = "/stream/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.STREAM)
+    @ApiOperation(value = "Batch save inlong stream")
+    public Response<List<BatchResult>> batchSave(@RequestBody 
List<InlongStreamRequest> requestList) {
+        List<BatchResult> result = streamService.batchSave(requestList, 
LoginUserUtils.getLoginUser().getName());
+        return Response.success(result);
+    }
+
     @RequestMapping(value = "/stream/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.STREAM)
     @ApiOperation(value = "Update inlong stream")
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
index d69f0895f4..1d570033df 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSinkController.java
@@ -22,6 +22,7 @@ import org.apache.inlong.manager.common.enums.OperationTarget;
 import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.UpdateByIdValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.sink.SinkPageRequest;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
@@ -83,6 +84,13 @@ public class OpenStreamSinkController {
         return Response.success(sinkService.save(request, 
LoginUserUtils.getLoginUser()));
     }
 
+    @RequestMapping(value = "/sink/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SINK)
+    @ApiOperation(value = "Batch save stream sink")
+    public Response<List<BatchResult>> batchSave(@Validated @RequestBody 
List<SinkRequest> requestList) {
+        return Response.success(sinkService.batchSave(requestList, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
     @RequestMapping(value = "/sink/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.SINK)
     @ApiOperation(value = "Update stream sink")
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
index 6bd7ab19dd..64e98a58bc 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenStreamSourceController.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.common.enums.OperationType;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.common.validation.SaveValidation;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.Response;
 import org.apache.inlong.manager.pojo.source.SourcePageRequest;
@@ -43,6 +44,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+
 /**
  * Open InLong Stream Source controller
  */
@@ -80,6 +83,14 @@ public class OpenStreamSourceController {
         return Response.success(sourceService.save(request, 
LoginUserUtils.getLoginUser()));
     }
 
+    @RequestMapping(value = "/source/batchSave", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE, operationTarget = 
OperationTarget.SOURCE)
+    @ApiOperation(value = "Batch save stream source")
+    public Response<List<BatchResult>> batchSave(
+            @Validated(SaveValidation.class) @RequestBody List<SourceRequest> 
requestList) {
+        return Response.success(sourceService.batchSave(requestList, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
     @RequestMapping(value = "/source/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.SOURCE)
     @ApiOperation(value = "Update stream source")

Reply via email to