This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b2284808f [INLONG-5020][Manager] Refactoring InnerInlongManagerClient of manager (#5021)
b2284808f is described below

commit b2284808fa77957425d288b0b3be3b5f2d558e33
Author: haifxu <[email protected]>
AuthorDate: Fri Jul 15 14:11:14 2022 +0800

    [INLONG-5020][Manager] Refactoring InnerInlongManagerClient of manager (#5021)
---
 .../inlong/manager/client/cli/DescribeCommand.java |  64 +--
 .../inlong/manager/client/cli/ListCommand.java     |  66 +--
 .../manager/client/cli/util/ClientUtils.java       |  69 +--
 .../api/impl/DefaultInlongStreamBuilder.java       |  62 ++-
 .../manager/client/api/impl/InlongClientImpl.java  |  22 +-
 .../manager/client/api/impl/InlongGroupImpl.java   |  75 +--
 .../manager/client/api/impl/InlongStreamImpl.java  |  67 ++-
 .../client/api/impl/LowLevelInlongClientImpl.java  |  16 +-
 .../client/api/inner/InnerInlongManagerClient.java | 580 ---------------------
 .../client/api/inner/client/ClientFactory.java     |  52 ++
 .../api/inner/client/InlongClusterClient.java      |  52 ++
 .../client/api/inner/client/InlongGroupClient.java | 219 ++++++++
 .../api/inner/client/InlongStreamClient.java       | 112 ++++
 .../client/api/inner/client/StreamSinkClient.java  |  98 ++++
 .../api/inner/client/StreamSourceClient.java       |  90 ++++
 .../api/inner/client/StreamTransformClient.java    |  88 ++++
 .../client/api/inner/client/WorkflowClient.java    |  83 +++
 .../manager/client/api/util/ClientUtils.java       | 121 +++++
 ...nagerClientTest.java => ClientFactoryTest.java} |  62 ++-
 19 files changed, 1181 insertions(+), 817 deletions(-)

diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
index 461f2c2cc..38edd6d3e 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
@@ -20,8 +20,10 @@ package org.apache.inlong.manager.client.cli;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
 import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
 import org.apache.inlong.manager.client.cli.util.PrintUtils;
@@ -31,7 +33,6 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
@@ -45,41 +46,28 @@ public class DescribeCommand extends AbstractCommand {
 
     public DescribeCommand() {
         super("describe");
-        InlongClientImpl inlongClient;
-        try {
-            inlongClient = ClientUtils.getClient();
-        } catch (IOException e) {
-            System.err.println("get inlong client error");
-            System.err.println(e.getMessage());
-            return;
-        }
-        InnerInlongManagerClient managerClient = new 
InnerInlongManagerClient(inlongClient.getConfiguration());
 
-        jcommander.addCommand("stream", new DescribeStream(managerClient));
-        jcommander.addCommand("group", new DescribeGroup(managerClient));
-        jcommander.addCommand("sink", new DescribeSink(managerClient));
-        jcommander.addCommand("source", new DescribeSource(managerClient));
+        jcommander.addCommand("stream", new DescribeStream());
+        jcommander.addCommand("group", new DescribeGroup());
+        jcommander.addCommand("sink", new DescribeSink());
+        jcommander.addCommand("source", new DescribeSource());
     }
 
     @Parameters(commandDescription = "Get stream details")
     private static class DescribeStream extends AbstractCommandRunner {
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private java.util.List<String> params;
 
         @Parameter(names = {"-g", "--group"}, required = true, description = 
"inlong group id")
         private String groupId;
 
-        DescribeStream(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
-                List<InlongStreamInfo> streamInfos = 
managerClient.listStreamInfo(groupId);
+                ClientUtils.initClientFactory();
+                InlongStreamClient streamClient = 
ClientUtils.clientFactory.getStreamClient();
+                List<InlongStreamInfo> streamInfos = 
streamClient.listStreamInfo(groupId);
                 streamInfos.forEach(PrintUtils::printJson);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
@@ -90,8 +78,6 @@ public class DescribeCommand extends AbstractCommand {
     @Parameters(commandDescription = "Get group details")
     private static class DescribeGroup extends AbstractCommandRunner {
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private java.util.List<String> params;
 
@@ -104,16 +90,14 @@ public class DescribeCommand extends AbstractCommand {
         @Parameter(names = {"-n", "--num"}, description = "the number 
displayed")
         private int pageSize;
 
-        DescribeGroup(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
+                ClientUtils.initClientFactory();
+                InlongGroupClient groupClient = 
ClientUtils.clientFactory.getGroupClient();
                 InlongGroupPageRequest pageRequest = new 
InlongGroupPageRequest();
                 pageRequest.setKeyword(group);
-                PageInfo<InlongGroupListResponse> pageInfo = 
managerClient.listGroups(pageRequest);
+                PageInfo<InlongGroupListResponse> pageInfo = 
groupClient.listGroups(pageRequest);
                 PrintUtils.print(pageInfo.getList(), GroupInfo.class);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
@@ -124,8 +108,6 @@ public class DescribeCommand extends AbstractCommand {
     @Parameters(commandDescription = "Get sink details")
     private static class DescribeSink extends AbstractCommandRunner {
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private java.util.List<String> params;
 
@@ -135,14 +117,12 @@ public class DescribeCommand extends AbstractCommand {
         @Parameter(names = {"-g", "--group"}, required = true, description = 
"inlong group id")
         private String group;
 
-        DescribeSink(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
-                List<StreamSink> streamSinks = managerClient.listSinks(group, 
stream);
+                ClientUtils.initClientFactory();
+                StreamSinkClient sinkClient = 
ClientUtils.clientFactory.getSinkClient();
+                List<StreamSink> streamSinks = sinkClient.listSinks(group, 
stream);
                 streamSinks.forEach(PrintUtils::printJson);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
@@ -153,8 +133,6 @@ public class DescribeCommand extends AbstractCommand {
     @Parameters(commandDescription = "Get source details")
     private static class DescribeSource extends AbstractCommandRunner {
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private java.util.List<String> params;
 
@@ -167,14 +145,12 @@ public class DescribeCommand extends AbstractCommand {
         @Parameter(names = {"-t", "--type"}, description = "sink type")
         private String type;
 
-        DescribeSource(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
-                List<StreamSource> sources = managerClient.listSources(group, 
stream, type);
+                ClientUtils.initClientFactory();
+                StreamSourceClient sourceClient = 
ClientUtils.clientFactory.getSourceClient();
+                List<StreamSource> sources = sourceClient.listSources(group, 
stream, type);
                 sources.forEach(PrintUtils::printJson);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index a8702c104..a91e4925f 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -21,8 +21,10 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.github.pagehelper.PageInfo;
 import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
 import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
 import org.apache.inlong.manager.client.cli.pojo.SinkInfo;
 import org.apache.inlong.manager.client.cli.pojo.SourceInfo;
@@ -35,7 +37,6 @@ import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 
-import java.io.IOException;
 import java.util.List;
 
 /**
@@ -49,42 +50,28 @@ public class ListCommand extends AbstractCommand {
 
     public ListCommand() {
         super("list");
-        InlongClientImpl inlongClient;
-        try {
-            inlongClient = ClientUtils.getClient();
-        } catch (IOException e) {
-            System.err.println("get inlong client error");
-            System.err.println(e.getMessage());
-            return;
-        }
-
-        InnerInlongManagerClient managerClient = new 
InnerInlongManagerClient(inlongClient.getConfiguration());
 
-        jcommander.addCommand("stream", new ListStream(managerClient));
-        jcommander.addCommand("group", new ListGroup(managerClient));
-        jcommander.addCommand("sink", new ListSink(managerClient));
-        jcommander.addCommand("source", new ListSource(managerClient));
+        jcommander.addCommand("stream", new ListStream());
+        jcommander.addCommand("group", new ListGroup());
+        jcommander.addCommand("sink", new ListSink());
+        jcommander.addCommand("source", new ListSource());
     }
 
     @Parameters(commandDescription = "Get stream summary information")
     private static class ListStream extends AbstractCommandRunner {
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private List<String> params;
 
         @Parameter(names = {"-g", "--group"}, required = true, description = 
"inlong group id")
         private String groupId;
 
-        ListStream(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
-                List<InlongStreamInfo> streamInfos = 
managerClient.listStreamInfo(groupId);
+                ClientUtils.initClientFactory();
+                InlongStreamClient streamClient = 
ClientUtils.clientFactory.getStreamClient();
+                List<InlongStreamInfo> streamInfos = 
streamClient.listStreamInfo(groupId);
                 PrintUtils.print(streamInfos, StreamInfo.class);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
@@ -97,8 +84,6 @@ public class ListCommand extends AbstractCommand {
 
         private static final int DEFAULT_PAGE_SIZE = 10;
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private List<String> params;
 
@@ -111,10 +96,6 @@ public class ListCommand extends AbstractCommand {
         @Parameter(names = {"-n", "--num"}, description = "the number 
displayed")
         private int pageSize;
 
-        ListGroup(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
@@ -129,7 +110,10 @@ public class ListCommand extends AbstractCommand {
                 List<Integer> statusList = 
SimpleGroupStatus.parseStatusCodeByStr(status);
                 pageRequest.setStatusList(statusList);
 
-                PageInfo<InlongGroupListResponse> groupPageInfo = 
managerClient.listGroups(pageRequest);
+                ClientUtils.initClientFactory();
+                InlongGroupClient groupClient = 
ClientUtils.clientFactory.getGroupClient();
+
+                PageInfo<InlongGroupListResponse> groupPageInfo = 
groupClient.listGroups(pageRequest);
                 PrintUtils.print(groupPageInfo.getList(), GroupInfo.class);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
@@ -140,8 +124,6 @@ public class ListCommand extends AbstractCommand {
     @Parameters(commandDescription = "Get sink summary information")
     private static class ListSink extends AbstractCommandRunner {
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private List<String> params;
 
@@ -151,14 +133,12 @@ public class ListCommand extends AbstractCommand {
         @Parameter(names = {"-g", "--group"}, required = true, description = 
"group id")
         private String group;
 
-        ListSink(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
-                List<StreamSink> streamSinks = managerClient.listSinks(group, 
stream);
+                ClientUtils.initClientFactory();
+                StreamSinkClient sinkClient = 
ClientUtils.clientFactory.getSinkClient();
+                List<StreamSink> streamSinks = sinkClient.listSinks(group, 
stream);
                 PrintUtils.print(streamSinks, SinkInfo.class);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
@@ -169,8 +149,6 @@ public class ListCommand extends AbstractCommand {
     @Parameters(commandDescription = "Get source summary information")
     private static class ListSource extends AbstractCommandRunner {
 
-        private final InnerInlongManagerClient managerClient;
-
         @Parameter()
         private List<String> params;
 
@@ -183,14 +161,12 @@ public class ListCommand extends AbstractCommand {
         @Parameter(names = {"-t", "--type"}, description = "source type")
         private String type;
 
-        ListSource(InnerInlongManagerClient managerClient) {
-            this.managerClient = managerClient;
-        }
-
         @Override
         void run() {
             try {
-                List<StreamSource> streamSources = 
managerClient.listSources(group, stream, type);
+                ClientUtils.initClientFactory();
+                StreamSourceClient sourceClient = 
ClientUtils.clientFactory.getSourceClient();
+                List<StreamSource> streamSources = 
sourceClient.listSources(group, stream, type);
                 PrintUtils.print(streamSources, SourceInfo.class);
             } catch (Exception e) {
                 System.out.println(e.getMessage());
diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
index 289107777..5ade5e425 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
@@ -19,11 +19,11 @@ package org.apache.inlong.manager.client.cli.util;
 
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 
 import java.io.BufferedInputStream;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
@@ -39,49 +39,58 @@ public class ClientUtils {
 
     private static final String CONFIG_FILE = "application.properties";
 
-    /**
-     * Get an inlong client instance.
-     */
-    public static InlongClientImpl getClient() throws IOException {
-        Properties properties = new Properties();
-        String path = 
Thread.currentThread().getContextClassLoader().getResource("").getPath() + 
CONFIG_FILE;
-        InputStream inputStream = new 
BufferedInputStream(Files.newInputStream(Paths.get(path)));
-        properties.load(inputStream);
+    private static ClientConfiguration configuration;
 
-        String serviceUrl = properties.getProperty("server.host") + ":" + 
properties.getProperty("server.port");
-        String user = properties.getProperty("default.admin.user");
-        String password = properties.getProperty("default.admin.password");
+    private static String serviceUrl;
 
-        ClientConfiguration configuration = new ClientConfiguration();
-        configuration.setAuthentication(new DefaultAuthentication(user, 
password));
+    public static ClientFactory clientFactory;
 
+    /**
+     * Get an inlong client instance.
+     */
+    public static InlongClientImpl getClient() {
+        initClientConfiguration();
         return new InlongClientImpl(serviceUrl, configuration);
     }
 
+    public static void initClientFactory() {
+        clientFactory = 
org.apache.inlong.manager.client.api.util.ClientUtils.getClientFactory(
+                getClient().getConfiguration());
+    }
+
     /**
      * Get the file content.
      */
     public static String readFile(File file) {
         if (!file.exists()) {
             System.out.println("File does not exist.");
-        } else {
-            try {
-                FileReader fileReader = new FileReader(file);
-                Reader reader = new 
InputStreamReader(Files.newInputStream(file.toPath()));
-                int ch;
-                StringBuilder stringBuilder = new StringBuilder();
-                while ((ch = reader.read()) != -1) {
-                    stringBuilder.append((char) ch);
-                }
-                fileReader.close();
-                reader.close();
-
-                return stringBuilder.toString();
-            } catch (Exception e) {
-                System.out.println(e.getMessage());
+            return null;
+        }
+        StringBuilder stringBuilder = new StringBuilder();
+        try (Reader reader = new 
InputStreamReader(Files.newInputStream(file.toPath()))) {
+            int ch;
+            while ((ch = reader.read()) != -1) {
+                stringBuilder.append((char) ch);
             }
+        } catch (IOException e) {
+            System.out.println(e.getMessage());
         }
-        return null;
+        return stringBuilder.toString();
     }
 
+    private static void initClientConfiguration() {
+        Properties properties = new Properties();
+        String path = 
Thread.currentThread().getContextClassLoader().getResource("").getPath() + 
CONFIG_FILE;
+        try (InputStream inputStream = new 
BufferedInputStream(Files.newInputStream(Paths.get(path)))) {
+            properties.load(inputStream);
+            serviceUrl = properties.getProperty("server.host") + ":" + 
properties.getProperty("server.port");
+            String user = properties.getProperty("default.admin.user");
+            String password = properties.getProperty("default.admin.password");
+
+            configuration = new ClientConfiguration();
+            configuration.setAuthentication(new DefaultAuthentication(user, 
password));
+        } catch (IOException e) {
+            System.out.println(e.getMessage());
+        }
+    }
 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 2c3245bcb..11aaa3313 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -22,11 +22,17 @@ import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
 import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamTransformClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
@@ -51,11 +57,21 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
 
     private final InlongStreamImpl inlongStream;
     private final InnerStreamContext streamContext;
-    private final InnerInlongManagerClient managerClient;
+
+    private final InlongStreamClient streamClient;
+    private final StreamSourceClient sourceClient;
+    private final StreamSinkClient sinkClient;
+    private final StreamTransformClient transformClient;
 
     public DefaultInlongStreamBuilder(InlongStreamInfo streamInfo, 
InnerGroupContext groupContext,
-            InnerInlongManagerClient managerClient) {
-        this.managerClient = managerClient;
+            ClientConfiguration configuration) {
+
+        ClientFactory clientFactory = 
ClientUtils.getClientFactory(configuration);
+        this.streamClient = clientFactory.getStreamClient();
+        this.sourceClient = clientFactory.getSourceClient();
+        this.sinkClient = clientFactory.getSinkClient();
+        this.transformClient = clientFactory.getTransformClient();
+
         if (MapUtils.isEmpty(groupContext.getStreamContextMap())) {
             groupContext.setStreamContextMap(Maps.newHashMap());
         }
@@ -68,7 +84,7 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         groupContext.setStreamContext(streamContext);
         this.streamContext = streamContext;
 
-        this.inlongStream = new InlongStreamImpl(groupId, 
streamInfo.getInlongStreamId(), managerClient);
+        this.inlongStream = new InlongStreamImpl(groupId, 
streamInfo.getInlongStreamId(), configuration);
         if (CollectionUtils.isNotEmpty(streamInfo.getFieldList())) {
             this.inlongStream.setStreamFields(streamInfo.getFieldList());
         }
@@ -116,21 +132,21 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         InlongStreamInfo streamInfo = streamContext.getStreamInfo();
         StreamPipeline streamPipeline = inlongStream.createPipeline();
         streamInfo.setExtParams(JsonUtils.toJsonString(streamPipeline));
-        streamInfo.setId(managerClient.createStreamInfo(streamInfo));
+        streamInfo.setId(streamClient.createStreamInfo(streamInfo));
         // Create source and update index
         List<SourceRequest> sourceRequests = 
Lists.newArrayList(streamContext.getSourceRequests().values());
         for (SourceRequest sourceRequest : sourceRequests) {
-            sourceRequest.setId(managerClient.createSource(sourceRequest));
+            sourceRequest.setId(sourceClient.createSource(sourceRequest));
         }
         // Create sink and update index
         List<SinkRequest> sinkRequests = 
Lists.newArrayList(streamContext.getSinkRequests().values());
         for (SinkRequest sinkRequest : sinkRequests) {
-            sinkRequest.setId(managerClient.createSink(sinkRequest));
+            sinkRequest.setId(sinkClient.createSink(sinkRequest));
         }
         // Create transform and update index
         List<TransformRequest> transformRequests = 
Lists.newArrayList(streamContext.getTransformRequests().values());
         for (TransformRequest transformRequest : transformRequests) {
-            
transformRequest.setId(managerClient.createTransform(transformRequest));
+            
transformRequest.setId(transformClient.createTransform(transformRequest));
         }
         return inlongStream;
     }
@@ -140,9 +156,9 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
         StreamPipeline streamPipeline = inlongStream.createPipeline();
         dataStreamInfo.setExtParams(JsonUtils.toJsonString(streamPipeline));
-        Boolean isExist = managerClient.isStreamExists(dataStreamInfo);
+        Boolean isExist = streamClient.isStreamExists(dataStreamInfo);
         if (isExist) {
-            Pair<Boolean, String> updateMsg = 
managerClient.updateStreamInfo(dataStreamInfo);
+            Pair<Boolean, String> updateMsg = 
streamClient.updateStreamInfo(dataStreamInfo);
             if (!updateMsg.getKey()) {
                 throw new RuntimeException(String.format("Update data stream 
failed:%s", updateMsg.getValue()));
             }
@@ -160,7 +176,7 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         InlongStreamInfo streamInfo = streamContext.getStreamInfo();
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        List<TransformResponse> transformResponses = 
managerClient.listTransform(groupId, streamId);
+        List<TransformResponse> transformResponses = 
transformClient.listTransform(groupId, streamId);
         List<String> updateTransformNames = Lists.newArrayList();
         for (TransformResponse transformResponse : transformResponses) {
             StreamTransform transform = 
StreamTransformTransfer.parseStreamTransform(transformResponse);
@@ -169,14 +185,14 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
             if (transformRequests.get(transformName) == null) {
                 TransformRequest transformRequest = 
StreamTransformTransfer.createTransformRequest(transform,
                         streamInfo);
-                boolean isDelete = 
managerClient.deleteTransform(transformRequest);
+                boolean isDelete = 
transformClient.deleteTransform(transformRequest);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete 
transform=%s failed", transformRequest));
                 }
             } else {
                 TransformRequest transformRequest = 
transformRequests.get(transformName);
                 transformRequest.setId(id);
-                Pair<Boolean, String> updateState = 
managerClient.updateTransform(transformRequest);
+                Pair<Boolean, String> updateState = 
transformClient.updateTransform(transformRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update 
transform=%s failed with err=%s", transformRequest,
                             updateState.getValue()));
@@ -191,7 +207,7 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
                 continue;
             }
             TransformRequest transformRequest = requestEntry.getValue();
-            
transformRequest.setId(managerClient.createTransform(transformRequest));
+            
transformRequest.setId(transformClient.createTransform(transformRequest));
         }
     }
 
@@ -200,21 +216,21 @@ public class DefaultInlongStreamBuilder extends 
InlongStreamBuilder {
         InlongStreamInfo streamInfo = streamContext.getStreamInfo();
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        List<StreamSource> streamSources = managerClient.listSources(groupId, 
streamId);
+        List<StreamSource> streamSources = sourceClient.listSources(groupId, 
streamId);
         List<String> updateSourceNames = Lists.newArrayList();
         if (CollectionUtils.isNotEmpty(streamSources)) {
             for (StreamSource source : streamSources) {
                 final String sourceName = source.getSourceName();
                 final int id = source.getId();
                 if (sourceRequests.get(sourceName) == null) {
-                    boolean isDelete = managerClient.deleteSource(id);
+                    boolean isDelete = sourceClient.deleteSource(id);
                     if (!isDelete) {
                         throw new RuntimeException(String.format("Delete 
source failed by id=%s", id));
                     }
                 } else {
                     SourceRequest sourceRequest = 
sourceRequests.get(sourceName);
                     sourceRequest.setId(id);
-                    Pair<Boolean, String> updateState = 
managerClient.updateSource(sourceRequest);
+                    Pair<Boolean, String> updateState = 
sourceClient.updateSource(sourceRequest);
                     if (!updateState.getKey()) {
                         throw new RuntimeException(String.format("Update 
source=%s failed with err=%s", sourceRequest,
                                 updateState.getValue()));
@@ -230,7 +246,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
                 continue;
             }
             SourceRequest sourceRequest = requestEntry.getValue();
-            sourceRequest.setId(managerClient.createSource(sourceRequest));
+            sourceRequest.setId(sourceClient.createSource(sourceRequest));
         }
     }
 
@@ -239,20 +255,20 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
         InlongStreamInfo streamInfo = streamContext.getStreamInfo();
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        List<StreamSink> streamSinks = managerClient.listSinks(groupId, 
streamId);
+        List<StreamSink> streamSinks = sinkClient.listSinks(groupId, streamId);
         List<String> updateSinkNames = Lists.newArrayList();
         for (StreamSink sink : streamSinks) {
             final String sinkName = sink.getSinkName();
             final int id = sink.getId();
             if (sinkRequests.get(sinkName) == null) {
-                boolean isDelete = managerClient.deleteSink(id);
+                boolean isDelete = sinkClient.deleteSink(id);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete sink=%s failed", sink));
                 }
             } else {
                 SinkRequest sinkRequest = sinkRequests.get(sinkName);
                 sinkRequest.setId(id);
-                Pair<Boolean, String> updateState = 
managerClient.updateSink(sinkRequest);
+                Pair<Boolean, String> updateState = 
sinkClient.updateSink(sinkRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update sink=%s failed with err=%s", sinkRequest,
                             updateState.getValue()));
@@ -267,7 +283,7 @@ public class DefaultInlongStreamBuilder extends InlongStreamBuilder {
                 continue;
             }
             SinkRequest sinkRequest = requestEntry.getValue();
-            sinkRequest.setId(managerClient.createSink(sinkRequest));
+            sinkRequest.setId(sinkClient.createSink(sinkRequest));
         }
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
index 0a2c52518..a03997a9f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongClientImpl.java
@@ -30,7 +30,8 @@ import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.enums.SimpleSourceStatus;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
@@ -51,6 +52,7 @@ public class InlongClientImpl implements InlongClient {
     private static final String HOST_SPLITTER = ":";
     @Getter
     private final ClientConfiguration configuration;
+    private final InlongGroupClient groupClient;
 
     public InlongClientImpl(String serviceUrl, ClientConfiguration 
configuration) {
         Map<String, String> hostPorts = 
Splitter.on(URL_SPLITTER).withKeyValueSeparator(HOST_SPLITTER)
@@ -74,37 +76,36 @@ public class InlongClientImpl implements InlongClient {
             throw new RuntimeException(String.format("%s is not connective", 
serviceUrl));
         }
         this.configuration = configuration;
+        groupClient = 
ClientUtils.getClientFactory(configuration).getGroupClient();
     }
 
     @Override
     public InlongGroup forGroup(InlongGroupInfo groupInfo) {
-        return new InlongGroupImpl(groupInfo, this);
+        return new InlongGroupImpl(groupInfo, configuration);
     }
 
     @Override
     public List<InlongGroup> listGroup(String expr, int status, int pageNum, 
int pageSize) {
-        InnerInlongManagerClient managerClient = new 
InnerInlongManagerClient(this.configuration);
-        PageInfo<InlongGroupListResponse> responsePageInfo = 
managerClient.listGroups(expr, status, pageNum,
+        PageInfo<InlongGroupListResponse> responsePageInfo = 
groupClient.listGroups(expr, status, pageNum,
                 pageSize);
         if (CollectionUtils.isEmpty(responsePageInfo.getList())) {
             return Lists.newArrayList();
         } else {
             return responsePageInfo.getList().stream().map(response -> {
                 String groupId = response.getInlongGroupId();
-                InlongGroupInfo groupInfo = 
managerClient.getGroupInfo(groupId);
-                return new InlongGroupImpl(groupInfo, this);
+                InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
+                return new InlongGroupImpl(groupInfo, configuration);
             }).collect(Collectors.toList());
         }
     }
 
     @Override
     public Map<String, SimpleGroupStatus> listGroupStatus(List<String> 
groupIds) {
-        InnerInlongManagerClient managerClient = new 
InnerInlongManagerClient(this.configuration);
         InlongGroupPageRequest request = new InlongGroupPageRequest();
         request.setGroupIdList(groupIds);
         request.setListSources(true);
 
-        PageInfo<InlongGroupListResponse> pageInfo = 
managerClient.listGroups(request);
+        PageInfo<InlongGroupListResponse> pageInfo = 
groupClient.listGroups(request);
         List<InlongGroupListResponse> groupListResponses = pageInfo.getList();
         Map<String, SimpleGroupStatus> groupStatusMap = Maps.newHashMap();
         if (CollectionUtils.isNotEmpty(groupListResponses)) {
@@ -121,12 +122,11 @@ public class InlongClientImpl implements InlongClient {
 
     @Override
     public InlongGroup getGroup(String groupId) {
-        InnerInlongManagerClient managerClient = new 
InnerInlongManagerClient(this.configuration);
-        InlongGroupInfo groupInfo = managerClient.getGroupInfo(groupId);
+        InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
         if (groupInfo == null) {
             return new BlankInlongGroup();
         }
-        return new InlongGroupImpl(groupInfo, this);
+        return new InlongGroupImpl(groupInfo, configuration);
     }
 
     private SimpleGroupStatus recheckGroupStatus(SimpleGroupStatus 
groupStatus, List<StreamSource> sources) {
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index bdeb65236..560b7dee8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -22,13 +22,18 @@ import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongGroupContext;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
 import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.WorkflowClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
@@ -62,30 +67,36 @@ public class InlongGroupImpl implements InlongGroup {
 
     private final InnerGroupContext groupContext;
     private InlongGroupInfo groupInfo;
-    private InnerInlongManagerClient managerClient;
+    private final InlongGroupClient groupClient;
+    private final WorkflowClient workFlowClient;
+    private final InlongStreamClient streamClient;
+    private final ClientConfiguration configuration;
 
-    public InlongGroupImpl(InlongGroupInfo groupInfo, InlongClientImpl 
inlongClient) {
+    public InlongGroupImpl(InlongGroupInfo groupInfo, ClientConfiguration 
configuration) {
         this.groupInfo = groupInfo;
         this.groupContext = new InnerGroupContext();
         this.groupContext.setGroupInfo(groupInfo);
-        if (this.managerClient == null) {
-            this.managerClient = new 
InnerInlongManagerClient(inlongClient.getConfiguration());
-        }
+        this.configuration = configuration;
+
+        ClientFactory clientFactory = 
ClientUtils.getClientFactory(configuration);
+        this.streamClient = clientFactory.getStreamClient();
+        this.groupClient = clientFactory.getGroupClient();
+        this.workFlowClient = clientFactory.getWorkflowClient();
 
-        InlongGroupInfo newGroupInfo = 
managerClient.getGroupIfExists(groupInfo.getInlongGroupId());
+        InlongGroupInfo newGroupInfo = 
groupClient.getGroupIfExists(groupInfo.getInlongGroupId());
         if (newGroupInfo != null) {
             this.groupContext.setGroupInfo(groupInfo);
         } else {
             BaseSortConf sortConf = groupInfo.getSortConf();
             InlongGroupTransfer.createGroupInfo(groupInfo, sortConf);
-            String groupId = managerClient.createGroup(groupInfo.genRequest());
+            String groupId = groupClient.createGroup(groupInfo.genRequest());
             groupInfo.setInlongGroupId(groupId);
         }
     }
 
     @Override
     public InlongStreamBuilder createStream(InlongStreamInfo streamInfo) {
-        return new DefaultInlongStreamBuilder(streamInfo, this.groupContext, 
this.managerClient);
+        return new DefaultInlongStreamBuilder(streamInfo, this.groupContext, 
configuration);
     }
 
     @Override
@@ -96,7 +107,7 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext init() throws Exception {
         InlongGroupInfo groupInfo = this.groupContext.getGroupInfo();
-        WorkflowResult initWorkflowResult = 
managerClient.initInlongGroup(groupInfo.genRequest());
+        WorkflowResult initWorkflowResult = 
groupClient.initInlongGroup(groupInfo.genRequest());
         List<TaskResponse> taskViews = initWorkflowResult.getNewTasks();
         Preconditions.checkNotEmpty(taskViews, "init inlong group info failed");
         TaskResponse taskView = taskViews.get(0);
@@ -122,7 +133,7 @@ public class InlongGroupImpl implements InlongGroup {
                 formDataNew, NewGroupProcessForm.class);
         Preconditions.checkNotNull(newGroupProcessForm, "NewGroupProcessForm cannot be null");
         groupContext.setInitMsg(newGroupProcessForm);
-        WorkflowResult startWorkflowResult = 
managerClient.startInlongGroup(taskId, newGroupProcessForm);
+        WorkflowResult startWorkflowResult = 
workFlowClient.startInlongGroup(taskId, newGroupProcessForm);
         processView = startWorkflowResult.getProcessInfo();
         Preconditions.checkTrue(ProcessStatus.COMPLETED == 
processView.getStatus(),
                 String.format("inlong group status %s is incorrect, should be COMPLETED", processView.getStatus()));
@@ -139,14 +150,14 @@ public class InlongGroupImpl implements InlongGroup {
         Preconditions.checkTrue(groupId != null && 
groupId.equals(this.groupInfo.getInlongGroupId()),
                 "groupId must be same");
 
-        InlongGroupInfo existGroupInfo = managerClient.getGroupInfo(groupId);
+        InlongGroupInfo existGroupInfo = groupClient.getGroupInfo(groupId);
         SimpleGroupStatus status = 
SimpleGroupStatus.parseStatusByCode(existGroupInfo.getStatus());
         Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
                 "inlong group is in init status, should not be updated");
 
         InlongGroupInfo groupInfo = 
InlongGroupTransfer.createGroupInfo(originGroupInfo, sortConf);
         InlongGroupRequest groupRequest = groupInfo.genRequest();
-        Pair<String, String> idAndErr = 
managerClient.updateGroup(groupRequest);
+        Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
         String errMsg = idAndErr.getValue();
         Preconditions.checkNull(errMsg, errMsg);
 
@@ -159,7 +170,7 @@ public class InlongGroupImpl implements InlongGroup {
         Preconditions.checkNotNull(sortConf, "sort conf cannot be null");
 
         final String groupId = this.groupInfo.getInlongGroupId();
-        InlongGroupInfo groupInfo = managerClient.getGroupInfo(groupId);
+        InlongGroupInfo groupInfo = groupClient.getGroupInfo(groupId);
 
         SimpleGroupStatus status = 
SimpleGroupStatus.parseStatusByCode(groupInfo.getStatus());
         Preconditions.checkTrue(status != SimpleGroupStatus.INITIALIZING,
@@ -167,7 +178,7 @@ public class InlongGroupImpl implements InlongGroup {
 
         groupInfo = InlongGroupTransfer.createGroupInfo(this.groupInfo, 
sortConf);
         InlongGroupRequest groupRequest = groupInfo.genRequest();
-        Pair<String, String> idAndErr = 
managerClient.updateGroup(groupRequest);
+        Pair<String, String> idAndErr = groupClient.updateGroup(groupRequest);
         String errMsg = idAndErr.getValue();
         Preconditions.checkNull(errMsg, errMsg);
         this.groupContext.setGroupInfo(groupInfo);
@@ -177,7 +188,7 @@ public class InlongGroupImpl implements InlongGroup {
     public InlongGroupContext reInitOnUpdate(InlongGroupInfo originGroupInfo, 
BaseSortConf sortConf) throws Exception {
         this.update(originGroupInfo, sortConf);
         String inlongGroupId = 
this.groupContext.getGroupInfo().getInlongGroupId();
-        InlongGroupInfo newGroupInfo = 
managerClient.getGroupIfExists(inlongGroupId);
+        InlongGroupInfo newGroupInfo = 
groupClient.getGroupIfExists(inlongGroupId);
         if (newGroupInfo != null) {
             this.groupContext.setGroupInfo(newGroupInfo);
         } else {
@@ -195,27 +206,27 @@ public class InlongGroupImpl implements InlongGroup {
     @Override
     public InlongGroupContext suspend(boolean async) {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        Pair<String, String> idAndErr = 
managerClient.updateGroup(groupInfo.genRequest());
+        Pair<String, String> idAndErr = 
groupClient.updateGroup(groupInfo.genRequest());
         final String errMsg = idAndErr.getValue();
         final String groupId = idAndErr.getKey();
         Preconditions.checkNull(errMsg, errMsg);
-        managerClient.operateInlongGroup(groupId, SimpleGroupStatus.STOPPED, 
async);
+        groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STOPPED, 
async);
         return generateSnapshot();
     }
 
     @Override
-    public InlongGroupContext restart() throws Exception {
+    public InlongGroupContext restart() {
         return restart(false);
     }
 
     @Override
-    public InlongGroupContext restart(boolean async) throws Exception {
+    public InlongGroupContext restart(boolean async) {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
-        Pair<String, String> idAndErr = 
managerClient.updateGroup(groupInfo.genRequest());
+        Pair<String, String> idAndErr = 
groupClient.updateGroup(groupInfo.genRequest());
         final String errMsg = idAndErr.getValue();
         final String groupId = idAndErr.getKey();
         Preconditions.checkNull(errMsg, errMsg);
-        managerClient.operateInlongGroup(groupId, SimpleGroupStatus.STARTED, 
async);
+        groupClient.operateInlongGroup(groupId, SimpleGroupStatus.STARTED, 
async);
         return generateSnapshot();
     }
 
@@ -226,8 +237,8 @@ public class InlongGroupImpl implements InlongGroup {
 
     @Override
     public InlongGroupContext delete(boolean async) throws Exception {
-        InlongGroupInfo groupInfo = 
managerClient.getGroupInfo(groupContext.getGroupId());
-        boolean isDeleted = 
managerClient.deleteInlongGroup(groupInfo.getInlongGroupId(), async);
+        InlongGroupInfo groupInfo = 
groupClient.getGroupInfo(groupContext.getGroupId());
+        boolean isDeleted = 
groupClient.deleteInlongGroup(groupInfo.getInlongGroupId(), async);
         if (isDeleted) {
             groupInfo.setStatus(GroupStatus.DELETED.getCode());
         }
@@ -235,23 +246,23 @@ public class InlongGroupImpl implements InlongGroup {
     }
 
     @Override
-    public List<InlongStream> listStreams() throws Exception {
+    public List<InlongStream> listStreams() {
         String inlongGroupId = this.groupContext.getGroupId();
         return fetchInlongStreams(inlongGroupId);
     }
 
     @Override
-    public InlongGroupContext reset(int rerun, int resetFinalStatus) throws 
Exception {
+    public InlongGroupContext reset(int rerun, int resetFinalStatus) {
         InlongGroupInfo groupInfo = groupContext.getGroupInfo();
         InlongGroupResetRequest request = new 
InlongGroupResetRequest(groupInfo.getInlongGroupId(),
                 rerun, resetFinalStatus);
-        managerClient.resetGroup(request);
+        groupClient.resetGroup(request);
         return generateSnapshot();
     }
 
     private InlongGroupContext generateSnapshot() {
         // fetch current group
-        InlongGroupInfo groupInfo = 
managerClient.getGroupInfo(groupContext.getGroupId());
+        InlongGroupInfo groupInfo = 
groupClient.getGroupInfo(groupContext.getGroupId());
         groupContext.setGroupInfo(groupInfo);
         String inlongGroupId = groupInfo.getInlongGroupId();
         // fetch stream in group
@@ -263,7 +274,7 @@ public class InlongGroupImpl implements InlongGroup {
         // create group context
         InlongGroupContext inlongGroupContext = new 
InlongGroupContext(groupContext);
         // fetch group logs
-        List<EventLogView> logViews = 
managerClient.getInlongGroupError(inlongGroupId);
+        List<EventLogView> logViews = 
workFlowClient.getInlongGroupError(inlongGroupId);
         if (CollectionUtils.isNotEmpty(logViews)) {
             Map<String, List<String>> errMsgMap = Maps.newHashMap();
             Map<String, List<String>> groupLogMap = Maps.newHashMap();
@@ -285,7 +296,7 @@ public class InlongGroupImpl implements InlongGroup {
         // fetch stream logs
         Map<String, InlongStream> streams = 
inlongGroupContext.getInlongStreamMap();
         streams.keySet().forEach(streamId -> {
-            List<InlongStreamConfigLogListResponse> logList = 
managerClient.getStreamLogs(inlongGroupId, streamId);
+            List<InlongStreamConfigLogListResponse> logList = 
streamClient.getStreamLogs(inlongGroupId, streamId);
             if (CollectionUtils.isNotEmpty(logList)) {
                 Map<String, List<String>> streamLogs = Maps.newHashMap();
                 logList.stream().filter(x -> 
StringUtils.isNotEmpty(x.getComponentName()))
@@ -301,12 +312,12 @@ public class InlongGroupImpl implements InlongGroup {
     }
 
     private List<InlongStream> fetchInlongStreams(String groupId) {
-        List<InlongStreamInfo> streamInfos = 
managerClient.listStreamInfo(groupId);
+        List<InlongStreamInfo> streamInfos = 
streamClient.listStreamInfo(groupId);
         if (CollectionUtils.isEmpty(streamInfos)) {
             return null;
         }
         return streamInfos.stream()
-                .map(streamInfo -> new InlongStreamImpl(streamInfo, 
managerClient))
+                .map(streamInfo -> new InlongStreamImpl(streamInfo, 
configuration))
                 .collect(Collectors.toList());
     }
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index d6ff3b047..f1ff69066 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -24,8 +24,14 @@ import lombok.Data;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.InlongStream;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamTransformClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.client.api.util.StreamTransformTransfer;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
@@ -41,6 +47,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -50,7 +57,13 @@ import java.util.stream.Collectors;
 @Data
 public class InlongStreamImpl implements InlongStream {
 
-    private InnerInlongManagerClient managerClient;
+    private InlongStreamClient streamClient;
+
+    private StreamSourceClient sourceClient;
+
+    private StreamSinkClient sinkClient;
+
+    private StreamTransformClient transformClient;
 
     private String inlongGroupId;
 
@@ -67,10 +80,16 @@ public class InlongStreamImpl implements InlongStream {
     /**
      * Constructor of InlongStreamImpl.
      */
-    public InlongStreamImpl(InlongStreamInfo streamInfo, 
InnerInlongManagerClient managerClient) {
-        this.managerClient = managerClient;
+    public InlongStreamImpl(InlongStreamInfo streamInfo, ClientConfiguration 
configuration) {
         this.inlongGroupId = streamInfo.getInlongGroupId();
         this.inlongStreamId = streamInfo.getInlongStreamId();
+
+        ClientFactory clientFactory = 
ClientUtils.getClientFactory(configuration);
+        this.streamClient = clientFactory.getStreamClient();
+        this.sourceClient = clientFactory.getSourceClient();
+        this.sinkClient = clientFactory.getSinkClient();
+        this.transformClient = clientFactory.getTransformClient();
+
         List<StreamField> streamFields = streamInfo.getFieldList();
         if (CollectionUtils.isNotEmpty(streamFields)) {
             this.streamFields = streamFields.stream()
@@ -108,8 +127,14 @@ public class InlongStreamImpl implements InlongStream {
         }
     }
 
-    public InlongStreamImpl(String groupId, String streamId, 
InnerInlongManagerClient managerClient) {
-        this.managerClient = managerClient;
+    public InlongStreamImpl(String groupId, String streamId, 
ClientConfiguration configuration) {
+        if (Optional.ofNullable(configuration).isPresent()) {
+            ClientFactory clientFactory = 
ClientUtils.getClientFactory(configuration);
+            this.streamClient = clientFactory.getStreamClient();
+            this.sourceClient = clientFactory.getSourceClient();
+            this.sinkClient = clientFactory.getSinkClient();
+            this.transformClient = clientFactory.getTransformClient();
+        }
         this.inlongGroupId = groupId;
         this.inlongStreamId = streamId;
     }
@@ -280,7 +305,7 @@ public class InlongStreamImpl implements InlongStream {
 
     @Override
     public InlongStream update() {
-        InlongStreamInfo streamInfo = 
managerClient.getStreamInfo(inlongGroupId, inlongStreamId);
+        InlongStreamInfo streamInfo = 
streamClient.getStreamInfo(inlongGroupId, inlongStreamId);
         if (streamInfo == null) {
             throw new IllegalArgumentException(
                     String.format("Stream not exists for group=%s and stream=%s", inlongGroupId, inlongStreamId));
@@ -289,7 +314,7 @@ public class InlongStreamImpl implements InlongStream {
         streamInfo.setFieldList(this.streamFields);
         StreamPipeline streamPipeline = createPipeline();
         streamInfo.setExtParams(JsonUtils.toJsonString(streamPipeline));
-        Pair<Boolean, String> updateMsg = 
managerClient.updateStreamInfo(streamInfo);
+        Pair<Boolean, String> updateMsg = 
streamClient.updateStreamInfo(streamInfo);
         if (!updateMsg.getKey()) {
             throw new RuntimeException(String.format("Update data stream failed: %s", updateMsg.getValue()));
         }
@@ -300,7 +325,7 @@ public class InlongStreamImpl implements InlongStream {
     }
 
     private void initOrUpdateTransform(InlongStreamInfo streamInfo) {
-        List<TransformResponse> transformResponses = 
managerClient.listTransform(inlongGroupId, inlongStreamId);
+        List<TransformResponse> transformResponses = 
transformClient.listTransform(inlongGroupId, inlongStreamId);
         List<String> updateTransformNames = Lists.newArrayList();
         for (TransformResponse transformResponse : transformResponses) {
             StreamTransform transform = 
StreamTransformTransfer.parseStreamTransform(transformResponse);
@@ -309,7 +334,7 @@ public class InlongStreamImpl implements InlongStream {
             if (this.streamTransforms.get(transformName) == null) {
                 TransformRequest transformRequest = 
StreamTransformTransfer.createTransformRequest(transform,
                         streamInfo);
-                boolean isDelete = 
managerClient.deleteTransform(transformRequest);
+                boolean isDelete = 
transformClient.deleteTransform(transformRequest);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete transform=%s failed", transformRequest));
                 }
@@ -318,7 +343,7 @@ public class InlongStreamImpl implements InlongStream {
                 TransformRequest transformRequest = 
StreamTransformTransfer.createTransformRequest(newTransform,
                         streamInfo);
                 transformRequest.setId(id);
-                Pair<Boolean, String> updateState = 
managerClient.updateTransform(transformRequest);
+                Pair<Boolean, String> updateState = 
transformClient.updateTransform(transformRequest);
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update transform=%s failed with err=%s", transformRequest,
                             updateState.getValue()));
@@ -334,18 +359,18 @@ public class InlongStreamImpl implements InlongStream {
             StreamTransform transform = transformEntry.getValue();
             TransformRequest transformRequest = 
StreamTransformTransfer.createTransformRequest(transform,
                     streamInfo);
-            managerClient.createTransform(transformRequest);
+            transformClient.createTransform(transformRequest);
         }
     }
 
     private void initOrUpdateSource(InlongStreamInfo streamInfo) {
-        List<StreamSource> streamSources = 
managerClient.listSources(inlongGroupId, inlongStreamId);
+        List<StreamSource> streamSources = 
sourceClient.listSources(inlongGroupId, inlongStreamId);
         List<String> updateSourceNames = Lists.newArrayList();
         for (StreamSource source : streamSources) {
             final String sourceName = source.getSourceName();
             final int id = source.getId();
             if (this.streamSources.get(sourceName) == null) {
-                boolean isDelete = managerClient.deleteSource(id);
+                boolean isDelete = sourceClient.deleteSource(id);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete source=%s failed", source));
                 }
@@ -354,7 +379,7 @@ public class InlongStreamImpl implements InlongStream {
                 streamSource.setId(id);
                 streamSource.setInlongGroupId(streamInfo.getInlongGroupId());
                 streamSource.setInlongStreamId(streamInfo.getInlongStreamId());
-                Pair<Boolean, String> updateState = 
managerClient.updateSource(streamSource.genSourceRequest());
+                Pair<Boolean, String> updateState = 
sourceClient.updateSource(streamSource.genSourceRequest());
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update source=%s failed with err=%s", streamSource,
                             updateState.getValue()));
@@ -370,19 +395,19 @@ public class InlongStreamImpl implements InlongStream {
             StreamSource streamSource = sourceEntry.getValue();
             streamSource.setInlongGroupId(streamInfo.getInlongGroupId());
             streamSource.setInlongStreamId(streamInfo.getInlongStreamId());
-            managerClient.createSource(streamSource.genSourceRequest());
+            sourceClient.createSource(streamSource.genSourceRequest());
         }
     }
 
     private void initOrUpdateSink(InlongStreamInfo streamInfo) {
-        List<StreamSink> streamSinks = managerClient.listSinks(inlongGroupId, 
inlongStreamId);
+        List<StreamSink> streamSinks = sinkClient.listSinks(inlongGroupId, 
inlongStreamId);
         // delete or update the sink info
         List<String> updateSinkNames = Lists.newArrayList();
         for (StreamSink sink : streamSinks) {
             final String sinkName = sink.getSinkName();
             final int id = sink.getId();
             if (this.streamSinks.get(sinkName) == null) {
-                boolean isDelete = managerClient.deleteSink(id);
+                boolean isDelete = sinkClient.deleteSink(id);
                 if (!isDelete) {
                     throw new RuntimeException(String.format("Delete sink=%s failed", sink));
                 }
@@ -391,7 +416,7 @@ public class InlongStreamImpl implements InlongStream {
                 streamSink.setId(id);
                 streamSink.setInlongGroupId(streamInfo.getInlongGroupId());
                 streamSink.setInlongStreamId(streamInfo.getInlongStreamId());
-                Pair<Boolean, String> updateState = 
managerClient.updateSink(streamSink.genSinkRequest());
+                Pair<Boolean, String> updateState = 
sinkClient.updateSink(streamSink.genSinkRequest());
                 if (!updateState.getKey()) {
                     throw new RuntimeException(String.format("Update sink=%s failed with err=%s", streamSink,
                             updateState.getValue()));
@@ -409,7 +434,7 @@ public class InlongStreamImpl implements InlongStream {
             StreamSink streamSink = sinkEntry.getValue();
             streamSink.setInlongGroupId(streamInfo.getInlongGroupId());
             streamSink.setInlongStreamId(streamInfo.getInlongStreamId());
-            managerClient.createSink(streamSink.genSinkRequest());
+            sinkClient.createSink(streamSink.genSinkRequest());
         }
     }
 
@@ -422,7 +447,7 @@ public class InlongStreamImpl implements InlongStream {
                 .filter(streamSink -> streamSink.getId().equals(sinkId))
                 .findAny()
                 // Try to get from db, if it doesn't exist in cache
-                .orElseGet(() -> managerClient.getSinkInfo(sinkId));
+                .orElseGet(() -> sinkClient.getSinkInfo(sinkId));
     }
 
     @Override
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
index 0873702be..dc9f05314 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/LowLevelInlongClientImpl.java
@@ -24,7 +24,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.MapUtils;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.LowLevelInlongClient;
-import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
@@ -71,14 +73,14 @@ public class LowLevelInlongClientImpl implements 
LowLevelInlongClient {
     }
 
     @Override
-    public Integer saveCluster(ClusterRequest request) throws Exception {
-        InnerInlongManagerClient managerClient = new 
InnerInlongManagerClient(this.configuration);
-        return managerClient.saveCluster(request);
+    public Integer saveCluster(ClusterRequest request) {
+        InlongClusterClient clusterClient = 
ClientUtils.getClientFactory(configuration).getClusterClient();
+        return clusterClient.saveCluster(request);
     }
 
     @Override
-    public PageInfo<InlongGroupListResponse> listGroup(InlongGroupPageRequest 
request) throws Exception {
-        InnerInlongManagerClient managerClient = new 
InnerInlongManagerClient(this.configuration);
-        return managerClient.listGroups(request);
+    public PageInfo<InlongGroupListResponse> listGroup(InlongGroupPageRequest 
request) {
+        InlongGroupClient groupClient = 
ClientUtils.getClientFactory(configuration).getGroupClient();
+        return groupClient.listGroups(request);
     }
 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
deleted file mode 100644
index c9bd0787a..000000000
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.client.api.inner;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.github.pagehelper.PageInfo;
-import com.google.common.collect.Lists;
-import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
-import org.apache.inlong.manager.client.api.service.AuthInterceptor;
-import org.apache.inlong.manager.client.api.service.InlongClusterApi;
-import org.apache.inlong.manager.client.api.service.InlongGroupApi;
-import org.apache.inlong.manager.client.api.service.InlongStreamApi;
-import org.apache.inlong.manager.client.api.service.StreamSinkApi;
-import org.apache.inlong.manager.client.api.service.StreamSourceApi;
-import org.apache.inlong.manager.client.api.service.StreamTransformApi;
-import org.apache.inlong.manager.client.api.service.WorkflowApi;
-import org.apache.inlong.manager.common.auth.Authentication;
-import org.apache.inlong.manager.common.auth.DefaultAuthentication;
-import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
-import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.common.pojo.sink.StreamSink;
-import org.apache.inlong.manager.common.pojo.source.SourceRequest;
-import org.apache.inlong.manager.common.pojo.source.StreamSource;
-import 
org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
-import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
-import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
-import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
-import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import 
org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
-import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.springframework.boot.configurationprocessor.json.JSONObject;
-import retrofit2.Call;
-import retrofit2.Retrofit;
-import retrofit2.converter.jackson.JacksonConverterFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
-import static 
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
-
-/**
- * InnerInlongManagerClient is used to invoke http api of inlong manager.
- */
-@Slf4j
-public class InnerInlongManagerClient {
-
-    private static final String REQUEST_FAILED_MSG = "Request to Inlong %s 
failed: %s";
-
-    protected final String host;
-    protected final int port;
-
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    private final InlongClusterApi inlongClusterApi;
-    private final InlongStreamApi inlongStreamApi;
-    private final InlongGroupApi inlongGroupApi;
-    private final StreamSourceApi streamSourceApi;
-    private final StreamTransformApi streamTransformApi;
-    private final StreamSinkApi streamSinkApi;
-    private final WorkflowApi workflowApi;
-
-    public InnerInlongManagerClient(ClientConfiguration configuration) {
-        this.host = configuration.getBindHost();
-        this.port = configuration.getBindPort();
-
-        Authentication authentication = configuration.getAuthentication();
-        Preconditions.checkNotNull(authentication, "inlong should be 
authenticated");
-        Preconditions.checkTrue(authentication instanceof 
DefaultAuthentication,
-                "inlong only support default authentication");
-        DefaultAuthentication defaultAuthentication = (DefaultAuthentication) 
authentication;
-
-        OkHttpClient okHttpClient = new OkHttpClient.Builder()
-                .addInterceptor(
-                        new 
AuthInterceptor(defaultAuthentication.getUsername(), 
defaultAuthentication.getPassword()))
-                .connectTimeout(configuration.getConnectTimeout(), 
configuration.getTimeUnit())
-                .readTimeout(configuration.getReadTimeout(), 
configuration.getTimeUnit())
-                .writeTimeout(configuration.getWriteTimeout(), 
configuration.getTimeUnit())
-                .retryOnConnectionFailure(true)
-                .build();
-
-        Retrofit retrofit = new Retrofit.Builder()
-                .baseUrl("http://" + host + ":" + port + 
"/api/inlong/manager/")
-                
.addConverterFactory(JacksonConverterFactory.create(JsonUtils.OBJECT_MAPPER))
-                .client(okHttpClient)
-                .build();
-
-        inlongStreamApi = retrofit.create(InlongStreamApi.class);
-        inlongGroupApi = retrofit.create(InlongGroupApi.class);
-        streamSinkApi = retrofit.create(StreamSinkApi.class);
-        streamSourceApi = retrofit.create(StreamSourceApi.class);
-        streamTransformApi = retrofit.create(StreamTransformApi.class);
-        workflowApi = retrofit.create(WorkflowApi.class);
-        inlongClusterApi = retrofit.create(InlongClusterApi.class);
-    }
-
-    /**
-     * Save component cluster for Inlong
-     *
-     * @param request cluster create request
-     * @return clusterIndex
-     */
-    public Integer saveCluster(ClusterRequest request) {
-        Preconditions.checkNotEmpty(request.getName(), "cluster name should 
not be empty");
-        Preconditions.checkNotEmpty(request.getType(), "cluster type should 
not be empty");
-        Preconditions.checkNotEmpty(request.getClusterTags(), "cluster tags 
should not be empty");
-        Response<Integer> clusterIndexResponse = 
executeHttpCall(inlongClusterApi.save(request));
-        assertRespSuccess(clusterIndexResponse);
-        return clusterIndexResponse.getData();
-    }
-
-    /**
-     * Get inlong group by the given inlong group id.
-     *
-     * @param inlongGroupId the given inlong group id
-     * @return inlong group info if exists, null will be returned if not exits
-     */
-    public InlongGroupInfo getGroupIfExists(String inlongGroupId) {
-        if (this.isGroupExists(inlongGroupId)) {
-            return getGroupInfo(inlongGroupId);
-        }
-        return null;
-    }
-
-    /**
-     * Check whether a group exists based on the group ID.
-     */
-    public Boolean isGroupExists(String inlongGroupId) {
-        Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not 
be empty");
-
-        Response<Boolean> response = 
executeHttpCall(inlongGroupApi.isGroupExists(inlongGroupId));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Get info of group.
-     */
-    @SneakyThrows
-    public InlongGroupInfo getGroupInfo(String inlongGroupId) {
-        Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not 
be empty");
-
-        Response<Object> responseBody = 
executeHttpCall(inlongGroupApi.getGroupInfo(inlongGroupId));
-        if (responseBody.isSuccess()) {
-            JSONObject groupInfoJson = JsonUtils.parseObject(
-                    
JsonUtils.toJsonString(JsonUtils.toJsonString(responseBody.getData())),
-                    JSONObject.class);
-            if (groupInfoJson.has(MQ_FIELD_OLD) && 
!groupInfoJson.has(MQ_FIELD)) {
-                groupInfoJson.put(MQ_FIELD, groupInfoJson.get(MQ_FIELD_OLD));
-            }
-            return JsonUtils.parseObject(groupInfoJson.toString(), 
InlongGroupInfo.class);
-        }
-
-        if (responseBody.getErrMsg().contains("not exist")) {
-            return null;
-        } else {
-            throw new RuntimeException(responseBody.getErrMsg());
-        }
-    }
-
-    /**
-     * Get inlong group list.
-     */
-    public PageInfo<InlongGroupListResponse> listGroups(String keyword, int 
status, int pageNum, int pageSize) {
-        InlongGroupPageRequest request = InlongGroupPageRequest.builder()
-                .keyword(keyword)
-                .status(status)
-                .build();
-        request.setPageNum(pageNum <= 0 ? 1 : pageNum);
-        request.setPageSize(pageSize);
-
-        Response<PageInfo<InlongGroupListResponse>> pageInfoResponse = 
executeHttpCall(
-                inlongGroupApi.listGroups(request));
-
-        if (pageInfoResponse.isSuccess()) {
-            return pageInfoResponse.getData();
-        }
-        if (pageInfoResponse.getErrMsg().contains("not exist")) {
-            return null;
-        } else {
-            throw new RuntimeException(pageInfoResponse.getErrMsg());
-        }
-    }
-
-    /**
-     * List inlong group by the page request
-     *
-     * @param pageRequest The pageRequest
-     * @return Response encapsulate of inlong group list
-     */
-    public PageInfo<InlongGroupListResponse> listGroups(InlongGroupPageRequest 
pageRequest) {
-        Response<PageInfo<InlongGroupListResponse>> pageInfoResponse = 
executeHttpCall(
-                inlongGroupApi.listGroups(pageRequest));
-        assertRespSuccess(pageInfoResponse);
-        return pageInfoResponse.getData();
-    }
-
-    /**
-     * Create an inlong group
-     */
-    public String createGroup(InlongGroupRequest groupInfo) {
-        Response<String> response = 
executeHttpCall(inlongGroupApi.createGroup(groupInfo));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Update inlong group info
-     *
-     * @return groupId && errMsg
-     */
-    public Pair<String, String> updateGroup(InlongGroupRequest groupRequest) {
-        Response<String> response = 
executeHttpCall(inlongGroupApi.updateGroup(groupRequest));
-        return Pair.of(response.getData(), response.getErrMsg());
-    }
-
-    /**
-     * Reset inlong group info
-     */
-    public boolean resetGroup(InlongGroupResetRequest resetRequest) {
-        Response<Boolean> response = 
executeHttpCall(inlongGroupApi.resetGroup(resetRequest));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Create an inlong stream.
-     */
-    public Integer createStreamInfo(InlongStreamInfo streamInfo) {
-        Response<Integer> response = 
executeHttpCall(inlongStreamApi.createStream(streamInfo));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    public Boolean isStreamExists(InlongStreamInfo streamInfo) {
-        final String groupId = streamInfo.getInlongGroupId();
-        final String streamId = streamInfo.getInlongStreamId();
-        Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be 
empty");
-        Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be 
empty");
-
-        Response<Boolean> response = 
executeHttpCall(inlongStreamApi.isStreamExists(groupId, streamId));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo) 
{
-        Response<Boolean> resp = 
executeHttpCall(inlongStreamApi.updateStream(streamInfo));
-
-        if (resp.getData() != null) {
-            return Pair.of(resp.getData(), resp.getErrMsg());
-        } else {
-            return Pair.of(false, resp.getErrMsg());
-        }
-    }
-
-    /**
-     * Get inlong stream by the given groupId and streamId.
-     */
-    public InlongStreamInfo getStreamInfo(String groupId, String streamId) {
-        Response<InlongStreamInfo> response = 
executeHttpCall(inlongStreamApi.getStream(groupId, streamId));
-
-        if (response.isSuccess()) {
-            return response.getData();
-        }
-        if (response.getErrMsg().contains("not exist")) {
-            return null;
-        } else {
-            throw new RuntimeException(response.getErrMsg());
-        }
-    }
-
-    /**
-     * Get inlong stream info.
-     */
-    public List<InlongStreamInfo> listStreamInfo(String inlongGroupId) {
-        InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
-        pageRequest.setInlongGroupId(inlongGroupId);
-
-        Response<PageInfo<InlongStreamInfo>> response = 
executeHttpCall(inlongStreamApi.listStream(pageRequest));
-        assertRespSuccess(response);
-        return response.getData().getList();
-    }
-
-    /**
-     * Create an inlong stream source.
-     */
-    public Integer createSource(SourceRequest request) {
-        Response<Integer> response = 
executeHttpCall(streamSourceApi.createSource(request));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * List stream sources by the given groupId and streamId.
-     */
-    public List<StreamSource> listSources(String groupId, String streamId) {
-        return listSources(groupId, streamId, null);
-    }
-
-    /**
-     * List stream sources by the specified source type.
-     */
-    public List<StreamSource> listSources(String groupId, String streamId, 
String sourceType) {
-        Response<PageInfo<StreamSource>> response = executeHttpCall(
-                streamSourceApi.listSources(groupId, streamId, sourceType));
-        assertRespSuccess(response);
-        return response.getData().getList();
-    }
-
-    /**
-     * Update the stream source info.
-     */
-    public Pair<Boolean, String> updateSource(SourceRequest request) {
-        Response<Boolean> response = 
executeHttpCall(streamSourceApi.updateSource(request));
-        if (response.getData() != null) {
-            return Pair.of(response.getData(), response.getErrMsg());
-        } else {
-            return Pair.of(false, response.getErrMsg());
-        }
-    }
-
-    /**
-     * Delete the stream source info by id.
-     */
-    public boolean deleteSource(int id) {
-        Preconditions.checkTrue(id > 0, "sourceId is illegal");
-        Response<Boolean> response = 
executeHttpCall(streamSourceApi.deleteSource(id));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Create a conversion function info.
-     */
-    public Integer createTransform(TransformRequest transformRequest) {
-        Response<Integer> response = 
executeHttpCall(streamTransformApi.createTransform(transformRequest));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Get all conversion function info.
-     */
-    public List<TransformResponse> listTransform(String groupId, String 
streamId) {
-        Response<List<TransformResponse>> response = executeHttpCall(
-                streamTransformApi.listTransform(groupId, streamId));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Update conversion function info.
-     */
-    public Pair<Boolean, String> updateTransform(TransformRequest 
transformRequest) {
-        Response<Boolean> response = 
executeHttpCall(streamTransformApi.updateTransform(transformRequest));
-
-        if (response.getData() != null) {
-            return Pair.of(response.getData(), response.getErrMsg());
-        } else {
-            return Pair.of(false, response.getErrMsg());
-        }
-    }
-
-    /**
-     * Delete conversion function info.
-     */
-    public boolean deleteTransform(TransformRequest transformRequest) {
-        Preconditions.checkNotEmpty(transformRequest.getInlongGroupId(), 
"inlongGroupId should not be null");
-        Preconditions.checkNotEmpty(transformRequest.getInlongStreamId(), 
"inlongStreamId should not be null");
-        Preconditions.checkNotEmpty(transformRequest.getTransformName(), 
"transformName should not be null");
-
-        Response<Boolean> response = executeHttpCall(
-                
streamTransformApi.deleteTransform(transformRequest.getInlongGroupId(),
-                        transformRequest.getInlongStreamId(), 
transformRequest.getTransformName()));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    public Integer createSink(SinkRequest sinkRequest) {
-        Response<Integer> response = 
executeHttpCall(streamSinkApi.createSink(sinkRequest));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Delete stream sink info by ID.
-     */
-    public boolean deleteSink(int id) {
-        Preconditions.checkTrue(id > 0, "sinkId is illegal");
-        Response<Boolean> response = 
executeHttpCall(streamSinkApi.deleteSink(id));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * List stream sinks by the given groupId and streamId.
-     */
-    public List<StreamSink> listSinks(String groupId, String streamId) {
-        return listSinks(groupId, streamId, null);
-    }
-
-    /**
-     * List stream sinks by the specified sink type.
-     */
-    public List<StreamSink> listSinks(String groupId, String streamId, String 
sinkType) {
-        Response<PageInfo<StreamSink>> response = executeHttpCall(
-                streamSinkApi.listSinks(groupId, streamId, sinkType));
-        assertRespSuccess(response);
-        return response.getData().getList();
-    }
-
-    /**
-     * Get detail information of data sink.
-     */
-    public StreamSink getSinkInfo(Integer sinkId) {
-        Response<StreamSink> response = 
executeHttpCall(streamSinkApi.getSinkInfo(sinkId));
-        assertRespSuccess(response);
-        return response.getData();
-    }
-
-    /**
-     * Update the stream sink info.
-     */
-    public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
-        Response<Boolean> responseBody = 
executeHttpCall(streamSinkApi.updateSink(sinkRequest));
-        assertRespSuccess(responseBody);
-
-        if (responseBody.getData() != null) {
-            return Pair.of(responseBody.getData(), responseBody.getErrMsg());
-        } else {
-            return Pair.of(false, responseBody.getErrMsg());
-        }
-    }
-
-    public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) {
-        Response<WorkflowResult> responseBody = executeHttpCall(
-                inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId()));
-        assertRespSuccess(responseBody);
-        return responseBody.getData();
-    }
-
-    public WorkflowResult startInlongGroup(int taskId, NewGroupProcessForm 
newGroupProcessForm) {
-        ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
-        workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
-        workflowTaskOperation.put("remark", "approved by system");
-
-        ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
-        inlongGroupApproveForm.putPOJO("groupApproveInfo", 
newGroupProcessForm.getGroupInfo());
-        inlongGroupApproveForm.putPOJO("streamApproveInfoList", 
newGroupProcessForm.getStreamInfoList());
-        inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
-        workflowTaskOperation.set("form", inlongGroupApproveForm);
-
-        log.info("startInlongGroup workflowTaskOperation: {}", 
inlongGroupApproveForm);
-
-        Map<String, Object> requestMap = 
JsonUtils.OBJECT_MAPPER.convertValue(workflowTaskOperation,
-                new TypeReference<Map<String, Object>>() {
-                });
-        Response<WorkflowResult> response = 
executeHttpCall(workflowApi.startInlongGroup(taskId, requestMap));
-        assertRespSuccess(response);
-
-        return response.getData();
-    }
-
-    public boolean operateInlongGroup(String groupId, SimpleGroupStatus 
status) {
-        return operateInlongGroup(groupId, status, false);
-    }
-
-    public boolean operateInlongGroup(String groupId, SimpleGroupStatus 
status, boolean async) {
-        Call<Response<String>> responseCall;
-        if (status == SimpleGroupStatus.STOPPED) {
-            if (async) {
-                responseCall = inlongGroupApi.suspendProcessAsync(groupId);
-            } else {
-                responseCall = inlongGroupApi.suspendProcess(groupId);
-            }
-        } else if (status == SimpleGroupStatus.STARTED) {
-            if (async) {
-                responseCall = inlongGroupApi.restartProcessAsync(groupId);
-            } else {
-                responseCall = inlongGroupApi.restartProcess(groupId);
-            }
-        } else {
-            throw new IllegalArgumentException(String.format("Unsupported 
inlong group status: %s", status));
-        }
-
-        Response<String> responseBody = executeHttpCall(responseCall);
-
-        String errMsg = responseBody.getErrMsg();
-        return responseBody.isSuccess()
-                || errMsg == null
-                || !errMsg.contains("not allowed");
-    }
-
-    public boolean deleteInlongGroup(String groupId) {
-        return deleteInlongGroup(groupId, false);
-    }
-
-    public boolean deleteInlongGroup(String groupId, boolean async) {
-        if (async) {
-            Response<String> response = 
executeHttpCall(inlongGroupApi.deleteGroupAsync(groupId));
-            assertRespSuccess(response);
-            return groupId.equals(response.getData());
-        } else {
-            Response<Boolean> response = 
executeHttpCall(inlongGroupApi.deleteGroup(groupId));
-            assertRespSuccess(response);
-            return response.getData();
-        }
-    }
-
-    /**
-     * get inlong group error messages
-     */
-    public List<EventLogView> getInlongGroupError(String inlongGroupId) {
-        Response<PageInfo<EventLogView>> response = 
executeHttpCall(workflowApi.getInlongGroupError(inlongGroupId, -1));
-        assertRespSuccess(response);
-        return response.getData().getList();
-    }
-
-    /**
-     * get inlong group error messages
-     */
-    public List<InlongStreamConfigLogListResponse> getStreamLogs(String 
inlongGroupId, String inlongStreamId) {
-        Response<PageInfo<InlongStreamConfigLogListResponse>> response = 
executeHttpCall(
-                inlongStreamApi.getStreamLogs(inlongGroupId, inlongStreamId));
-        assertRespSuccess(response);
-        return response.getData().getList();
-    }
-
-    private <T> T executeHttpCall(Call<T> call) {
-        Request request = call.request();
-        String url = request.url().encodedPath();
-        try {
-            retrofit2.Response<T> response = call.execute();
-            Preconditions.checkTrue(response.isSuccessful(),
-                    String.format(REQUEST_FAILED_MSG, url, 
response.message()));
-            return response.body();
-        } catch (IOException e) {
-            log.error(String.format(REQUEST_FAILED_MSG, url, e.getMessage()), 
e);
-            throw new RuntimeException(String.format(REQUEST_FAILED_MSG, url, 
e.getMessage()), e);
-        }
-    }
-
-    private void assertRespSuccess(Response<?> response) {
-        Preconditions.checkTrue(response.isSuccess(), 
String.format(REQUEST_FAILED_MSG, response.getErrMsg(), null));
-    }
-
-}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
new file mode 100644
index 000000000..9ac932fe1
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import lombok.Getter;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+
+/**
+ * Builds one instance of each inner client from a single {@link ClientConfiguration}; getters come from {@code @Getter}.
+ */
+@Getter
+public class ClientFactory {
+
+    private final InlongGroupClient groupClient;
+
+    private final InlongStreamClient streamClient;
+
+    private final StreamSinkClient sinkClient;
+
+    private final StreamSourceClient sourceClient;
+
+    private final InlongClusterClient clusterClient;
+
+    private final StreamTransformClient transformClient;
+
+    private final WorkflowClient workflowClient;
+
+    public ClientFactory(ClientConfiguration configuration) {
+        groupClient = new InlongGroupClient(configuration);
+        streamClient = new InlongStreamClient(configuration);
+        sourceClient = new StreamSourceClient(configuration);
+        sinkClient = new StreamSinkClient(configuration);
+        clusterClient = new InlongClusterClient(configuration);
+        transformClient = new StreamTransformClient(configuration);
+        workflowClient = new WorkflowClient(configuration);
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
new file mode 100644
index 000000000..1e5e976be
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InlongClusterApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+/**
+ * Client wrapper around {@link InlongClusterApi}.
+ */
+public class InlongClusterClient {
+
+    private final InlongClusterApi inlongClusterApi;
+
+    /**
+     * Creates the Retrofit-backed {@link InlongClusterApi} from the given configuration.
+     *
+     * @param configuration the client configuration used to build the Retrofit instance
+     */
+    public InlongClusterClient(ClientConfiguration configuration) {
+        this.inlongClusterApi = ClientUtils.createRetrofit(configuration).create(InlongClusterApi.class);
+    }
+
+    /**
+     * Saves a component cluster for Inlong.
+     *
+     * <p>The cluster name, type and tags are validated as non-empty, in that order, before the request is sent;
+     * the response is then checked with {@code ClientUtils.assertRespSuccess}.
+     *
+     * @param request cluster create request
+     * @return the data carried by the save response (the cluster index)
+     */
+    public Integer saveCluster(ClusterRequest request) {
+        Preconditions.checkNotEmpty(request.getName(), "cluster name should not be empty");
+        Preconditions.checkNotEmpty(request.getType(), "cluster type should not be empty");
+        Preconditions.checkNotEmpty(request.getClusterTags(), "cluster tags should not be empty");
+
+        Response<Integer> saveResult = ClientUtils.executeHttpCall(inlongClusterApi.save(request));
+        ClientUtils.assertRespSuccess(saveResult);
+        return saveResult.getData();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
new file mode 100644
index 000000000..0aff6ef4b
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongGroupClient.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.client.api.service.InlongGroupApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
+import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.springframework.boot.configurationprocessor.json.JSONObject;
+import retrofit2.Call;
+
+import static 
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD;
+import static 
org.apache.inlong.manager.client.api.impl.InlongGroupImpl.MQ_FIELD_OLD;
+
+/**
+ * Client for {@link InlongGroupApi}.
+ *
+ * <p>Each method executes one call of the Retrofit-backed API through {@code ClientUtils.executeHttpCall}
+ * and unwraps the {@link Response}.
+ */
+public class InlongGroupClient {
+
+    private final InlongGroupApi inlongGroupApi;
+
+    /**
+     * Creates the Retrofit-backed {@link InlongGroupApi} from the given configuration.
+     *
+     * @param configuration the client configuration used to build the Retrofit instance
+     */
+    public InlongGroupClient(ClientConfiguration configuration) {
+        inlongGroupApi = ClientUtils.createRetrofit(configuration).create(InlongGroupApi.class);
+    }
+
+    /**
+     * Check whether a group exists based on the group ID.
+     *
+     * @param inlongGroupId the inlong group id, must not be empty
+     * @return the data of the existence response
+     */
+    public Boolean isGroupExists(String inlongGroupId) {
+        Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not be empty");
+
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongGroupApi.isGroupExists(inlongGroupId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Get inlong group by the given inlong group id.
+     *
+     * @param inlongGroupId the given inlong group id
+     * @return inlong group info if it exists, null if it does not exist
+     */
+    public InlongGroupInfo getGroupIfExists(String inlongGroupId) {
+        if (this.isGroupExists(inlongGroupId)) {
+            return getGroupInfo(inlongGroupId);
+        }
+        return null;
+    }
+
+    /**
+     * Get info of group.
+     *
+     * <p>When the response only carries the old MQ field ({@code MQ_FIELD_OLD}), its value is copied to
+     * {@code MQ_FIELD} before the JSON is converted to {@link InlongGroupInfo}.
+     *
+     * @param inlongGroupId the inlong group id, must not be empty
+     * @return the group info, or null when the request failed with an error message containing "not exist"
+     * @throws RuntimeException when the request failed for any other reason
+     */
+    @SneakyThrows
+    public InlongGroupInfo getGroupInfo(String inlongGroupId) {
+        Preconditions.checkNotEmpty(inlongGroupId, "InlongGroupId should not be empty");
+
+        Response<Object> responseBody = ClientUtils.executeHttpCall(inlongGroupApi.getGroupInfo(inlongGroupId));
+        if (responseBody.isSuccess()) {
+            // NOTE(review): the data is serialized twice before being parsed into a JSONObject; presumably the
+            // resulting JSON string literal is what the JSONObject conversion needs - confirm before simplifying.
+            JSONObject groupInfoJson = JsonUtils.parseObject(
+                    JsonUtils.toJsonString(JsonUtils.toJsonString(responseBody.getData())),
+                    JSONObject.class);
+            if (groupInfoJson.has(MQ_FIELD_OLD) && !groupInfoJson.has(MQ_FIELD)) {
+                groupInfoJson.put(MQ_FIELD, groupInfoJson.get(MQ_FIELD_OLD));
+            }
+            return JsonUtils.parseObject(groupInfoJson.toString(), InlongGroupInfo.class);
+        }
+
+        if (isNotExistError(responseBody.getErrMsg())) {
+            return null;
+        }
+        throw new RuntimeException(responseBody.getErrMsg());
+    }
+
+    /**
+     * Get inlong group list.
+     *
+     * @param keyword keyword set on the page request
+     * @param status status set on the page request
+     * @param pageNum page number; values less than or equal to 0 are replaced by 1
+     * @param pageSize page size
+     * @return the page of groups, or null when the request failed with an error message containing "not exist"
+     * @throws RuntimeException when the request failed for any other reason
+     */
+    public PageInfo<InlongGroupListResponse> listGroups(String keyword, int status, int pageNum, int pageSize) {
+        InlongGroupPageRequest request = InlongGroupPageRequest.builder()
+                .keyword(keyword)
+                .status(status)
+                .build();
+        request.setPageNum(pageNum <= 0 ? 1 : pageNum);
+        request.setPageSize(pageSize);
+
+        Response<PageInfo<InlongGroupListResponse>> pageInfoResponse = ClientUtils.executeHttpCall(
+                inlongGroupApi.listGroups(request));
+
+        if (pageInfoResponse.isSuccess()) {
+            return pageInfoResponse.getData();
+        }
+        if (isNotExistError(pageInfoResponse.getErrMsg())) {
+            return null;
+        }
+        throw new RuntimeException(pageInfoResponse.getErrMsg());
+    }
+
+    /**
+     * List inlong group by the page request
+     *
+     * @param pageRequest The pageRequest
+     * @return the page of inlong groups carried by the response
+     */
+    public PageInfo<InlongGroupListResponse> listGroups(InlongGroupPageRequest pageRequest) {
+        Response<PageInfo<InlongGroupListResponse>> pageInfoResponse = ClientUtils.executeHttpCall(
+                inlongGroupApi.listGroups(pageRequest));
+        ClientUtils.assertRespSuccess(pageInfoResponse);
+        return pageInfoResponse.getData();
+    }
+
+    /**
+     * Create an inlong group
+     *
+     * @param groupInfo the group create request
+     * @return the data carried by the create response
+     */
+    public String createGroup(InlongGroupRequest groupInfo) {
+        Response<String> response = ClientUtils.executeHttpCall(inlongGroupApi.createGroup(groupInfo));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Update inlong group info
+     *
+     * <p>The response is not asserted; the error message is handed back to the caller instead.
+     *
+     * @param groupRequest the group update request
+     * @return pair of response data (left) and error message (right)
+     */
+    public Pair<String, String> updateGroup(InlongGroupRequest groupRequest) {
+        Response<String> response = ClientUtils.executeHttpCall(inlongGroupApi.updateGroup(groupRequest));
+        return Pair.of(response.getData(), response.getErrMsg());
+    }
+
+    /**
+     * Reset inlong group info
+     *
+     * @param resetRequest the group reset request
+     * @return the data carried by the reset response
+     */
+    public boolean resetGroup(InlongGroupResetRequest resetRequest) {
+        Response<Boolean> response = ClientUtils.executeHttpCall(inlongGroupApi.resetGroup(resetRequest));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Init the inlong group identified by the group id of the given request.
+     *
+     * @param groupInfo the group request; only its inlong group id is sent
+     * @return the workflow result carried by the response
+     */
+    public WorkflowResult initInlongGroup(InlongGroupRequest groupInfo) {
+        Response<WorkflowResult> responseBody = ClientUtils.executeHttpCall(
+                inlongGroupApi.initInlongGroup(groupInfo.getInlongGroupId()));
+        ClientUtils.assertRespSuccess(responseBody);
+        return responseBody.getData();
+    }
+
+    /**
+     * Operate the inlong group synchronously, see {@link #operateInlongGroup(String, SimpleGroupStatus, boolean)}.
+     */
+    public boolean operateInlongGroup(String groupId, SimpleGroupStatus status) {
+        return operateInlongGroup(groupId, status, false);
+    }
+
+    /**
+     * Suspend ({@code STOPPED}) or restart ({@code STARTED}) the process of the given inlong group.
+     *
+     * @param groupId the inlong group id
+     * @param status the target status; only {@code STOPPED} and {@code STARTED} are supported
+     * @param async whether to call the asynchronous variant of the API
+     * @return true when the response is successful, or when its error message is absent or does not contain
+     *         "not allowed"; false otherwise
+     * @throws IllegalArgumentException when the status is neither {@code STOPPED} nor {@code STARTED}
+     */
+    public boolean operateInlongGroup(String groupId, SimpleGroupStatus status, boolean async) {
+        Call<Response<String>> responseCall;
+        if (status == SimpleGroupStatus.STOPPED) {
+            if (async) {
+                responseCall = inlongGroupApi.suspendProcessAsync(groupId);
+            } else {
+                responseCall = inlongGroupApi.suspendProcess(groupId);
+            }
+        } else if (status == SimpleGroupStatus.STARTED) {
+            if (async) {
+                responseCall = inlongGroupApi.restartProcessAsync(groupId);
+            } else {
+                responseCall = inlongGroupApi.restartProcess(groupId);
+            }
+        } else {
+            throw new IllegalArgumentException(String.format("Unsupported inlong group status: %s", status));
+        }
+
+        Response<String> responseBody = ClientUtils.executeHttpCall(responseCall);
+
+        String errMsg = responseBody.getErrMsg();
+        return responseBody.isSuccess()
+                || errMsg == null
+                || !errMsg.contains("not allowed");
+    }
+
+    /**
+     * Delete the inlong group synchronously, see {@link #deleteInlongGroup(String, boolean)}.
+     */
+    public boolean deleteInlongGroup(String groupId) {
+        return deleteInlongGroup(groupId, false);
+    }
+
+    /**
+     * Delete the inlong group.
+     *
+     * @param groupId the inlong group id
+     * @param async whether to call the asynchronous variant of the API
+     * @return for the async call, whether the returned data equals the group id; otherwise the data carried by
+     *         the delete response
+     */
+    public boolean deleteInlongGroup(String groupId, boolean async) {
+        if (async) {
+            Response<String> response = ClientUtils.executeHttpCall(inlongGroupApi.deleteGroupAsync(groupId));
+            ClientUtils.assertRespSuccess(response);
+            return groupId.equals(response.getData());
+        } else {
+            Response<Boolean> response = ClientUtils.executeHttpCall(inlongGroupApi.deleteGroup(groupId));
+            ClientUtils.assertRespSuccess(response);
+            return response.getData();
+        }
+    }
+
+    /**
+     * Null-safe check whether an error message reports a missing resource.
+     *
+     * <p>A failed response without any error message must surface as a failure, not as a NullPointerException.
+     */
+    private static boolean isNotExistError(String errMsg) {
+        return errMsg != null && errMsg.contains("not exist");
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
new file mode 100644
index 000000000..0b661df85
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongStreamClient.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InlongStreamApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import 
org.apache.inlong.manager.common.pojo.stream.InlongStreamConfigLogListResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamPageRequest;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client for {@link InlongStreamApi}.
+ */
+public class InlongStreamClient {
+
+    private final InlongStreamApi inlongStreamApi;
+
+    public InlongStreamClient(ClientConfiguration configuration) {
+        inlongStreamApi = 
ClientUtils.createRetrofit(configuration).create(InlongStreamApi.class);
+    }
+
+    /**
+     * Create an inlong stream.
+     */
+    public Integer createStreamInfo(InlongStreamInfo streamInfo) {
+        Response<Integer> response = 
ClientUtils.executeHttpCall(inlongStreamApi.createStream(streamInfo));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    public Boolean isStreamExists(InlongStreamInfo streamInfo) {
+        final String groupId = streamInfo.getInlongGroupId();
+        final String streamId = streamInfo.getInlongStreamId();
+        Preconditions.checkNotEmpty(groupId, "InlongGroupId should not be 
empty");
+        Preconditions.checkNotEmpty(streamId, "InlongStreamId should not be 
empty");
+
+        Response<Boolean> response = 
ClientUtils.executeHttpCall(inlongStreamApi.isStreamExists(groupId, streamId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    public Pair<Boolean, String> updateStreamInfo(InlongStreamInfo streamInfo) 
{
+        Response<Boolean> resp = 
ClientUtils.executeHttpCall(inlongStreamApi.updateStream(streamInfo));
+
+        if (resp.getData() != null) {
+            return Pair.of(resp.getData(), resp.getErrMsg());
+        } else {
+            return Pair.of(false, resp.getErrMsg());
+        }
+    }
+
+    /**
+     * Get inlong stream by the given groupId and streamId.
+     */
+    public InlongStreamInfo getStreamInfo(String groupId, String streamId) {
+        Response<InlongStreamInfo> response = 
ClientUtils.executeHttpCall(inlongStreamApi.getStream(groupId, streamId));
+
+        if (response.isSuccess()) {
+            return response.getData();
+        }
+        if (response.getErrMsg().contains("not exist")) {
+            return null;
+        } else {
+            throw new RuntimeException(response.getErrMsg());
+        }
+    }
+
+    /**
+     * Get inlong stream info.
+     */
+    public List<InlongStreamInfo> listStreamInfo(String inlongGroupId) {
+        InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
+        pageRequest.setInlongGroupId(inlongGroupId);
+
+        Response<PageInfo<InlongStreamInfo>> response = 
ClientUtils.executeHttpCall(
+                inlongStreamApi.listStream(pageRequest));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData().getList();
+    }
+
+    /**
+     * get inlong group error messages
+     */
+    public List<InlongStreamConfigLogListResponse> getStreamLogs(String 
inlongGroupId, String inlongStreamId) {
+        Response<PageInfo<InlongStreamConfigLogListResponse>> response = 
ClientUtils.executeHttpCall(
+                inlongStreamApi.getStreamLogs(inlongGroupId, inlongStreamId));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData().getList();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
new file mode 100644
index 000000000..96074baa9
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.StreamSinkApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.common.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client wrapper around {@link StreamSinkApi}.
+ */
+public class StreamSinkClient {
+
+    private final StreamSinkApi streamSinkApi;
+
+    /**
+     * Creates the Retrofit-backed {@link StreamSinkApi} from the given configuration.
+     *
+     * @param configuration the client configuration used to build the Retrofit instance
+     */
+    public StreamSinkClient(ClientConfiguration configuration) {
+        this.streamSinkApi = ClientUtils.createRetrofit(configuration).create(StreamSinkApi.class);
+    }
+
+    /**
+     * Creates a stream sink.
+     *
+     * @param sinkRequest the sink to create
+     * @return the data carried by the create response
+     */
+    public Integer createSink(SinkRequest sinkRequest) {
+        Response<Integer> created = ClientUtils.executeHttpCall(streamSinkApi.createSink(sinkRequest));
+        ClientUtils.assertRespSuccess(created);
+        return created.getData();
+    }
+
+    /**
+     * Deletes the stream sink with the given ID.
+     *
+     * @param id the sink id, must be greater than 0
+     * @return the data carried by the delete response
+     */
+    public boolean deleteSink(int id) {
+        Preconditions.checkTrue(id > 0, "sinkId is illegal");
+
+        Response<Boolean> deleted = ClientUtils.executeHttpCall(streamSinkApi.deleteSink(id));
+        ClientUtils.assertRespSuccess(deleted);
+        return deleted.getData();
+    }
+
+    /**
+     * Lists the stream sinks of the given group and stream, without restricting the sink type.
+     */
+    public List<StreamSink> listSinks(String groupId, String streamId) {
+        return this.listSinks(groupId, streamId, null);
+    }
+
+    /**
+     * Lists the stream sinks of the given group and stream that have the specified sink type.
+     *
+     * @return the list held by the returned page
+     */
+    public List<StreamSink> listSinks(String groupId, String streamId, String sinkType) {
+        Response<PageInfo<StreamSink>> page = ClientUtils.executeHttpCall(
+                streamSinkApi.listSinks(groupId, streamId, sinkType));
+        ClientUtils.assertRespSuccess(page);
+        return page.getData().getList();
+    }
+
+    /**
+     * Updates the stream sink info.
+     *
+     * @param sinkRequest the sink to update
+     * @return pair of update result (left, false when the response carries no data) and error message (right)
+     */
+    public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
+        Response<Boolean> updateResult = ClientUtils.executeHttpCall(streamSinkApi.updateSink(sinkRequest));
+        ClientUtils.assertRespSuccess(updateResult);
+
+        // Guard clause: a response without data is reported as "not updated".
+        if (updateResult.getData() == null) {
+            return Pair.of(false, updateResult.getErrMsg());
+        }
+        return Pair.of(updateResult.getData(), updateResult.getErrMsg());
+    }
+
+    /**
+     * Gets the detail information of a data sink.
+     *
+     * @param sinkId the sink id
+     * @return the sink carried by the response
+     */
+    public StreamSink getSinkInfo(Integer sinkId) {
+        Response<StreamSink> sinkInfo = ClientUtils.executeHttpCall(streamSinkApi.getSinkInfo(sinkId));
+        ClientUtils.assertRespSuccess(sinkInfo);
+        return sinkInfo.getData();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
new file mode 100644
index 000000000..0cddd86a7
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.github.pagehelper.PageInfo;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.StreamSourceApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.source.SourceRequest;
+import org.apache.inlong.manager.common.pojo.source.StreamSource;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client wrapper around {@link StreamSourceApi}.
+ */
+public class StreamSourceClient {
+
+    private final StreamSourceApi streamSourceApi;
+
+    /**
+     * Creates the Retrofit-backed {@link StreamSourceApi} from the given configuration.
+     *
+     * @param configuration the client configuration used to build the Retrofit instance
+     */
+    public StreamSourceClient(ClientConfiguration configuration) {
+        this.streamSourceApi = ClientUtils.createRetrofit(configuration).create(StreamSourceApi.class);
+    }
+
+    /**
+     * Creates an inlong stream source.
+     *
+     * @param request the source to create
+     * @return the data carried by the create response
+     */
+    public Integer createSource(SourceRequest request) {
+        Response<Integer> created = ClientUtils.executeHttpCall(streamSourceApi.createSource(request));
+        ClientUtils.assertRespSuccess(created);
+        return created.getData();
+    }
+
+    /**
+     * Lists the stream sources of the given group and stream, without restricting the source type.
+     */
+    public List<StreamSource> listSources(String groupId, String streamId) {
+        return this.listSources(groupId, streamId, null);
+    }
+
+    /**
+     * Lists the stream sources of the given group and stream that have the specified source type.
+     *
+     * @return the list held by the returned page
+     */
+    public List<StreamSource> listSources(String groupId, String streamId, String sourceType) {
+        Response<PageInfo<StreamSource>> page = ClientUtils.executeHttpCall(
+                streamSourceApi.listSources(groupId, streamId, sourceType));
+        ClientUtils.assertRespSuccess(page);
+        return page.getData().getList();
+    }
+
+    /**
+     * Updates the stream source info. The response is not asserted; its error message is returned instead.
+     *
+     * @param request the source to update
+     * @return pair of update result (left, false when the response carries no data) and error message (right)
+     */
+    public Pair<Boolean, String> updateSource(SourceRequest request) {
+        Response<Boolean> updateResult = ClientUtils.executeHttpCall(streamSourceApi.updateSource(request));
+
+        // Guard clause: a response without data is reported as "not updated".
+        if (updateResult.getData() == null) {
+            return Pair.of(false, updateResult.getErrMsg());
+        }
+        return Pair.of(updateResult.getData(), updateResult.getErrMsg());
+    }
+
+    /**
+     * Deletes the data source with the given id.
+     *
+     * @param id the source id, must be greater than 0
+     * @return the data carried by the delete response
+     */
+    public boolean deleteSource(int id) {
+        Preconditions.checkTrue(id > 0, "sourceId is illegal");
+
+        Response<Boolean> deleted = ClientUtils.executeHttpCall(streamSourceApi.deleteSource(id));
+        ClientUtils.assertRespSuccess(deleted);
+        return deleted.getData();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
new file mode 100644
index 000000000..a2bd8fba9
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamTransformClient.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.StreamTransformApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
+import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.Preconditions;
+
+import java.util.List;
+
+/**
+ * Client wrapper around {@link StreamTransformApi}.
+ */
+public class StreamTransformClient {
+
+    private final StreamTransformApi streamTransformApi;
+
+    /**
+     * Creates the Retrofit-backed {@link StreamTransformApi} from the given configuration.
+     *
+     * @param configuration the client configuration used to build the Retrofit instance
+     */
+    public StreamTransformClient(ClientConfiguration configuration) {
+        this.streamTransformApi = ClientUtils.createRetrofit(configuration).create(StreamTransformApi.class);
+    }
+
+    /**
+     * Creates a conversion function info.
+     *
+     * @param transformRequest the transform to create
+     * @return the data carried by the create response
+     */
+    public Integer createTransform(TransformRequest transformRequest) {
+        Response<Integer> created = ClientUtils.executeHttpCall(streamTransformApi.createTransform(transformRequest));
+        ClientUtils.assertRespSuccess(created);
+        return created.getData();
+    }
+
+    /**
+     * Gets all conversion function info of the given group and stream.
+     *
+     * @return the list carried by the response
+     */
+    public List<TransformResponse> listTransform(String groupId, String streamId) {
+        Response<List<TransformResponse>> transforms = ClientUtils.executeHttpCall(
+                streamTransformApi.listTransform(groupId, streamId));
+        ClientUtils.assertRespSuccess(transforms);
+        return transforms.getData();
+    }
+
+    /**
+     * Updates conversion function info. The response is not asserted; its error message is returned instead.
+     *
+     * @param transformRequest the transform to update
+     * @return pair of update result (left, false when the response carries no data) and error message (right)
+     */
+    public Pair<Boolean, String> updateTransform(TransformRequest transformRequest) {
+        Response<Boolean> updateResult = ClientUtils.executeHttpCall(
+                streamTransformApi.updateTransform(transformRequest));
+
+        // Guard clause: a response without data is reported as "not updated".
+        if (updateResult.getData() == null) {
+            return Pair.of(false, updateResult.getErrMsg());
+        }
+        return Pair.of(updateResult.getData(), updateResult.getErrMsg());
+    }
+
+    /**
+     * Deletes conversion function info.
+     *
+     * @param transformRequest the request; its group id, stream id and transform name must not be empty
+     * @return the data carried by the delete response
+     */
+    public boolean deleteTransform(TransformRequest transformRequest) {
+        // Each field is read and validated in turn, so the first empty one decides the failure message.
+        final String groupId = transformRequest.getInlongGroupId();
+        Preconditions.checkNotEmpty(groupId, "inlongGroupId should not be null");
+        final String streamId = transformRequest.getInlongStreamId();
+        Preconditions.checkNotEmpty(streamId, "inlongStreamId should not be null");
+        final String transformName = transformRequest.getTransformName();
+        Preconditions.checkNotEmpty(transformName, "transformName should not be null");
+
+        Response<Boolean> deleted = ClientUtils.executeHttpCall(
+                streamTransformApi.deleteTransform(groupId, streamId, transformName));
+        ClientUtils.assertRespSuccess(deleted);
+        return deleted.getData();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
new file mode 100644
index 000000000..82994129f
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.WorkflowApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
+import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import 
org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Client for {@link WorkflowApi}.
+ */
+@Slf4j
+public class WorkflowClient {
+
+    private final WorkflowApi workflowApi;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * Create the client on top of a Retrofit instance built from the given configuration.
+     *
+     * @param configuration client configuration
+     */
+    public WorkflowClient(ClientConfiguration configuration) {
+        workflowApi = ClientUtils.createRetrofit(configuration).create(WorkflowApi.class);
+    }
+
+    /**
+     * Start the inlong group by submitting an approve operation for the given workflow task.
+     *
+     * @param taskId id of the workflow task
+     * @param newGroupProcessForm form providing the group info and the stream info list to approve
+     * @return workflow result returned by the manager
+     */
+    public WorkflowResult startInlongGroup(int taskId, NewGroupProcessForm newGroupProcessForm) {
+        ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
+        workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
+        workflowTaskOperation.put("remark", "approved by system");
+
+        ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
+        inlongGroupApproveForm.putPOJO("groupApproveInfo", newGroupProcessForm.getGroupInfo());
+        inlongGroupApproveForm.putPOJO("streamApproveInfoList", newGroupProcessForm.getStreamInfoList());
+        inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
+        workflowTaskOperation.set("form", inlongGroupApproveForm);
+
+        // log the whole operation (remark, transferTo and form), as the message label states
+        log.info("startInlongGroup workflowTaskOperation: {}", workflowTaskOperation);
+
+        // the API takes the request body as a plain map, so convert the JSON tree
+        Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(workflowTaskOperation,
+                new TypeReference<Map<String, Object>>() {
+                });
+        Response<WorkflowResult> response = ClientUtils.executeHttpCall(
+                workflowApi.startInlongGroup(taskId, requestMap));
+        ClientUtils.assertRespSuccess(response);
+
+        return response.getData();
+    }
+
+    /**
+     * Get inlong group error messages.
+     *
+     * @param inlongGroupId inlong group id
+     * @return event log views contained in the page returned by the manager
+     */
+    public List<EventLogView> getInlongGroupError(String inlongGroupId) {
+        // NOTE(review): the meaning of the second argument (-1) is defined by WorkflowApi - confirm there
+        Response<PageInfo<EventLogView>> response = ClientUtils.executeHttpCall(
+                workflowApi.getInlongGroupError(inlongGroupId, -1));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData().getList();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/ClientUtils.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/ClientUtils.java
new file mode 100644
index 000000000..6aaed4be2
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/ClientUtils.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.util;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.service.AuthInterceptor;
+import org.apache.inlong.manager.common.auth.Authentication;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import retrofit2.Call;
+import retrofit2.Retrofit;
+import retrofit2.converter.jackson.JacksonConverterFactory;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Utils for client: creates the Retrofit instance used by the inner clients, executes HTTP calls
+ * and checks manager responses.
+ */
+@Slf4j
+@UtilityClass
+public class ClientUtils {
+
+    private static final String REQUEST_FAILED_MSG = "Request to Inlong %s failed: %s";
+
+    private static ClientFactory clientFactory;
+
+    /**
+     * Get factory for {@link org.apache.inlong.manager.client.api.inner.client}.
+     *
+     * <p>NOTE(review): nothing in this class assigns {@code clientFactory}, so every call currently
+     * builds a new factory from the given configuration.
+     *
+     * @param configuration client configuration
+     * @return ClientFactory
+     */
+    public static ClientFactory getClientFactory(ClientConfiguration configuration) {
+        // orElseGet only constructs a factory when there is no existing one to return
+        return Optional.ofNullable(clientFactory).orElseGet(() -> new ClientFactory(configuration));
+    }
+
+    /**
+     * Get retrofit to instantiate Client API.
+     *
+     * @param configuration client configuration; its authentication must be a {@link DefaultAuthentication}
+     * @return Retrofit
+     */
+    public static Retrofit createRetrofit(ClientConfiguration configuration) {
+        String host = configuration.getBindHost();
+        int port = configuration.getBindPort();
+
+        Authentication authentication = configuration.getAuthentication();
+        Preconditions.checkNotNull(authentication, "inlong should be authenticated");
+        Preconditions.checkTrue(authentication instanceof DefaultAuthentication,
+                "inlong only support default authentication");
+        DefaultAuthentication defaultAuthentication = (DefaultAuthentication) authentication;
+
+        // every request goes through the auth interceptor built from the configured username and password
+        OkHttpClient okHttpClient = new OkHttpClient.Builder()
+                .addInterceptor(
+                        new AuthInterceptor(defaultAuthentication.getUsername(), defaultAuthentication.getPassword()))
+                .connectTimeout(configuration.getConnectTimeout(), configuration.getTimeUnit())
+                .readTimeout(configuration.getReadTimeout(), configuration.getTimeUnit())
+                .writeTimeout(configuration.getWriteTimeout(), configuration.getTimeUnit())
+                .retryOnConnectionFailure(true)
+                .build();
+
+        return new Retrofit.Builder()
+                .baseUrl("http://" + host + ":" + port + "/api/inlong/manager/")
+                .addConverterFactory(JacksonConverterFactory.create(JsonUtils.OBJECT_MAPPER))
+                .client(okHttpClient)
+                .build();
+    }
+
+    /**
+     * Send http request.
+     *
+     * @param call http request
+     * @param <T> type of the response body
+     * @return body of the HTTP response
+     * @throws RuntimeException if the call fails with an {@link IOException}
+     */
+    public static <T> T executeHttpCall(Call<T> call) {
+        Request request = call.request();
+        String url = request.url().encodedPath();
+        try {
+            retrofit2.Response<T> response = call.execute();
+            Preconditions.checkTrue(response.isSuccessful(),
+                    String.format(REQUEST_FAILED_MSG, url, response.message()));
+            return response.body();
+        } catch (IOException e) {
+            String errMsg = String.format(REQUEST_FAILED_MSG, url, e.getMessage());
+            log.error(errMsg, e);
+            throw new RuntimeException(errMsg, e);
+        }
+    }
+
+    /**
+     * Assert if the response is successful.
+     *
+     * @param response response
+     */
+    public static void assertRespSuccess(Response<?> response) {
+        // the error message belongs in the second placeholder; no request URL is available here
+        Preconditions.checkTrue(response.isSuccess(),
+                String.format(REQUEST_FAILED_MSG, "manager", response.getErrMsg()));
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
similarity index 91%
rename from 
inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
rename to 
inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 7d8295d6c..de019484d 100644
--- 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClientTest.java
+++ 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -26,6 +26,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.client.api.ClientConfiguration;
 import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
+import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.auth.DefaultAuthentication;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.SinkType;
@@ -73,14 +79,17 @@ import static 
com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
 import static 
com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
 
 /**
- * Unit test for {@link InnerInlongManagerClient}.
+ * Unit test for {@link ClientFactory}.
  */
 @Slf4j
-class InnerInlongManagerClientTest {
+class ClientFactoryTest {
 
     private static final int SERVICE_PORT = 8085;
     private static WireMockServer wireMockServer;
-    private static InnerInlongManagerClient innerInlongManagerClient;
+    private static InlongGroupClient groupClient;
+    private static InlongStreamClient streamClient;
+    private static StreamSinkClient sinkClient;
+    private static InlongClusterClient clusterClient;
 
     @BeforeAll
     static void setup() {
@@ -92,7 +101,12 @@ class InnerInlongManagerClientTest {
         ClientConfiguration configuration = new ClientConfiguration();
         configuration.setAuthentication(new DefaultAuthentication("admin", 
"inlong"));
         InlongClientImpl inlongClient = new InlongClientImpl(serviceUrl, 
configuration);
-        innerInlongManagerClient = new 
InnerInlongManagerClient(inlongClient.getConfiguration());
+        ClientFactory clientFactory = 
ClientUtils.getClientFactory(inlongClient.getConfiguration());
+        groupClient = clientFactory.getGroupClient();
+        streamClient = clientFactory.getStreamClient();
+        sinkClient = clientFactory.getSinkClient();
+        streamClient = clientFactory.getStreamClient();
+        clusterClient = clientFactory.getClusterClient();
     }
 
     @AfterAll
@@ -108,7 +122,7 @@ class InnerInlongManagerClientTest {
                                 
okJson(JsonUtils.toJsonString(Response.success(true)))
                         )
         );
-        Boolean groupExists = innerInlongManagerClient.isGroupExists("123");
+        Boolean groupExists = groupClient.isGroupExists("123");
         Assertions.assertTrue(groupExists);
     }
 
@@ -136,7 +150,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        InlongGroupInfo groupInfo = innerInlongManagerClient.getGroupInfo("1");
+        InlongGroupInfo groupInfo = groupClient.getGroupInfo("1");
         Assertions.assertTrue(groupInfo instanceof InlongPulsarInfo);
         Assertions.assertEquals(JsonUtils.toJsonString(inlongGroupResponse), 
JsonUtils.toJsonString(groupInfo));
     }
@@ -168,7 +182,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        PageInfo<InlongGroupListResponse> listResponse = 
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+        PageInfo<InlongGroupListResponse> listResponse = 
groupClient.listGroups("keyword", 1, 1, 10);
         Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
                 JsonUtils.toJsonString(listResponse.getList()));
     }
@@ -204,7 +218,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        PageInfo<InlongGroupListResponse> listResponse = 
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+        PageInfo<InlongGroupListResponse> listResponse = 
groupClient.listGroups("keyword", 1, 1, 10);
         Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
                 JsonUtils.toJsonString(listResponse.getList()));
     }
@@ -242,7 +256,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        PageInfo<InlongGroupListResponse> listResponse = 
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+        PageInfo<InlongGroupListResponse> listResponse = 
groupClient.listGroups("keyword", 1, 1, 10);
         Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
                 JsonUtils.toJsonString(listResponse.getList()));
     }
@@ -282,7 +296,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        PageInfo<InlongGroupListResponse> listResponse = 
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+        PageInfo<InlongGroupListResponse> listResponse = 
groupClient.listGroups("keyword", 1, 1, 10);
         Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
                 JsonUtils.toJsonString(listResponse.getList()));
     }
@@ -354,7 +368,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        PageInfo<InlongGroupListResponse> listResponse = 
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+        PageInfo<InlongGroupListResponse> listResponse = 
groupClient.listGroups("keyword", 1, 1, 10);
         Assertions.assertEquals(JsonUtils.toJsonString(groupListResponses),
                 JsonUtils.toJsonString(listResponse.getList()));
     }
@@ -370,7 +384,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        PageInfo<InlongGroupListResponse> listResponse = 
innerInlongManagerClient.listGroups("keyword", 1, 1, 10);
+        PageInfo<InlongGroupListResponse> listResponse = 
groupClient.listGroups("keyword", 1, 1, 10);
         Assertions.assertNull(listResponse);
     }
 
@@ -383,7 +397,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        String groupId = innerInlongManagerClient.createGroup(new 
InlongPulsarRequest());
+        String groupId = groupClient.createGroup(new InlongPulsarRequest());
         Assertions.assertEquals("1111", groupId);
     }
 
@@ -396,7 +410,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        Pair<String, String> updateGroup = 
innerInlongManagerClient.updateGroup(new InlongPulsarRequest());
+        Pair<String, String> updateGroup = groupClient.updateGroup(new 
InlongPulsarRequest());
         Assertions.assertEquals("1111", updateGroup.getKey());
         Assertions.assertTrue(StringUtils.isBlank(updateGroup.getValue()));
     }
@@ -410,7 +424,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        Integer groupId = innerInlongManagerClient.createStreamInfo(new 
InlongStreamInfo());
+        Integer groupId = streamClient.createStreamInfo(new 
InlongStreamInfo());
         Assertions.assertEquals(11, groupId);
     }
 
@@ -426,7 +440,7 @@ class InnerInlongManagerClientTest {
         InlongStreamInfo streamInfo = new InlongStreamInfo();
         streamInfo.setInlongGroupId("123");
         streamInfo.setInlongStreamId("11");
-        Boolean groupExists = 
innerInlongManagerClient.isStreamExists(streamInfo);
+        Boolean groupExists = streamClient.isStreamExists(streamInfo);
 
         Assertions.assertTrue(groupExists);
     }
@@ -461,7 +475,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        InlongStreamInfo inlongStreamInfo = 
innerInlongManagerClient.getStreamInfo("123", "11");
+        InlongStreamInfo inlongStreamInfo = streamClient.getStreamInfo("123", 
"11");
         Assertions.assertNotNull(inlongStreamInfo);
     }
 
@@ -475,7 +489,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        InlongStreamInfo inlongStreamInfo = 
innerInlongManagerClient.getStreamInfo("123", "11");
+        InlongStreamInfo inlongStreamInfo = streamClient.getStreamInfo("123", 
"11");
         Assertions.assertNull(inlongStreamInfo);
     }
 
@@ -565,7 +579,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        List<InlongStreamInfo> streamInfos = 
innerInlongManagerClient.listStreamInfo("11");
+        List<InlongStreamInfo> streamInfos = streamClient.listStreamInfo("11");
         Assertions.assertEquals(JsonUtils.toJsonString(streamInfo), 
JsonUtils.toJsonString(streamInfos.get(0)));
     }
 
@@ -624,7 +638,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        List<StreamSink> sinks = innerInlongManagerClient.listSinks("11", 
"11");
+        List<StreamSink> sinks = sinkClient.listSinks("11", "11");
         Assertions.assertEquals(JsonUtils.toJsonString(sinkList), 
JsonUtils.toJsonString(sinks));
     }
 
@@ -640,7 +654,7 @@ class InnerInlongManagerClientTest {
         );
 
         RuntimeException exception = 
Assertions.assertThrows(IllegalArgumentException.class,
-                () -> innerInlongManagerClient.listSinks("", "11"));
+                () -> sinkClient.listSinks("", "11"));
         Assertions.assertTrue(exception.getMessage().contains("groupId should 
not empty"));
     }
 
@@ -655,7 +669,7 @@ class InnerInlongManagerClientTest {
                         )
         );
 
-        boolean isReset = innerInlongManagerClient.resetGroup(new 
InlongGroupResetRequest());
+        boolean isReset = groupClient.resetGroup(new 
InlongGroupResetRequest());
         Assertions.assertTrue(isReset);
     }
 
@@ -672,7 +686,7 @@ class InnerInlongManagerClientTest {
         ClusterRequest request = new PulsarClusterRequest();
         request.setName("pulsar");
         request.setClusterTags("test_cluster");
-        Integer clusterIndex = innerInlongManagerClient.saveCluster(request);
+        Integer clusterIndex = clusterClient.saveCluster(request);
         Assertions.assertEquals(1, (int) clusterIndex);
     }
 
@@ -710,7 +724,7 @@ class InnerInlongManagerClientTest {
                                 ))
         );
 
-        StreamSink sinkInfo = innerInlongManagerClient.getSinkInfo(1);
+        StreamSink sinkInfo = sinkClient.getSinkInfo(1);
         Assertions.assertEquals(1, sinkInfo.getId());
         Assertions.assertTrue(sinkInfo instanceof MySQLSink);
     }

Reply via email to