This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b24bea605 [INLONG-7778][Manager] Add default value for JSON file of
the create command (#7780)
b24bea605 is described below
commit b24bea605414836ade48ee569f69f325bebf653a
Author: haifxu <[email protected]>
AuthorDate: Thu Apr 6 15:21:42 2023 +0800
[INLONG-7778][Manager] Add default value for JSON file of the create
command (#7780)
---
.../inlong/manager/client/cli/CreateCommand.java | 81 +++++++++++++++++-----
.../manager/client/cli/consts/GroupConstants.java | 31 +++++++++
.../manager/client/cli/pojo/CreateGroupConf.java | 8 ---
3 files changed, 94 insertions(+), 26 deletions(-)
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 4b4e602ef..02a0aa5e1 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -29,16 +29,25 @@ import
org.apache.inlong.manager.client.api.inner.client.UserClient;
import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf;
import org.apache.inlong.manager.client.cli.util.ClientUtils;
import org.apache.inlong.manager.client.cli.validator.UserTypeValidator;
-import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.DataFormat;
import org.apache.inlong.manager.common.enums.UserTypeEnum;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterTagRequest;
+import org.apache.inlong.manager.pojo.sort.FlinkSortConf;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.user.UserRequest;
import java.io.File;
import java.util.List;
+import java.util.Optional;
+
+import static
org.apache.inlong.manager.client.cli.consts.GroupConstants.DEFAULT_DATA_ENCODING;
+import static
org.apache.inlong.manager.client.cli.consts.GroupConstants.DEFAULT_DATA_SEPARATOR;
+import static
org.apache.inlong.manager.client.cli.consts.GroupConstants.DEFAULT_IGNORE_PARSE_ERROR;
+import static
org.apache.inlong.manager.client.cli.consts.GroupConstants.DEFAULT_LIGHTWEIGHT;
+import static
org.apache.inlong.manager.common.consts.InlongConstants.ADMIN_USER;
/**
* Create resource by json file.
@@ -67,42 +76,78 @@ public class CreateCommand extends AbstractCommand {
@Parameter(names = {"-f", "--file"}, converter = FileConverter.class,
description = "json file")
private File file;
- @Parameter(names = {"-s"}, description = "optional log string to
create file")
- private String input;
-
@Override
void run() {
try {
- String content;
- if (input != null) {
- content = input;
- } else {
- content = ClientUtils.readFile(file);
- }
+ String content = ClientUtils.readFile(file);
+
// first extract group config from the file passed in
CreateGroupConf groupConf = JsonUtils.parseObject(content,
CreateGroupConf.class);
assert groupConf != null;
+ setDefaultConfigInfo(groupConf);
+
// get the corresponding inlong group, aka the task to execute
InlongClient inlongClient = ClientUtils.getClient();
InlongGroup group =
inlongClient.forGroup(groupConf.getGroupInfo());
InlongStreamBuilder streamBuilder =
group.createStream(groupConf.getStreamInfo());
// put in parameters:source and sink,stream fields, then
initialize
- streamBuilder.fields(groupConf.getStreamFieldList());
- streamBuilder.source(groupConf.getStreamSource());
- streamBuilder.sink(groupConf.getStreamSink());
- streamBuilder.transform(groupConf.getStreamTransform());
+ streamBuilder.fields(groupConf.getStreamInfo().getFieldList());
+
groupConf.getStreamInfo().getSourceList().forEach(streamBuilder::source);
+
groupConf.getStreamInfo().getSinkList().forEach(streamBuilder::sink);
+ if (groupConf.getStreamTransform() != null) {
+ streamBuilder.transform(groupConf.getStreamTransform());
+ }
streamBuilder.initOrUpdate();
// initialize the new stream group
InlongGroupContext context = group.init();
- if (!SimpleGroupStatus.STARTED.equals(context.getStatus())) {
- throw new Exception("Start group failed, current status: "
+ context.getStatus());
- }
- System.out.println("Start group success!");
+ System.out.println("Create group success, current status: " +
context.getStatus());
} catch (Exception e) {
System.out.println("Create group failed!");
System.out.println(e.getMessage());
}
}
+
+ /**
+ * Set default value for group conf
+ *
+ * @param groupConf group conf
+ */
+ private void setDefaultConfigInfo(CreateGroupConf groupConf) {
+ String inlongGroupId = groupConf.getGroupInfo().getInlongGroupId();
+ String inlongStreamId =
groupConf.getStreamInfo().getInlongStreamId();
+ // group
+ groupConf.getGroupInfo().setInCharges(ADMIN_USER);
+ groupConf.getGroupInfo().setLightweight(DEFAULT_LIGHTWEIGHT);
+ groupConf.getGroupInfo().setSortConf(new FlinkSortConf());
+
+ // stream
+ InlongStreamInfo streamInfo = groupConf.getStreamInfo();
+ groupConf.getStreamInfo().setInlongGroupId(inlongGroupId);
+
streamInfo.setDataType(Optional.ofNullable(streamInfo.getDataType()).orElse(DataFormat.CSV.getName()));
+
streamInfo.setDataEncoding(Optional.ofNullable(streamInfo.getDataEncoding()).orElse(DEFAULT_DATA_ENCODING));
+ streamInfo.setDataSeparator(
+
Optional.ofNullable(streamInfo.getDataSeparator()).orElse(DEFAULT_DATA_SEPARATOR));
+ streamInfo.setIgnoreParseError(
+
Optional.ofNullable(streamInfo.getIgnoreParseError()).orElse(DEFAULT_IGNORE_PARSE_ERROR));
+
+ // field
+ streamInfo.getFieldList().forEach(field -> {
+ field.setInlongGroupId(inlongGroupId);
+ field.setInlongStreamId(inlongStreamId);
+ });
+
+ // source
+ streamInfo.getSourceList().forEach(source -> {
+ source.setInlongGroupId(inlongGroupId);
+ source.setInlongStreamId(inlongStreamId);
+ });
+
+ // sink
+ streamInfo.getSinkList().forEach(sink -> {
+ sink.setInlongGroupId(inlongGroupId);
+ sink.setInlongStreamId(inlongStreamId);
+ });
+ }
}
@Parameters(commandDescription = "Create cluster by json file")
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/consts/GroupConstants.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/consts/GroupConstants.java
new file mode 100644
index 000000000..370cbfc4e
--- /dev/null
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/consts/GroupConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.cli.consts;
+
+public class GroupConstants {
+
+ public static final int DEFAULT_LIGHTWEIGHT = 0;
+
+ public static final String DEFAULT_DATA_ENCODING = "UTF-8";
+
+ // int value of '|'
+ public static final String DEFAULT_DATA_SEPARATOR = "124";
+
+ public static final boolean DEFAULT_IGNORE_PARSE_ERROR = true;
+
+}
diff --git
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/CreateGroupConf.java
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/CreateGroupConf.java
index 36f0b767c..e94352bb5 100644
---
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/CreateGroupConf.java
+++
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/pojo/CreateGroupConf.java
@@ -20,12 +20,7 @@ package org.apache.inlong.manager.client.cli.pojo;
import lombok.Data;
import org.apache.inlong.manager.client.api.transform.MultiDependencyTransform;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.sink.StreamSink;
-import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.pojo.stream.StreamField;
-
-import java.util.List;
/**
* The config of group, including inlong stream, stream source, stream sink,
etc.
@@ -36,8 +31,5 @@ public class CreateGroupConf {
private InlongGroupInfo groupInfo;
private InlongStreamInfo streamInfo;
private MultiDependencyTransform streamTransform;
- private List<StreamField> streamFieldList;
- private StreamSource streamSource;
- private StreamSink streamSink;
}