This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 82ac42726 [INLONG-4361][Manager] Refactor the manager client module (#4366)
82ac42726 is described below
commit 82ac4272662917d4c5b21d5882a728e70b00275d
Author: healzhou <[email protected]>
AuthorDate: Wed May 25 10:32:17 2022 +0800
[INLONG-4361][Manager] Refactor the manager client module (#4366)
---
.../manager/client/AutoPush2HiveExample.java | 2 +-
.../inlong/manager/client/Binlog2KafkaExample.java | 2 +-
.../inlong/manager/client/File2HiveExample.java | 2 +-
.../inlong/manager/client/Kafka2HiveExample.java | 2 +-
.../cli/{CommandBase.java => AbstractCommand.java} | 11 ++-
...leConverter.java => AbstractCommandRunner.java} | 20 +++---
.../inlong/manager/client/cli/CommandToolMain.java | 8 +--
.../cli/{CommandCreate.java => CreateCommand.java} | 40 ++++++-----
.../{CommandDescribe.java => DescribeCommand.java} | 63 +++++++++++++----
.../cli/{CommandList.java => ListCommand.java} | 71 ++++++++++++++-----
.../client/cli/{ => pojo}/CreateGroupConf.java | 2 +-
.../manager/client/cli/util/CharsetAdapter.java | 5 +-
.../{CommandUtil.java => util/ClientUtils.java} | 35 ++++-----
.../cli/util/{GsonUtil.java => GsonUtils.java} | 25 ++++---
.../client/cli/util/InlongGroupInfoAdapter.java | 2 +-
.../manager/client/cli/util/SeparatorAdapter.java | 4 +-
.../client/cli/util/SortBaseConfAdapter.java | 12 ++--
.../manager/client/cli/util/StreamSinkAdapter.java | 13 ++--
.../client/cli/util/StreamSourceAdapter.java | 14 ++--
.../inlong/manager/client/cli/TestCommand.java | 7 +-
.../manager/client/api/InlongGroupContext.java | 4 +-
.../manager/client/api/InlongStreamConf.java | 1 +
.../api/impl/DefaultInlongStreamBuilder.java | 6 +-
.../manager/client/api/impl/InlongGroupImpl.java | 6 +-
.../manager/client/api/impl/InlongStreamImpl.java | 8 +--
.../client/api/inner/InnerInlongManagerClient.java | 26 +++----
.../inlong/manager/client/api/sink/HiveSink.java | 2 +-
.../api/util/{GsonUtil.java => GsonUtils.java} | 58 +++++++--------
.../manager/client/api/util/InlongParser.java | 82 +++++++++++-----------
.../client/api/util/InlongStreamSinkTransfer.java | 2 +-
.../api/util/InlongStreamTransformTransfer.java | 2 +-
.../client/api/impl/InlongStreamImplTest.java | 4 +-
.../manager/common/enums}/DataSeparator.java | 30 ++++----
.../common/pojo/stream/InlongStreamRequest.java | 8 ++-
34 files changed, 324 insertions(+), 255 deletions(-)
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
index ad86547be..da81ed118 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/AutoPush2HiveExample.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.client;
import com.google.common.collect.Lists;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
index 99d3a6942..63eeb3c38 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Binlog2KafkaExample.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.client;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
index 3db7dabcc..b0449d333 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/File2HiveExample.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
index 12c2b6010..9ca34ac1b 100644
---
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/Kafka2HiveExample.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongGroupContext;
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandBase.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommand.java
similarity index 87%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandBase.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommand.java
index 572eca597..7abec129e 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandBase.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommand.java
@@ -24,14 +24,14 @@ import com.beust.jcommander.ParameterException;
/**
* Class for parse command.
*/
-abstract class CommandBase {
+public abstract class AbstractCommand {
protected final JCommander jcommander;
@Parameter(names = {"-h", "--help"}, help = true, hidden = true)
private boolean help;
- public CommandBase(String cmdName) {
+ public AbstractCommand(String cmdName) {
jcommander = new JCommander();
jcommander.setProgramName("managerctl " + cmdName);
}
@@ -57,13 +57,12 @@ abstract class CommandBase {
return false;
} else {
JCommander obj = jcommander.getCommands().get(cmd);
- CommandUtil cmdObj = (CommandUtil) obj.getObjects().get(0);
+ AbstractCommandRunner commandRunner = (AbstractCommandRunner)
obj.getObjects().get(0);
try {
- cmdObj.run();
+ commandRunner.run();
return true;
} catch (ParameterException e) {
- System.err.println(e.getMessage());
- System.err.println();
+ System.err.println(e.getMessage() + System.lineSeparator());
return false;
} catch (Exception e) {
e.printStackTrace();
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/converter/FileConverter.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommandRunner.java
similarity index 73%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/converter/FileConverter.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommandRunner.java
index 906a87cff..aff34302d 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/converter/FileConverter.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/AbstractCommandRunner.java
@@ -15,19 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.cli.converter;
-
-import com.beust.jcommander.IStringConverter;
-
-import java.io.File;
+package org.apache.inlong.manager.client.cli;
/**
- * File converter.
+ * The runner of command.
+ * Each implementation carries out one sub-command of the command line tool.
*/
-public class FileConverter implements IStringConverter<File> {
+public abstract class AbstractCommandRunner {
+
+ /**
+ * Execute the specified command.
+ */
+ abstract void run() throws Exception;
- @Override
- public File convert(String value) {
- return new File(value);
- }
}
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
index bc07c8546..04fc0b9a3 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandToolMain.java
@@ -40,9 +40,9 @@ public class CommandToolMain {
jcommander.setProgramName("managerctl");
jcommander.addObject(this);
- commandMap.put("list", CommandList.class);
- commandMap.put("describe", CommandDescribe.class);
- commandMap.put("create", CommandCreate.class);
+ commandMap.put("list", ListCommand.class);
+ commandMap.put("describe", DescribeCommand.class);
+ commandMap.put("create", CreateCommand.class);
for (Map.Entry<String, Class<?>> cmd : commandMap.entrySet()) {
try {
@@ -78,7 +78,7 @@ public class CommandToolMain {
String cmd = args[0];
JCommander obj = jcommander.getCommands().get(cmd);
- CommandBase cmdObj = (CommandBase) obj.getObjects().get(0);
+ AbstractCommand cmdObj = (AbstractCommand) obj.getObjects().get(0);
return cmdObj.run(Arrays.copyOfRange(args, 1, args.length));
}
}
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandCreate.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
similarity index 59%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandCreate.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 0a90441f5..f5e42798a 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandCreate.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -20,9 +20,13 @@ package org.apache.inlong.manager.client.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.beust.jcommander.converters.FileConverter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.InlongClient;
import org.apache.inlong.manager.client.api.InlongGroup;
import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.client.cli.util.GsonUtils;
import java.io.File;
@@ -30,18 +34,18 @@ import java.io.File;
* Create resource by json file.
*/
@Parameters(commandDescription = "Create resource by json file")
-public class CommandCreate extends CommandBase {
+public class CreateCommand extends AbstractCommand {
@Parameter()
private java.util.List<String> params;
- public CommandCreate() {
+ public CreateCommand() {
super("create");
- jcommander.addCommand("group", new CommandCreate.CreateGroup());
+ jcommander.addCommand("group", new CreateGroup());
}
@Parameters(commandDescription = "Create group by json file")
- private class CreateGroup extends CommandUtil {
+ private static class CreateGroup extends AbstractCommandRunner {
@Parameter()
private java.util.List<String> params;
@@ -55,20 +59,22 @@ public class CommandCreate extends CommandBase {
@Override
void run() {
try {
- String jsonFile = readFile(file);
- if (!jsonFile.isEmpty()) {
- CreateGroupConf groupConf = jsonToObject(jsonFile);
-
- InlongClient inlongClient = connect();
- InlongGroup group =
inlongClient.forGroup(groupConf.getGroupInfo());
- InlongStreamBuilder streamBuilder =
group.createStream(groupConf.getStreamConf());
- streamBuilder.fields(groupConf.getStreamFieldList());
- streamBuilder.source(groupConf.getStreamSource());
- streamBuilder.sink(groupConf.getStreamSink());
- streamBuilder.initOrUpdate();
- group.init();
- System.out.println("Create group success!");
+ String fileContent = ClientUtils.readFile(file);
+ if (StringUtils.isBlank(fileContent)) {
+ System.out.println("Create group failed: file was empty!");
+ return;
}
+
+ CreateGroupConf groupConf =
GsonUtils.GSON.fromJson(fileContent, CreateGroupConf.class);
+ InlongClient inlongClient = ClientUtils.getClient();
+ InlongGroup group =
inlongClient.forGroup(groupConf.getGroupInfo());
+ InlongStreamBuilder streamBuilder =
group.createStream(groupConf.getStreamConf());
+ streamBuilder.fields(groupConf.getStreamFieldList());
+ streamBuilder.source(groupConf.getStreamSource());
+ streamBuilder.sink(groupConf.getStreamSink());
+ streamBuilder.initOrUpdate();
+ group.init();
+ System.out.println("Create group success!");
} catch (Exception e) {
System.out.println("Create group failed!");
System.out.println(e.getMessage());
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandDescribe.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
similarity index 69%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandDescribe.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
index 6390e7a16..9c38564cb 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandDescribe.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DescribeCommand.java
@@ -20,34 +20,49 @@ package org.apache.inlong.manager.client.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
import org.apache.inlong.manager.client.cli.util.PrintUtils;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
import org.apache.inlong.manager.common.pojo.sink.SinkListResponse;
import org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
+import java.io.IOException;
import java.util.List;
/**
- * Get commond describe info of resources.
+ * Describe the info of resources.
*/
@Parameters(commandDescription = "Display details of one or more resources")
-public class CommandDescribe extends CommandBase {
+public class DescribeCommand extends AbstractCommand {
@Parameter()
private java.util.List<String> params;
- public CommandDescribe() {
+ public DescribeCommand() {
super("describe");
- jcommander.addCommand("stream", new DescribeStream());
- jcommander.addCommand("group", new DescribeGroup());
- jcommander.addCommand("sink", new DescribeSink());
- jcommander.addCommand("source", new DescribeSource());
+ InlongClientImpl inlongClient;
+ try {
+ inlongClient = ClientUtils.getClient();
+ } catch (IOException e) {
+ System.err.println("get inlong client error");
+ System.err.println(e.getMessage());
+ return;
+ }
+ InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(inlongClient.getConfiguration());
+
+ jcommander.addCommand("stream", new DescribeStream(managerClient));
+ jcommander.addCommand("group", new DescribeGroup(managerClient));
+ jcommander.addCommand("sink", new DescribeSink(managerClient));
+ jcommander.addCommand("source", new DescribeSource(managerClient));
}
@Parameters(commandDescription = "Get stream details")
- private class DescribeStream extends CommandUtil {
+ private static class DescribeStream extends AbstractCommandRunner {
+
+ private final InnerInlongManagerClient managerClient;
@Parameter()
private java.util.List<String> params;
@@ -55,10 +70,13 @@ public class CommandDescribe extends CommandBase {
@Parameter(names = {"-g", "--group"}, required = true, description =
"inlong group id")
private String groupId;
+ DescribeStream(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
try {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(connect().getConfiguration());
List<FullStreamResponse> fullStreamResponseList =
managerClient.listStreamInfo(groupId);
fullStreamResponseList.forEach(response ->
PrintUtils.printJson(response.getStreamInfo()));
} catch (Exception e) {
@@ -68,7 +86,9 @@ public class CommandDescribe extends CommandBase {
}
@Parameters(commandDescription = "Get group details")
- private class DescribeGroup extends CommandUtil {
+ private static class DescribeGroup extends AbstractCommandRunner {
+
+ private final InnerInlongManagerClient managerClient;
@Parameter()
private java.util.List<String> params;
@@ -82,10 +102,13 @@ public class CommandDescribe extends CommandBase {
@Parameter(names = {"-n", "--num"}, description = "the number
displayed")
private int pageSize = 10;
+ DescribeGroup(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
try {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(connect().getConfiguration());
PageInfo<InlongGroupListResponse> groupPageInfo =
managerClient.listGroups(group, status, 1, pageSize);
groupPageInfo.getList().forEach(PrintUtils::printJson);
} catch (Exception e) {
@@ -95,7 +118,9 @@ public class CommandDescribe extends CommandBase {
}
@Parameters(commandDescription = "Get sink details")
- private class DescribeSink extends CommandUtil {
+ private static class DescribeSink extends AbstractCommandRunner {
+
+ private final InnerInlongManagerClient managerClient;
@Parameter()
private java.util.List<String> params;
@@ -106,9 +131,12 @@ public class CommandDescribe extends CommandBase {
@Parameter(names = {"-g", "--group"}, required = true, description =
"inlong group id")
private String group;
+ DescribeSink(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(connect().getConfiguration());
try {
List<SinkListResponse> sinkListResponses =
managerClient.listSinks(group, stream);
sinkListResponses.forEach(PrintUtils::printJson);
@@ -119,7 +147,9 @@ public class CommandDescribe extends CommandBase {
}
@Parameters(commandDescription = "Get source details")
- private class DescribeSource extends CommandUtil {
+ private static class DescribeSource extends AbstractCommandRunner {
+
+ private final InnerInlongManagerClient managerClient;
@Parameter()
private java.util.List<String> params;
@@ -133,10 +163,13 @@ public class CommandDescribe extends CommandBase {
@Parameter(names = {"-t", "--type"}, description = "sink type")
private String type;
+ DescribeSource(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
try {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(connect().getConfiguration());
List<SourceListResponse> sourceListResponses =
managerClient.listSources(group, stream, type);
sourceListResponses.forEach(PrintUtils::printJson);
} catch (Exception e) {
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandList.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
similarity index 73%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandList.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index 48b7afaed..ca3184d3a 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandList.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -21,11 +21,13 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.github.pagehelper.PageInfo;
import
org.apache.inlong.manager.client.api.InlongGroupContext.InlongGroupStatus;
+import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.cli.pojo.GroupInfo;
import org.apache.inlong.manager.client.cli.pojo.SinkInfo;
import org.apache.inlong.manager.client.cli.pojo.SourceInfo;
import org.apache.inlong.manager.client.cli.pojo.StreamInfo;
+import org.apache.inlong.manager.client.cli.util.ClientUtils;
import org.apache.inlong.manager.client.cli.util.PrintUtils;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.group.InlongGroupListResponse;
@@ -35,28 +37,42 @@ import
org.apache.inlong.manager.common.pojo.source.SourceListResponse;
import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
- * Get main infomation of resources.
+ * Get main information of resources.
*/
@Parameters(commandDescription = "Displays main information for one or more
resources")
-public class CommandList extends CommandBase {
+public class ListCommand extends AbstractCommand {
@Parameter()
private List<String> params;
- public CommandList() {
+ public ListCommand() {
super("list");
- jcommander.addCommand("stream", new ListStream());
- jcommander.addCommand("group", new ListGroup());
- jcommander.addCommand("sink", new ListSink());
- jcommander.addCommand("source", new ListSource());
+ InlongClientImpl inlongClient;
+ try {
+ inlongClient = ClientUtils.getClient();
+ } catch (IOException e) {
+ System.err.println("get inlong client error");
+ System.err.println(e.getMessage());
+ return;
+ }
+
+ InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(inlongClient.getConfiguration());
+
+ jcommander.addCommand("stream", new ListStream(managerClient));
+ jcommander.addCommand("group", new ListGroup(managerClient));
+ jcommander.addCommand("sink", new ListSink(managerClient));
+ jcommander.addCommand("source", new ListSource(managerClient));
}
@Parameters(commandDescription = "Get stream main information")
- private static class ListStream extends CommandUtil {
+ private static class ListStream extends AbstractCommandRunner {
+
+ private final InnerInlongManagerClient managerClient;
@Parameter()
private List<String> params;
@@ -64,9 +80,12 @@ public class CommandList extends CommandBase {
@Parameter(names = {"-g", "--group"}, required = true, description =
"inlong group id")
private String groupId;
+ ListStream(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(connect().getConfiguration());
try {
List<FullStreamResponse> fullStreamResponseList =
managerClient.listStreamInfo(groupId);
List<InlongStreamInfo> inlongStreamInfoList = new
ArrayList<>();
@@ -81,19 +100,28 @@ public class CommandList extends CommandBase {
}
@Parameters(commandDescription = "Get group details")
- private static class ListGroup extends CommandUtil {
+ private static class ListGroup extends AbstractCommandRunner {
private static final int DEFAULT_PAGE_SIZE = 10;
+ private final InnerInlongManagerClient managerClient;
+
@Parameter()
private List<String> params;
+
@Parameter(names = {"-s", "--status"})
private String status;
+
@Parameter(names = {"-g", "--group"}, description = "inlong group id")
private String group;
+
@Parameter(names = {"-n", "--num"}, description = "the number
displayed")
private int pageSize;
+ ListGroup(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
try {
@@ -105,11 +133,10 @@ public class CommandList extends CommandBase {
// set default status to STARTED
status = status == null ? InlongGroupStatus.STARTED.toString()
: status;
- List<Integer> statusList = statusList =
InlongGroupStatus.parseStatusCodeByStr(status);
+ List<Integer> statusList =
InlongGroupStatus.parseStatusCodeByStr(status);
pageRequest.setStatusList(statusList);
- InnerInlongManagerClient client = new
InnerInlongManagerClient(connect().getConfiguration());
- Response<PageInfo<InlongGroupListResponse>> pageInfoResponse =
client.listGroups(pageRequest);
+ Response<PageInfo<InlongGroupListResponse>> pageInfoResponse =
managerClient.listGroups(pageRequest);
List<InlongGroupListResponse> groupList =
pageInfoResponse.getData().getList();
PrintUtils.print(groupList, GroupInfo.class);
@@ -120,7 +147,9 @@ public class CommandList extends CommandBase {
}
@Parameters(commandDescription = "Get sink details")
- private static class ListSink extends CommandUtil {
+ private static class ListSink extends AbstractCommandRunner {
+
+ private final InnerInlongManagerClient managerClient;
@Parameter()
private List<String> params;
@@ -131,9 +160,12 @@ public class CommandList extends CommandBase {
@Parameter(names = {"-g", "--group"}, required = true, description =
"group id")
private String group;
+ ListSink(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(connect().getConfiguration());
try {
List<SinkListResponse> sinkListResponses =
managerClient.listSinks(group, stream);
PrintUtils.print(sinkListResponses, SinkInfo.class);
@@ -144,7 +176,9 @@ public class CommandList extends CommandBase {
}
@Parameters(commandDescription = "Get source details")
- private static class ListSource extends CommandUtil {
+ private static class ListSource extends AbstractCommandRunner {
+
+ private final InnerInlongManagerClient managerClient;
@Parameter()
private List<String> params;
@@ -158,9 +192,12 @@ public class CommandList extends CommandBase {
@Parameter(names = {"-t", "--type"}, description = "sink type")
private String type;
+ ListSource(InnerInlongManagerClient managerClient) {
+ this.managerClient = managerClient;
+ }
+
@Override
void run() {
- InnerInlongManagerClient managerClient = new
InnerInlongManagerClient(connect().getConfiguration());
try {
List<SourceListResponse> sourceListResponses =
managerClient.listSources(group, stream, type);
PrintUtils.print(sourceListResponses, SourceInfo.class);
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/CreateGroupConf.java
similarity index 96%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/CreateGroupConf.java
index 3de07633d..b8cd8515c 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateGroupConf.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/CreateGroupConf.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.cli;
+package org.apache.inlong.manager.client.cli.pojo;
import lombok.Data;
import org.apache.inlong.manager.client.api.InlongStreamConf;
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/CharsetAdapter.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/CharsetAdapter.java
index 3eaa15336..71a2cace4 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/CharsetAdapter.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/CharsetAdapter.java
@@ -28,11 +28,10 @@ import java.nio.charset.Charset;
/**
* Charset adapter.
*/
-public class CharsetAdapter implements JsonDeserializer {
+public class CharsetAdapter implements JsonDeserializer<Charset> {
@Override
- public Charset deserialize(JsonElement jsonElement, Type type,
- JsonDeserializationContext jsonDeserializationContext)
+ public Charset deserialize(JsonElement jsonElement, Type type,
JsonDeserializationContext context)
throws JsonParseException {
return Charset.forName(jsonElement.getAsString());
}
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandUtil.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
similarity index 79%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandUtil.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
index c29c601f7..289107777 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CommandUtil.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/ClientUtils.java
@@ -15,13 +15,11 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.cli;
+package org.apache.inlong.manager.client.cli.util;
-import com.google.gson.Gson;
import org.apache.inlong.manager.client.api.ClientConfiguration;
-import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.client.api.impl.InlongClientImpl;
-import org.apache.inlong.manager.client.cli.util.GsonUtil;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import java.io.BufferedInputStream;
import java.io.File;
@@ -35,21 +33,21 @@ import java.nio.file.Paths;
import java.util.Properties;
/**
- * Util of command for creat connect by config file.
+ * The utils for manager client.
*/
-abstract class CommandUtil {
+public class ClientUtils {
private static final String CONFIG_FILE = "application.properties";
- public InlongClientImpl connect() {
+ /**
+ * Get an inlong client instance.
+ */
+ public static InlongClientImpl getClient() throws IOException {
Properties properties = new Properties();
String path =
Thread.currentThread().getContextClassLoader().getResource("").getPath() +
CONFIG_FILE;
- try {
- InputStream inputStream = new
BufferedInputStream(Files.newInputStream(Paths.get(path)));
- properties.load(inputStream);
- } catch (IOException e) {
- e.printStackTrace();
- }
+ InputStream inputStream = new
BufferedInputStream(Files.newInputStream(Paths.get(path)));
+ properties.load(inputStream);
+
String serviceUrl = properties.getProperty("server.host") + ":" +
properties.getProperty("server.port");
String user = properties.getProperty("default.admin.user");
String password = properties.getProperty("default.admin.password");
@@ -60,7 +58,10 @@ abstract class CommandUtil {
return new InlongClientImpl(serviceUrl, configuration);
}
- String readFile(File file) {
+ /**
+ * Get the file content.
+ */
+ public static String readFile(File file) {
if (!file.exists()) {
System.out.println("File does not exist.");
} else {
@@ -83,10 +84,4 @@ abstract class CommandUtil {
return null;
}
- CreateGroupConf jsonToObject(String string) {
- Gson gson = GsonUtil.gsonBuilder();
- return gson.fromJson(string, CreateGroupConf.class);
- }
-
- abstract void run() throws Exception;
}
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/GsonUtil.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/GsonUtils.java
similarity index 65%
rename from
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/GsonUtil.java
rename to
inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/GsonUtils.java
index c388f80e5..e89b53ecf 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/GsonUtil.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/GsonUtils.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.client.cli.util;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import org.apache.inlong.manager.client.api.DataSeparator;
import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sort.BaseSortConf;
import org.apache.inlong.manager.common.pojo.stream.StreamSink;
@@ -31,20 +31,19 @@ import java.nio.charset.Charset;
/**
* Util of gson for register each type of adapter, such as charsetAdapter,
streamSourceAdapter, etc.
*/
-public class GsonUtil {
+public class GsonUtils {
/**
* Init gson instance with register type adapter.
*/
- public static Gson gsonBuilder() {
- return new GsonBuilder()
- .registerTypeAdapter(InlongGroupInfo.class, new
InlongGroupInfoAdapter())
- .registerTypeAdapter(BaseSortConf.class, new
SortBaseConfAdapter())
- .registerTypeAdapter(Charset.class, new CharsetAdapter())
- .registerTypeAdapter(DataSeparator.class, new
SeparatorAdapter())
- .registerTypeAdapter(DataFormat.class, new DataFormatAdapter())
- .registerTypeAdapter(StreamSource.class, new
StreamSourceAdapter())
- .registerTypeAdapter(StreamSink.class, new StreamSinkAdapter())
- .create();
- }
+ public static final Gson GSON = new GsonBuilder()
+ .registerTypeAdapter(InlongGroupInfo.class, new
InlongGroupInfoAdapter())
+ .registerTypeAdapter(BaseSortConf.class, new SortBaseConfAdapter())
+ .registerTypeAdapter(Charset.class, new CharsetAdapter())
+ .registerTypeAdapter(DataSeparator.class, new SeparatorAdapter())
+ .registerTypeAdapter(DataFormat.class, new DataFormatAdapter())
+ .registerTypeAdapter(StreamSource.class, new StreamSourceAdapter())
+ .registerTypeAdapter(StreamSink.class, new StreamSinkAdapter())
+ .create();
+
}
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/InlongGroupInfoAdapter.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/InlongGroupInfoAdapter.java
index 41a4d9ec0..19b626899 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/InlongGroupInfoAdapter.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/InlongGroupInfoAdapter.java
@@ -34,7 +34,7 @@ import java.lang.reflect.Type;
/**
* Inlong group info adapter for JSON deserialize.
*/
-public class InlongGroupInfoAdapter implements JsonDeserializer {
+public class InlongGroupInfoAdapter implements
JsonDeserializer<InlongGroupInfo> {
@Override
public InlongGroupInfo deserialize(JsonElement jsonElement, Type type,
JsonDeserializationContext context)
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SeparatorAdapter.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SeparatorAdapter.java
index 6fe712cbb..2f0948a08 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SeparatorAdapter.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SeparatorAdapter.java
@@ -21,14 +21,14 @@ import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
-import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import java.lang.reflect.Type;
/**
* Separator adapter.
*/
-public class SeparatorAdapter implements JsonDeserializer {
+public class SeparatorAdapter implements JsonDeserializer<DataSeparator> {
@Override
public DataSeparator deserialize(JsonElement jsonElement, Type type,
JsonDeserializationContext context)
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SortBaseConfAdapter.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SortBaseConfAdapter.java
index 5f407f624..d3353eaa1 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SortBaseConfAdapter.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/SortBaseConfAdapter.java
@@ -23,23 +23,23 @@ import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
-import org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.common.pojo.sort.BaseSortConf;
+import org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
import org.apache.inlong.manager.common.pojo.sort.UserDefinedSortConf;
import java.lang.reflect.Type;
/**
- * Sort base config adapter.
+ * Sort base config adapter for JSON deserialize.
*/
-public class SortBaseConfAdapter implements JsonDeserializer {
+public class SortBaseConfAdapter implements JsonDeserializer<BaseSortConf> {
@Override
public BaseSortConf deserialize(JsonElement jsonElement, Type type,
JsonDeserializationContext context)
throws JsonParseException {
JsonObject jsonObject = jsonElement.getAsJsonObject();
String sortType = jsonObject.get("type").getAsString();
- Gson gson = GsonUtil.gsonBuilder();
+ Gson gson = GsonUtils.GSON;
try {
switch (sortType) {
case "FLINK":
@@ -47,9 +47,9 @@ public class SortBaseConfAdapter implements JsonDeserializer {
case "USER_DEFINED":
return gson.fromJson(jsonElement, (Type)
Class.forName((UserDefinedSortConf.class).getName()));
default:
- throw new
ClassNotFoundException(String.format("Unsupported sort type=%s for Inlong",
sortType));
+ throw new
IllegalArgumentException(String.format("Unsupported sort type=%s for Inlong",
sortType));
}
- } catch (ClassNotFoundException e) {
+ } catch (Exception e) {
throw new JsonParseException(e);
}
}
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSinkAdapter.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSinkAdapter.java
index f1351051f..477ef6b1d 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSinkAdapter.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSinkAdapter.java
@@ -23,26 +23,25 @@ import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
-import org.apache.inlong.manager.common.pojo.stream.StreamSink;
import org.apache.inlong.manager.client.api.sink.ClickHouseSink;
import org.apache.inlong.manager.client.api.sink.HiveSink;
import org.apache.inlong.manager.client.api.sink.KafkaSink;
import org.apache.inlong.manager.common.enums.SinkType;
+import org.apache.inlong.manager.common.pojo.stream.StreamSink;
import java.lang.reflect.Type;
/**
* Stream sink adapter.
*/
-public class StreamSinkAdapter implements JsonDeserializer {
+public class StreamSinkAdapter implements JsonDeserializer<StreamSink> {
@Override
- public StreamSink deserialize(JsonElement jsonElement, Type type,
- JsonDeserializationContext jsonDeserializationContext)
+ public StreamSink deserialize(JsonElement jsonElement, Type type,
JsonDeserializationContext context)
throws JsonParseException {
JsonObject jsonObject = jsonElement.getAsJsonObject();
String sinkType = jsonObject.get("sinkType").getAsString();
- Gson gson = GsonUtil.gsonBuilder();
+ Gson gson = GsonUtils.GSON;
try {
switch (sinkType) {
case SinkType.SINK_HIVE:
@@ -52,9 +51,9 @@ public class StreamSinkAdapter implements JsonDeserializer {
case SinkType.SINK_CLICKHOUSE:
return gson.fromJson(jsonElement, (Type)
Class.forName((ClickHouseSink.class).getName()));
default:
- throw new
ClassNotFoundException(String.format("Unsupported sink type=%s for Inlong",
sinkType));
+ throw new
IllegalArgumentException(String.format("Unsupported sink type=%s for Inlong",
sinkType));
}
- } catch (ClassNotFoundException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
return null;
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSourceAdapter.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSourceAdapter.java
index bbf7f613d..38387ce2d 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSourceAdapter.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/StreamSourceAdapter.java
@@ -23,25 +23,25 @@ import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
-import org.apache.inlong.manager.common.pojo.stream.StreamSource;
import org.apache.inlong.manager.client.api.source.AgentFileSource;
import org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
import org.apache.inlong.manager.common.enums.SourceType;
+import org.apache.inlong.manager.common.pojo.stream.StreamSource;
import java.lang.reflect.Type;
/**
* Stream source adapter.
*/
-public class StreamSourceAdapter implements JsonDeserializer {
+public class StreamSourceAdapter implements JsonDeserializer<StreamSource> {
@Override
- public StreamSource deserialize(JsonElement jsonElement, Type type,
- JsonDeserializationContext jsonDeserializationContext) throws
JsonParseException {
+ public StreamSource deserialize(JsonElement jsonElement, Type type,
JsonDeserializationContext context)
+ throws JsonParseException {
JsonObject jsonObject = jsonElement.getAsJsonObject();
String sourceType = jsonObject.get("sourceType").getAsString();
- Gson gson = GsonUtil.gsonBuilder();
+ Gson gson = GsonUtils.GSON;
try {
switch (sourceType) {
case SourceType.SOURCE_KAFKA:
@@ -51,10 +51,10 @@ public class StreamSourceAdapter implements
JsonDeserializer {
case SourceType.SOURCE_FILE:
return gson.fromJson(jsonElement, (Type)
Class.forName((AgentFileSource.class).getName()));
default:
- throw new ClassNotFoundException(
+ throw new IllegalArgumentException(
String.format("Unsupported source type=%s for
Inlong", sourceType));
}
- } catch (ClassNotFoundException e) {
+ } catch (Exception e) {
e.printStackTrace();
}
return null;
diff --git
a/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java
b/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java
index 83129af93..0de512235 100644
---
a/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java
+++
b/inlong-manager/manager-client-tools/src/test/java/org/apache/inlong/manager/client/cli/TestCommand.java
@@ -30,12 +30,17 @@ public class TestCommand {
CommandToolMain inlongAdminTool = new CommandToolMain();
@Test
+ public void blankTest() {
+ log.info("client tools cannot run the unit tests, as the
application.properties not exist");
+ }
+
+ // @Test
public void testListGroup() {
String[] arg = {"list", "group"};
Assert.assertTrue(inlongAdminTool.run(arg));
}
- @Test
+ // @Test
public void testDescribeGroup() {
String[] arg = {"describe", "group", "-g", "test", "-s", "130"};
Assert.assertTrue(inlongAdminTool.run(arg));
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
index 5cc0bd3bc..d98648db8 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupContext.java
@@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
-import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtils;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -119,7 +119,7 @@ public class InlongGroupContext implements Serializable {
this.status = InlongGroupStatus.FAILED;
for (StreamSource failedSource : failedSources) {
this.groupErrLogs.computeIfAbsent("failedSources",
Lists::newArrayList)
- .add(GsonUtil.toJson(failedSource));
+ .add(GsonUtils.toJson(failedSource));
}
return;
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
index e5c63bd08..e9a98f41f 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongStreamConf.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.pojo.stream.StreamField;
import java.nio.charset.Charset;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
index 565ac0745..94f1390dd 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/DefaultInlongStreamBuilder.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
import org.apache.inlong.manager.client.api.inner.InnerStreamContext;
-import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtils;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
@@ -120,7 +120,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
public InlongStream init() {
InlongStreamInfo streamInfo = streamContext.getStreamInfo();
StreamPipeline streamPipeline = inlongStream.createPipeline();
- streamInfo.setExtParams(GsonUtil.toJson(streamPipeline));
+ streamInfo.setExtParams(GsonUtils.toJson(streamPipeline));
Double streamIndex = managerClient.createStreamInfo(streamInfo);
streamInfo.setId(streamIndex.intValue());
//Create source and update index
@@ -145,7 +145,7 @@ public class DefaultInlongStreamBuilder extends
InlongStreamBuilder {
public InlongStream initOrUpdate() {
InlongStreamInfo dataStreamInfo = streamContext.getStreamInfo();
StreamPipeline streamPipeline = inlongStream.createPipeline();
- dataStreamInfo.setExtParams(GsonUtil.toJson(streamPipeline));
+ dataStreamInfo.setExtParams(GsonUtils.toJson(streamPipeline));
Boolean isExist = managerClient.isStreamExists(dataStreamInfo);
if (isExist) {
Pair<Boolean, String> updateMsg =
managerClient.updateStreamInfo(dataStreamInfo);
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 8ba3133ec..61474236f 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -30,7 +30,7 @@ import
org.apache.inlong.manager.client.api.InlongStreamBuilder;
import org.apache.inlong.manager.client.api.InlongStreamConf;
import org.apache.inlong.manager.client.api.inner.InnerGroupContext;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
-import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtils;
import org.apache.inlong.manager.client.api.util.InlongGroupTransfer;
import org.apache.inlong.manager.client.api.util.InlongParser;
import org.apache.inlong.manager.common.enums.GroupStatus;
@@ -100,7 +100,7 @@ public class InlongGroupImpl implements InlongGroup {
AssertUtils.isTrue(ProcessStatus.PROCESSING == processView.getStatus(),
String.format("Process status : %s is not corrected, should be
PROCESSING",
processView.getStatus()));
- String formData = GsonUtil.toJson(processView.getFormData());
+ String formData = GsonUtils.toJson(processView.getFormData());
Pair<InlongGroupApproveRequest, List<InlongStreamApproveRequest>>
initMsg = InlongParser
.parseGroupForm(formData);
groupContext.setInitMsg(initMsg);
@@ -267,7 +267,7 @@ public class InlongGroupImpl implements InlongGroup {
logList.stream().filter(x ->
StringUtils.isNotEmpty(x.getComponentName()))
.forEach(streamLog -> {
String componentName =
streamLog.getComponentName();
- String log = GsonUtil.toJson(streamLog);
+ String log = GsonUtils.toJson(streamLog);
streamLogs.computeIfAbsent(componentName,
Lists::newArrayList).add(log);
});
inlongGroupContext.getStreamErrLogs().put(streamId,
streamLogs);
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
index ee0c98669..2b11902a4 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongStreamImpl.java
@@ -27,8 +27,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.client.api.inner.InnerInlongManagerClient;
-import org.apache.inlong.manager.common.util.AssertUtils;
-import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtils;
import org.apache.inlong.manager.client.api.util.InlongStreamSinkTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamSourceTransfer;
import org.apache.inlong.manager.client.api.util.InlongStreamTransfer;
@@ -51,6 +50,7 @@ import
org.apache.inlong.manager.common.pojo.stream.StreamSource;
import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.TransformRequest;
import org.apache.inlong.manager.common.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.common.util.AssertUtils;
import java.util.List;
import java.util.Map;
@@ -80,8 +80,6 @@ public class InlongStreamImpl extends InlongStream {
/**
* Constructor of InlongStreamImpl.
- * @param fullStreamResponse
- * @param managerClient
*/
public InlongStreamImpl(FullStreamResponse fullStreamResponse,
InnerInlongManagerClient managerClient) {
InlongStreamInfo streamInfo = fullStreamResponse.getStreamInfo();
@@ -310,7 +308,7 @@ public class InlongStreamImpl extends InlongStream {
}
streamInfo.setFieldList(InlongStreamTransfer.createStreamFields(this.streamFields,
streamInfo));
StreamPipeline streamPipeline = createPipeline();
- streamInfo.setExtParams(GsonUtil.toJson(streamPipeline));
+ streamInfo.setExtParams(GsonUtils.toJson(streamPipeline));
Pair<Boolean, String> updateMsg =
managerClient.updateStreamInfo(streamInfo);
if (!updateMsg.getKey()) {
throw new RuntimeException(String.format("Update data stream
failed:%s", updateMsg.getValue()));
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
index 915d58d08..4515d0289 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerInlongManagerClient.java
@@ -32,7 +32,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.client.api.ClientConfiguration;
import
org.apache.inlong.manager.client.api.InlongGroupContext.InlongGroupStatus;
-import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtils;
import org.apache.inlong.manager.client.api.util.InlongParser;
import org.apache.inlong.manager.common.auth.Authentication;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
@@ -221,7 +221,7 @@ public class InnerInlongManagerClient {
* @return Response encapsulate of inlong group list
*/
public Response<PageInfo<InlongGroupListResponse>>
listGroups(InlongGroupPageRequest pageRequest) throws Exception {
- String requestParams = GsonUtil.toJson(pageRequest);
+ String requestParams = GsonUtils.toJson(pageRequest);
RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), requestParams);
String path = HTTP_PATH + "/group/list";
final String url = formatUrl(path);
@@ -245,7 +245,7 @@ public class InnerInlongManagerClient {
*/
public String createGroup(InlongGroupRequest groupInfo) {
String path = HTTP_PATH + "/group/save";
- final String biz = GsonUtil.toJson(groupInfo);
+ final String biz = GsonUtils.toJson(groupInfo);
final RequestBody bizBody =
RequestBody.create(MediaType.parse("application/json"), biz);
final String url = formatUrl(path);
Request request = new Request.Builder()
@@ -274,7 +274,7 @@ public class InnerInlongManagerClient {
*/
public Pair<String, String> updateGroup(InlongGroupRequest groupRequest) {
String path = HTTP_PATH + "/group/update";
- final String group = GsonUtil.toJson(groupRequest);
+ final String group = GsonUtils.toJson(groupRequest);
final RequestBody groupBody =
RequestBody.create(MediaType.parse("application/json"), group);
final String url = formatUrl(path);
Request request = new Request.Builder()
@@ -299,7 +299,7 @@ public class InnerInlongManagerClient {
*/
public Double createStreamInfo(InlongStreamInfo streamInfo) {
String path = HTTP_PATH + "/stream/save";
- final String stream = GsonUtil.toJson(streamInfo);
+ final String stream = GsonUtils.toJson(streamInfo);
final RequestBody streamBody =
RequestBody.create(MediaType.parse("application/json"), stream);
final String url = formatUrl(path);
Request request = new Request.Builder()
@@ -354,7 +354,7 @@ public class InnerInlongManagerClient {
streamInfo.setModifyTime(null);
final String path = HTTP_PATH + "/stream/update";
final String url = formatUrl(path);
- final String stream = GsonUtil.toJson(streamInfo);
+ final String stream = GsonUtils.toJson(streamInfo);
RequestBody bizBody =
RequestBody.create(MediaType.parse("application/json"), stream);
Request request = new Request.Builder()
.post(bizBody)
@@ -414,7 +414,7 @@ public class InnerInlongManagerClient {
public List<FullStreamResponse> listStreamInfo(String inlongGroupId) {
InlongStreamPageRequest pageRequest = new InlongStreamPageRequest();
pageRequest.setInlongGroupId(inlongGroupId);
- String requestParams = GsonUtil.toJson(pageRequest);
+ String requestParams = GsonUtils.toJson(pageRequest);
RequestBody requestBody =
RequestBody.create(MediaType.parse("application/json"), requestParams);
final String path = HTTP_PATH + "/stream/listAll";
final String url = formatUrl(path);
@@ -442,7 +442,7 @@ public class InnerInlongManagerClient {
*/
public Double createSource(SourceRequest sourceRequest) {
String path = HTTP_PATH + "/source/save";
- final String source = GsonUtil.toJson(sourceRequest);
+ final String source = GsonUtils.toJson(sourceRequest);
final RequestBody sourceBody =
RequestBody.create(MediaType.parse("application/json"), source);
final String url = formatUrl(path);
Request request = new Request.Builder()
@@ -506,7 +506,7 @@ public class InnerInlongManagerClient {
public Pair<Boolean, String> updateSource(SourceRequest sourceRequest) {
final String path = HTTP_PATH + "/source/update";
final String url = formatUrl(path);
- final String storage = GsonUtil.toJson(sourceRequest);
+ final String storage = GsonUtils.toJson(sourceRequest);
final RequestBody storageBody =
RequestBody.create(MediaType.parse("application/json"), storage);
Request request = new Request.Builder()
.post(storageBody)
@@ -561,7 +561,7 @@ public class InnerInlongManagerClient {
*/
public Double createTransform(TransformRequest transformRequest) {
String path = HTTP_PATH + "/transform/save";
- final String sink = GsonUtil.toJson(transformRequest);
+ final String sink = GsonUtils.toJson(transformRequest);
final RequestBody transformBody =
RequestBody.create(MediaType.parse("application/json"), sink);
final String url = formatUrl(path);
Request request = new Request.Builder()
@@ -614,7 +614,7 @@ public class InnerInlongManagerClient {
public Pair<Boolean, String> updateTransform(TransformRequest
transformRequest) {
final String path = HTTP_PATH + "/transform/update";
final String url = formatUrl(path);
- final String transform = GsonUtil.toJson(transformRequest);
+ final String transform = GsonUtils.toJson(transformRequest);
final RequestBody storageBody =
RequestBody.create(MediaType.parse("application/json"), transform);
Request request = new Request.Builder()
.method("POST", storageBody)
@@ -675,7 +675,7 @@ public class InnerInlongManagerClient {
*/
public Double createSink(SinkRequest sinkRequest) {
String path = HTTP_PATH + "/sink/save";
- final String sink = GsonUtil.toJson(sinkRequest);
+ final String sink = GsonUtils.toJson(sinkRequest);
final RequestBody sinkBody =
RequestBody.create(MediaType.parse("application/json"), sink);
final String url = formatUrl(path);
Request request = new Request.Builder()
@@ -767,7 +767,7 @@ public class InnerInlongManagerClient {
public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
final String path = HTTP_PATH + "/sink/update";
final String url = formatUrl(path);
- final String storage = GsonUtil.toJson(sinkRequest);
+ final String storage = GsonUtils.toJson(sinkRequest);
final RequestBody storageBody =
RequestBody.create(MediaType.parse("application/json"), storage);
Request request = new Request.Builder()
.post(storageBody)
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
index 10017ccd4..33514347b 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/sink/HiveSink.java
@@ -23,7 +23,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.common.auth.DefaultAuthentication;
import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.FileFormat;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtils.java
similarity index 68%
rename from
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
rename to
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtils.java
index 771bf3c00..137a5793c 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtil.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/GsonUtils.java
@@ -27,25 +27,22 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSyntaxException;
import lombok.SneakyThrows;
-import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.Date;
-import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-@Slf4j
-public class GsonUtil {
+public class GsonUtils {
- private static Gson gson;
- private static Gson gsonWithNull;
+ private static final Gson GSON;
+ private static final Gson GSON_WITH_NULL;
- private static JsonDeserializer<Date> dataJsonDeserializer = new
JsonDeserializer<Date>() {
+ private static final JsonDeserializer<Date> DATE_JSON_DESERIALIZER = new
JsonDeserializer<Date>() {
- private Pattern pattern = Pattern.compile("[0-9]+.?[0-9E]+");
+ private final Pattern pattern = Pattern.compile("[0-9]+.?[0-9E]+");
@SneakyThrows
@Override
@@ -65,34 +62,28 @@ public class GsonUtil {
static {
final GsonBuilder builder = new GsonBuilder();
- builder.registerTypeAdapter(Date.class, dataJsonDeserializer);
- gson = builder.create();
+ builder.registerTypeAdapter(Date.class, DATE_JSON_DESERIALIZER);
+ GSON = builder.create();
final GsonBuilder builderWithNull = new GsonBuilder().serializeNulls();
- builder.registerTypeAdapter(Date.class, dataJsonDeserializer);
- gsonWithNull = builderWithNull.create();
+ builderWithNull.registerTypeAdapter(Date.class, DATE_JSON_DESERIALIZER);
+ GSON_WITH_NULL = builderWithNull.create();
}
- private GsonUtil() {
+ private GsonUtils() {
}
- public static Gson getGson() {
- return gson;
- }
-
- public static Gson getGsonWithNull() {
- return gsonWithNull;
- }
-
- public static void jsonObjectToMap(Map<String, String> parameterMap,
JsonObject jsonObject) {
- String key = null;
- String value = null;
- Iterator<String> iterator = jsonObject.keySet().iterator();
- while (iterator.hasNext()) {
- key = iterator.next();
+ /**
+ * Transfer the Json object to map
+ */
+ public static void jsonObjectToMap(JsonObject jsonObject, Map<String,
String> parameterMap) {
+ String key;
+ String value;
+ for (String s : jsonObject.keySet()) {
+ key = s;
JsonElement jsonElement = jsonObject.get(key);
if (jsonElement instanceof JsonObject || jsonElement instanceof
JsonArray) {
- value = getGson().toJson(jsonElement);
+ value = GSON.toJson(jsonElement);
} else {
value = jsonElement.getAsString();
}
@@ -100,21 +91,24 @@ public class GsonUtil {
}
}
+ /**
+ * Get JsonObject from the given object.
+ */
public static JsonObject getJsonObjectFromObject(Object object) {
- JsonElement element = getGson().fromJson(getGson().toJson(object),
JsonElement.class);
+ JsonElement element = GSON.fromJson(GSON.toJson(object),
JsonElement.class);
return element.getAsJsonObject();
}
public static String toJson(Object src) {
- return gson.toJson(src);
+ return GSON.toJson(src);
}
public static <T> T fromJson(String json, Type typeOfT) throws
JsonSyntaxException {
- return gson.fromJson(json, typeOfT);
+ return GSON.fromJson(json, typeOfT);
}
public static String toJsonHasNull(Object src) {
- return gsonWithNull.toJson(src);
+ return GSON_WITH_NULL.toJson(src);
}
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 0b8eba701..5efff13d0 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -83,7 +83,7 @@ public class InlongParser {
* Parse body to get the response.
*/
public static Response parseResponse(String responseBody) {
- return GsonUtil.fromJson(responseBody, Response.class);
+ return GsonUtils.fromJson(responseBody, Response.class);
}
/**
@@ -91,7 +91,7 @@ public class InlongParser {
*/
public static <T> Response<T> parseResponse(Class<T> responseType, String
responseBody) {
AssertUtils.notNull(responseType, "responseType must not be null");
- return GsonUtil.fromJson(
+ return GsonUtils.fromJson(
responseBody,
com.google.gson.reflect.TypeToken.getParameterized(Response.class,
responseType).getType()
);
@@ -101,16 +101,16 @@ public class InlongParser {
* Parse response to get the inlong group info.
*/
public static InlongGroupInfo parseGroupInfo(Response response) {
- String dataJson = GsonUtil.toJson(response.getData());
- InlongGroupInfo groupInfo = GsonUtil.fromJson(dataJson,
InlongGroupInfo.class);
+ String dataJson = GsonUtils.toJson(response.getData());
+ InlongGroupInfo groupInfo = GsonUtils.fromJson(dataJson,
InlongGroupInfo.class);
MQType mqType = MQType.forType(groupInfo.getMqType());
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- return GsonUtil.<InlongPulsarInfo>fromJson(dataJson,
InlongPulsarInfo.class);
+ return GsonUtils.<InlongPulsarInfo>fromJson(dataJson,
InlongPulsarInfo.class);
} else if (mqType == MQType.TUBE) {
- return GsonUtil.<InlongTubeInfo>fromJson(dataJson,
InlongTubeInfo.class);
+ return GsonUtils.<InlongTubeInfo>fromJson(dataJson,
InlongTubeInfo.class);
} else {
- return GsonUtil.<InlongTubeInfo>fromJson(dataJson,
InlongNoneMqInfo.class);
+ return GsonUtils.<InlongNoneMqInfo>fromJson(dataJson,
InlongNoneMqInfo.class);
}
}
@@ -119,15 +119,15 @@ public class InlongParser {
*/
public static PageInfo<InlongGroupListResponse> parseGroupList(Response
response) {
Object data = response.getData();
- String pageInfoJson = GsonUtil.toJson(data);
- return GsonUtil.fromJson(pageInfoJson,
+ String pageInfoJson = GsonUtils.toJson(data);
+ return GsonUtils.fromJson(pageInfoJson,
new TypeToken<PageInfo<InlongGroupListResponse>>() {
}.getType());
}
public static InlongStreamInfo parseStreamInfo(Response response) {
Object data = response.getData();
- return GsonUtil.fromJson(GsonUtil.toJson(data),
InlongStreamInfo.class);
+ return GsonUtils.fromJson(GsonUtils.toJson(data),
InlongStreamInfo.class);
}
/**
@@ -135,12 +135,12 @@ public class InlongParser {
*/
public static List<FullStreamResponse> parseStreamList(Response response) {
Object data = response.getData();
- JsonObject pageInfoJson = GsonUtil.fromJson(GsonUtil.toJson(data),
JsonObject.class);
+ JsonObject pageInfoJson = GsonUtils.fromJson(GsonUtils.toJson(data),
JsonObject.class);
JsonArray fullStreamArray = pageInfoJson.getAsJsonArray("list");
List<FullStreamResponse> list = Lists.newArrayList();
for (int streamIndex = 0; streamIndex < fullStreamArray.size();
streamIndex++) {
JsonObject fullStreamJson = (JsonObject)
fullStreamArray.get(streamIndex);
- FullStreamResponse fullStreamResponse =
GsonUtil.fromJson(fullStreamJson.toString(),
+ FullStreamResponse fullStreamResponse =
GsonUtils.fromJson(fullStreamJson.toString(),
FullStreamResponse.class);
list.add(fullStreamResponse);
//Parse sourceResponse in each stream
@@ -153,27 +153,27 @@ public class InlongParser {
SourceType sourceType = SourceType.forType(type);
switch (sourceType) {
case BINLOG:
- BinlogSourceResponse binlogSourceResponse =
GsonUtil.fromJson(sourceJson.toString(),
+ BinlogSourceResponse binlogSourceResponse =
GsonUtils.fromJson(sourceJson.toString(),
BinlogSourceResponse.class);
sourceResponses.add(binlogSourceResponse);
break;
case KAFKA:
- KafkaSourceResponse kafkaSourceResponse =
GsonUtil.fromJson(sourceJson.toString(),
+ KafkaSourceResponse kafkaSourceResponse =
GsonUtils.fromJson(sourceJson.toString(),
KafkaSourceResponse.class);
sourceResponses.add(kafkaSourceResponse);
break;
case FILE:
- FileSourceResponse fileSourceResponse =
GsonUtil.fromJson(sourceJson.toString(),
+ FileSourceResponse fileSourceResponse =
GsonUtils.fromJson(sourceJson.toString(),
FileSourceResponse.class);
sourceResponses.add(fileSourceResponse);
break;
case AUTO_PUSH:
- AutoPushSourceResponse autoPushSourceResponse =
GsonUtil.fromJson(sourceJson.toString(),
+ AutoPushSourceResponse autoPushSourceResponse =
GsonUtils.fromJson(sourceJson.toString(),
AutoPushSourceRequest.class);
sourceResponses.add(autoPushSourceResponse);
break;
case POSTGRES:
- PostgresSourceResponse postgresSourceResponse =
GsonUtil.fromJson(sourceJson.toString(),
+ PostgresSourceResponse postgresSourceResponse =
GsonUtils.fromJson(sourceJson.toString(),
PostgresSourceResponse.class);
sourceResponses.add(postgresSourceResponse);
break;
@@ -192,27 +192,27 @@ public class InlongParser {
SinkType sinkType = SinkType.forType(type);
switch (sinkType) {
case HIVE:
- HiveSinkResponse hiveSinkResponse =
GsonUtil.fromJson(sinkJson.toString(),
+ HiveSinkResponse hiveSinkResponse =
GsonUtils.fromJson(sinkJson.toString(),
HiveSinkResponse.class);
sinkResponses.add(hiveSinkResponse);
break;
case KAFKA:
- KafkaSinkResponse kafkaSinkResponse =
GsonUtil.fromJson(sinkJson.toString(),
+ KafkaSinkResponse kafkaSinkResponse =
GsonUtils.fromJson(sinkJson.toString(),
KafkaSinkResponse.class);
sinkResponses.add(kafkaSinkResponse);
break;
case ICEBERG:
- IcebergSinkResponse icebergSinkResponse =
GsonUtil.fromJson(sinkJson.toString(),
+ IcebergSinkResponse icebergSinkResponse =
GsonUtils.fromJson(sinkJson.toString(),
IcebergSinkResponse.class);
sinkResponses.add(icebergSinkResponse);
break;
case CLICKHOUSE:
- ClickHouseSinkResponse clickHouseSinkResponse =
GsonUtil.fromJson(sinkJson.toString(),
+ ClickHouseSinkResponse clickHouseSinkResponse =
GsonUtils.fromJson(sinkJson.toString(),
ClickHouseSinkResponse.class);
sinkResponses.add(clickHouseSinkResponse);
break;
case POSTGRES:
- PostgresSinkResponse postgresSinkResponse =
GsonUtil.fromJson(sinkJson.toString(),
+ PostgresSinkResponse postgresSinkResponse =
GsonUtils.fromJson(sinkJson.toString(),
PostgresSinkResponse.class);
sinkResponses.add(postgresSinkResponse);
break;
@@ -229,8 +229,8 @@ public class InlongParser {
*/
public static PageInfo<SourceListResponse> parseSourceList(Response
response) {
Object data = response.getData();
- String pageInfoJson = GsonUtil.toJson(data);
- PageInfo<SourceListResponse> pageInfo = GsonUtil.fromJson(pageInfoJson,
+ String pageInfoJson = GsonUtils.toJson(data);
+ PageInfo<SourceListResponse> pageInfo =
GsonUtils.fromJson(pageInfoJson,
new TypeToken<PageInfo<SourceListResponse>>() {
}.getType());
if (pageInfo.getList() != null && !pageInfo.getList().isEmpty()) {
@@ -238,23 +238,23 @@ public class InlongParser {
SourceType sourceType =
SourceType.forType(sourceListResponse.getSourceType());
switch (sourceType) {
case BINLOG:
- return GsonUtil.fromJson(pageInfoJson,
+ return GsonUtils.fromJson(pageInfoJson,
new
TypeToken<PageInfo<BinlogSourceListResponse>>() {
}.getType());
case KAFKA:
- return GsonUtil.fromJson(pageInfoJson,
+ return GsonUtils.fromJson(pageInfoJson,
new TypeToken<PageInfo<KafkaSourceListResponse>>()
{
}.getType());
case FILE:
- return GsonUtil.fromJson(pageInfoJson,
+ return GsonUtils.fromJson(pageInfoJson,
new TypeToken<PageInfo<FileSourceListResponse>>() {
}.getType());
case AUTO_PUSH:
- return GsonUtil.fromJson(pageInfoJson,
+ return GsonUtils.fromJson(pageInfoJson,
new
TypeToken<PageInfo<AutoPushSourceListResponse>>() {
}.getType());
case POSTGRES:
- return GsonUtil.fromJson(pageInfoJson,
+ return GsonUtils.fromJson(pageInfoJson,
new
TypeToken<PageInfo<PostgresSourceListResponse>>() {
}.getType());
default:
@@ -271,8 +271,8 @@ public class InlongParser {
*/
public static List<TransformResponse> parseTransformList(Response
response) {
Object data = response.getData();
- String pageInfoJson = GsonUtil.toJson(data);
- return GsonUtil.fromJson(pageInfoJson,
+ String pageInfoJson = GsonUtils.toJson(data);
+ return GsonUtils.fromJson(pageInfoJson,
new TypeToken<List<TransformResponse>>() {
}.getType());
}
@@ -282,8 +282,8 @@ public class InlongParser {
*/
public static PageInfo<SinkListResponse> parseSinkList(Response response) {
Object data = response.getData();
- String pageInfoJson = GsonUtil.toJson(data);
- return GsonUtil.fromJson(pageInfoJson,
+ String pageInfoJson = GsonUtils.toJson(data);
+ return GsonUtils.fromJson(pageInfoJson,
new TypeToken<PageInfo<SinkListResponse>>() {
}.getType());
}
@@ -292,15 +292,15 @@ public class InlongParser {
* Parse forms of group.
*/
public static Pair<InlongGroupApproveRequest,
List<InlongStreamApproveRequest>> parseGroupForm(String formJson) {
- JsonObject formData = GsonUtil.fromJson(formJson, JsonObject.class);
+ JsonObject formData = GsonUtils.fromJson(formJson, JsonObject.class);
JsonObject groupJson = formData.getAsJsonObject(GROUP_INFO);
- InlongGroupApproveRequest groupApproveInfo =
GsonUtil.fromJson(groupJson.toString(),
+ InlongGroupApproveRequest groupApproveInfo =
GsonUtils.fromJson(groupJson.toString(),
InlongGroupApproveRequest.class);
JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
if (mqExtInfo != null && mqExtInfo.get(MQ_TYPE) != null) {
MQType mqType =
MQType.forType(mqExtInfo.get(MQ_TYPE).getAsString());
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- InlongPulsarDTO pulsarInfo =
GsonUtil.fromJson(mqExtInfo.toString(),
+ InlongPulsarDTO pulsarInfo =
GsonUtils.fromJson(mqExtInfo.toString(),
InlongPulsarDTO.class);
groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
@@ -314,7 +314,7 @@ public class InlongParser {
}
}
JsonArray streamJson = formData.getAsJsonArray("streamInfoList");
- List<InlongStreamApproveRequest> streamApproveList =
GsonUtil.fromJson(streamJson.toString(),
+ List<InlongStreamApproveRequest> streamApproveList =
GsonUtils.fromJson(streamJson.toString(),
new TypeToken<List<InlongStreamApproveRequest>>() {
}.getType());
return Pair.of(groupApproveInfo, streamApproveList);
@@ -325,8 +325,8 @@ public class InlongParser {
*/
public static PageInfo<EventLogView> parseEventLogViewList(Response
response) {
Object data = response.getData();
- String pageInfoJson = GsonUtil.toJson(data);
- return GsonUtil.fromJson(pageInfoJson,
+ String pageInfoJson = GsonUtils.toJson(data);
+ return GsonUtils.fromJson(pageInfoJson,
new TypeToken<PageInfo<EventLogView>>() {
}.getType());
}
@@ -336,8 +336,8 @@ public class InlongParser {
*/
public static PageInfo<InlongStreamConfigLogListResponse>
parseStreamLogList(Response response) {
Object data = response.getData();
- String pageInfoJson = GsonUtil.toJson(data);
- return GsonUtil.fromJson(pageInfoJson,
+ String pageInfoJson = GsonUtils.toJson(data);
+ return GsonUtils.fromJson(pageInfoJson,
new TypeToken<PageInfo<InlongStreamConfigLogListResponse>>() {
}.getType());
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
index 21b543d50..a7e47b9f1 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamSinkTransfer.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.api.util;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.client.api.DataSeparator;
+import org.apache.inlong.manager.common.enums.DataSeparator;
import org.apache.inlong.manager.client.api.sink.ClickHouseSink;
import org.apache.inlong.manager.client.api.sink.HbaseSink;
import org.apache.inlong.manager.client.api.sink.HiveSink;
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
index 523179b89..05460f3ca 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
+++
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongStreamTransformTransfer.java
@@ -51,7 +51,7 @@ public class InlongStreamTransformTransfer {
TransformDefinition transformDefinition =
streamTransform.getTransformDefinition();
transformRequest.setTransformType(transformDefinition.getTransformType().getType());
transformRequest.setVersion(1);
-
transformRequest.setTransformDefinition(GsonUtil.toJson(transformDefinition));
+
transformRequest.setTransformDefinition(GsonUtils.toJson(transformDefinition));
if (CollectionUtils.isNotEmpty(streamTransform.getPreNodes())) {
transformRequest.setPreNodeNames(Joiner.on(",").join(streamTransform.getPreNodes()));
}
diff --git
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
index 175f69c99..4b964698f 100644
---
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
+++
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongStreamImplTest.java
@@ -25,7 +25,7 @@ import
org.apache.inlong.manager.client.api.source.KafkaSource;
import org.apache.inlong.manager.client.api.source.MySQLBinlogSource;
import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
import
org.apache.inlong.manager.client.api.transform.SingleDependencyTransform;
-import org.apache.inlong.manager.client.api.util.GsonUtil;
+import org.apache.inlong.manager.client.api.util.GsonUtils;
import org.apache.inlong.manager.common.pojo.stream.StreamPipeline;
import org.apache.inlong.manager.common.pojo.stream.StreamTransform;
import org.apache.inlong.manager.common.pojo.transform.filter.FilterDefinition;
@@ -82,7 +82,7 @@ public class InlongStreamImplTest {
inlongStream.addTransform(singleDependencyTransform1);
inlongStream.addTransform(singleDependencyTransform2);
StreamPipeline streamPipeline = inlongStream.createPipeline();
- String pipelineView = GsonUtil.toJson(streamPipeline);
+ String pipelineView = GsonUtils.toJson(streamPipeline);
Assert.assertTrue(pipelineView.contains("{\"inputNodes\":[\"C\"],\"outputNodes\":[\"D\",\"G\"]"));
Assert.assertTrue(pipelineView.contains("{\"inputNodes\":[\"D\"],\"outputNodes\":[\"E\",\"F\"]}"));
}
diff --git
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java
similarity index 79%
rename from
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
rename to
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java
index 6b795a8ff..39c27f8d2 100644
---
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/DataSeparator.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/DataSeparator.java
@@ -15,9 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.client.api;
+package org.apache.inlong.manager.common.enums;
+/**
+ * Enum of data separator and related ASCII code.
+ */
public enum DataSeparator {
+
VERTICAL_BAR("|", 124),
COMMA(",", 44),
COLON(":", 58),
@@ -27,21 +31,13 @@ public enum DataSeparator {
STX("\002", 2),
ETX("\003", 3);
- private String seperator;
-
- private int asciiCode;
+ private final String separator;
- public String getSeperator() {
- return this.seperator;
- }
-
- public int getAsciiCode() {
- return this.asciiCode;
- }
+ private final Integer asciiCode;
- DataSeparator(String seperator, int asciiCode) {
+ DataSeparator(String separator, int asciiCode) {
this.asciiCode = asciiCode;
- this.seperator = seperator;
+ this.separator = separator;
}
public static DataSeparator forAscii(int asciiCode) {
@@ -52,4 +48,12 @@ public enum DataSeparator {
}
throw new IllegalArgumentException(String.format("Unsupported ascii
for %s", asciiCode));
}
+
+ public String getSeparator() {
+ return this.separator;
+ }
+
+ public Integer getAsciiCode() {
+ return this.asciiCode;
+ }
}
diff --git
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
index a972f74db..a317d6b69 100644
---
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
+++
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/stream/InlongStreamRequest.java
@@ -20,7 +20,9 @@ package org.apache.inlong.manager.common.pojo.stream;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import org.apache.inlong.manager.common.enums.DataSeparator;
+import java.nio.charset.StandardCharsets;
import java.util.List;
/**
@@ -53,10 +55,10 @@ public class InlongStreamRequest {
private String dataType;
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
- private String dataEncoding;
+ private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty(value = "Data separator, stored as ASCII code")
- private String dataSeparator;
+ private String dataSeparator =
DataSeparator.VERTICAL_BAR.getAsciiCode().toString();
@ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
private String dataEscapeChar;
@@ -64,7 +66,7 @@ public class InlongStreamRequest {
@ApiModelProperty(value = "Whether to send synchronously, 0: no, 1: yes",
notes = "Each task under this stream sends data synchronously, "
+ "which will affect the throughput of data collection,
please choose carefully")
- private Integer syncSend;
+ private Integer syncSend = 0;
@ApiModelProperty(value = "Number of access items per day, unit: 10,000
items per day")
private Integer dailyRecords;