This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d7c68f9b5 [INLONG-4534][Manager] Add complete unit test to create data 
flow  (#4539)
d7c68f9b5 is described below

commit d7c68f9b58d4ec64f2940f8f6e265b3093a862c4
Author: leosanqing <[email protected]>
AuthorDate: Wed Jun 8 14:45:22 2022 +0800

    [INLONG-4534][Manager] Add complete unit test to create data flow  (#4539)
---
 inlong-manager/manager-client-examples/pom.xml     |   7 +
 .../apache/inlong/manager/client/ut/BaseTest.java  | 170 +++++++++
 .../inlong/manager/client/ut/Kafka2HiveTest.java   | 416 +++++++++++++++++++++
 3 files changed, 593 insertions(+)

diff --git a/inlong-manager/manager-client-examples/pom.xml 
b/inlong-manager/manager-client-examples/pom.xml
index 7bcfed214..e392d0da7 100644
--- a/inlong-manager/manager-client-examples/pom.xml
+++ b/inlong-manager/manager-client-examples/pom.xml
@@ -34,5 +34,12 @@
             <artifactId>manager-client</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <!-- 
https://mvnrepository.com/artifact/com.github.tomakehurst/wiremock-jre8 -->
+        <dependency>
+            <groupId>com.github.tomakehurst</groupId>
+            <artifactId>wiremock-jre8</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
new file mode 100644
index 000000000..fc5d8c933
--- /dev/null
+++ 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.ut;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.google.common.collect.Lists;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.DataSeparator;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.FileFormat;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+
+public class BaseTest {
+
+    // Manager address (host:port) the client is created against; setup() starts WireMock on the same port
+    public static final String SERVICE_URL = "127.0.0.1:8084";
+    // Inlong user name and password, passed to the client configuration in setup()
+    public static DefaultAuthentication inlongAuth = new 
DefaultAuthentication("admin", "inlong");
+    // Inlong group ID used by the test group
+    public static final String GROUP_ID = "test_group009";
+    // Inlong stream ID used by the test stream
+    public static final String STREAM_ID = "test_stream009";
+    // Flink cluster url, set as the service url of the sort configuration
+    public static final String FLINK_URL = "127.0.0.1";
+    // Pulsar cluster admin url
+    public static final String PULSAR_ADMIN_URL = "127.0.0.1";
+    // Pulsar cluster service url
+    public static final String PULSAR_SERVICE_URL = "127.0.0.1";
+    // Pulsar tenant
+    public static final String TENANT = "tenant";
+    // Pulsar namespace, set as the MQ resource of the group in createGroupInfo()
+    public static final String NAMESPACE = "test_namespace";
+    // Pulsar topic, set as the MQ resource of the stream in createStreamInfo()
+    public static final String TOPIC = "test_topic";
+    public static final String IN_CHARGES = "test_inCharges,admin"; // comma-separated value passed to setInCharges
+
+    public static WireMockServer wireMockServer; // started in setup(), stopped in teardown()
+    public static final String MANAGER_URL_PREFIX = "/api/inlong/manager"; // prepended to every stubbed url pattern
+
+    public static InlongGroupInfo groupInfo; // assigned in setup() from createGroupInfo()
+    public static InlongClient inlongClient; // created in setup() against SERVICE_URL
+
+    /**
+     * Starts the WireMock server on port 8084, points the static WireMock client at it,
+     * then creates the shared {@link InlongClient} and the shared group info.
+     */
+    @BeforeAll
+    static void setup() {
+        // mock manager: publish the server in the static field first, then start it
+        wireMockServer = new WireMockServer(options().port(8084));
+        wireMockServer.start();
+        int mockPort = wireMockServer.port();
+        WireMock.configureFor(mockPort);
+
+        // client side: credentials plus connect/read/write timeouts
+        ClientConfiguration clientConfig = new ClientConfiguration();
+        clientConfig.setAuthentication(inlongAuth);
+        clientConfig.setTimeUnit(TimeUnit.SECONDS);
+        clientConfig.setConnectTimeout(1000);
+        clientConfig.setReadTimeout(1000);
+        clientConfig.setWriteTimeout(1000);
+
+        inlongClient = InlongClient.create(SERVICE_URL, clientConfig);
+        groupInfo = createGroupInfo();
+    }
+
+    /**
+     * Stops the WireMock server once all tests of the class have finished.
+     *
+     * <p>The null check covers the case where {@code setup()} failed before the server
+     * field was assigned; without it this hook would throw a {@link NullPointerException}
+     * on top of the original setup failure.
+     */
+    @AfterAll
+    static void teardown() {
+        if (wireMockServer != null) {
+            wireMockServer.stop();
+        }
+    }
+
+    /**
+     * Builds the Pulsar-backed inlong group used by the tests, including its
+     * (empty-property) Flink sort configuration.
+     *
+     * @return a freshly populated {@link InlongPulsarInfo}
+     */
+    public static InlongGroupInfo createGroupInfo() {
+        InlongPulsarInfo group = new InlongPulsarInfo();
+
+        // identity and owners
+        group.setInlongGroupId(GROUP_ID);
+        group.setInCharges(IN_CHARGES);
+
+        // Pulsar connection settings
+        group.setServiceUrl(PULSAR_SERVICE_URL);
+        group.setAdminUrl(PULSAR_ADMIN_URL);
+        group.setTenant(TENANT);
+        group.setMqResource(NAMESPACE);
+
+        // switches: zookeeper off, resource creation on, lightweight mode on, plus the cluster tag
+        group.setEnableZookeeper(0);
+        group.setEnableCreateResource(1);
+        group.setLightweight(1);
+        group.setInlongClusterTag("default_cluster");
+
+        // volume figures
+        group.setDailyRecords(10000000);
+        group.setDailyStorage(10000);
+        group.setPeakRecords(100000);
+        group.setMaxLength(10000);
+
+        // Flink sort configuration with an empty property map
+        Map<String, String> sortProperties = new HashMap<>(16);
+        FlinkSortConf flinkConf = new FlinkSortConf();
+        flinkConf.setServiceUrl(FLINK_URL);
+        flinkConf.setProperties(sortProperties);
+        group.setSortConf(flinkConf);
+
+        return group;
+    }
+
+    /**
+     * Builds the inlong stream used by the tests: UTF-8 encoded, vertical-bar separated,
+     * with {@code TOPIC} as its MQ resource.
+     *
+     * @return a freshly populated {@link InlongStreamInfo}
+     */
+    protected InlongStreamInfo createStreamInfo() {
+        String encoding = StandardCharsets.UTF_8.toString();
+        String separator = DataSeparator.VERTICAL_BAR.getSeparator();
+
+        InlongStreamInfo stream = new InlongStreamInfo();
+        stream.setInlongStreamId(STREAM_ID);
+        stream.setName(STREAM_ID);
+        stream.setDataEncoding(encoding);
+        stream.setDataSeparator(separator);
+        // SYNC_SEND: per the original note, set to 1 when data must be strictly ordered
+        stream.setSyncSend(GlobalConstants.SYNC_SEND);
+        stream.setMqResource(TOPIC);
+        return stream;
+    }
+
+    /**
+     * Builds the Hive sink used by the tests. Connection values are left as
+     * {@code {placeholder}} strings; the sink carries two fields, "age" (INT) and "name" (STRING).
+     *
+     * @return a freshly populated {@link HiveSink}
+     */
+    protected static HiveSink createHiveSink() {
+        SinkField ageField =
+                new SinkField(0, FieldType.INT.toString(), "age", FieldType.INT.toString(), "age");
+        SinkField nameField =
+                new SinkField(1, FieldType.STRING.toString(), "name", FieldType.STRING.toString(), "name");
+
+        HiveSink sink = new HiveSink();
+        // where the data goes
+        sink.setDbName("{db.name}");
+        sink.setJdbcUrl("jdbc:hive2://{ip:port}");
+        sink.setAuthentication(new DefaultAuthentication("hive", "hive"));
+        // how the data is laid out
+        sink.setDataEncoding(StandardCharsets.UTF_8.toString());
+        sink.setFileFormat(FileFormat.TextFile.name());
+        sink.setDataSeparator(DataSeparator.VERTICAL_BAR.getSeparator());
+        sink.setDataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}");
+        sink.setFieldList(Lists.newArrayList(ageField, nameField));
+        // names
+        sink.setTableName("{table.name}");
+        sink.setSinkName("{hive.sink.name}");
+        return sink;
+    }
+}
diff --git 
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
new file mode 100644
index 000000000..4b9023482
--- /dev/null
+++ 
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.ut;
+
+import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.ProcessStatus;
+import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
+import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
+import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
+import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+import static org.apache.inlong.manager.common.enums.ProcessStatus.PROCESSING;
+
+class Kafka2HiveTest extends BaseTest {
+
+    /**
+     * Registers the canned manager responses (WireMock stubs) for this test class.
+     *
+     * <p>Each stub matches a url pattern below {@code MANAGER_URL_PREFIX} and answers with a
+     * JSON-serialized {@link Response}. The fixtures themselves are built by the private
+     * {@code build*} helpers so that this method only lists the endpoints.
+     */
+    @BeforeAll
+    static void createStub() {
+        // existence checks answer "false"; save calls answer with the id of the saved entity
+        stubGet("/group/exist/test_group009.*", Response.success(false));
+        stubPost("/group/save.*", Response.success("test_group009"));
+        stubGet("/stream/exist/test_group009/test_stream009.*", Response.success(false));
+        stubPost("/stream/save.*", Response.success(6));
+        stubPost("/source/save.*", Response.success(6));
+        stubPost("/sink/save.*", Response.success(6));
+
+        // workflow: starting the process yields a pending task, approving task 12 completes it
+        stubPost("/group/startProcess/test_group009.*", Response.success(buildInitWorkflowResult()));
+        stubPost("/workflow/approve/12.*", Response.success(buildApprovedWorkflowResult()));
+
+        // read-back of the group, its streams and the workflow / stream logs
+        stubGet("/group/get/test_group009.*", Response.success(buildSavedGroupInfo()));
+        stubPost("/stream/listAll.*", buildFullStreamPage());
+        stubGet("/workflow/event/list.*", Response.success(new PageInfo<>(buildEventLogs())));
+        stubGet("/stream/config/log/list.*", Response.success(new PageInfo<>()));
+    }
+
+    /**
+     * Stubs a GET whose url matches {@code MANAGER_URL_PREFIX + pathRegex} with the given response as JSON.
+     */
+    private static void stubGet(String pathRegex, Response<?> response) {
+        stubFor(get(urlMatching(MANAGER_URL_PREFIX + pathRegex))
+                .willReturn(okJson(JsonUtils.toJsonString(response))));
+    }
+
+    /**
+     * Stubs a POST whose url matches {@code MANAGER_URL_PREFIX + pathRegex} with the given response as JSON.
+     */
+    private static void stubPost(String pathRegex, Response<?> response) {
+        stubFor(post(urlMatching(MANAGER_URL_PREFIX + pathRegex))
+                .willReturn(okJson(JsonUtils.toJsonString(response))));
+    }
+
+    /**
+     * Builds the workflow result returned for "startProcess": process 12 in PROCESSING state
+     * with one pending approval task.
+     */
+    private static WorkflowResult buildInitWorkflowResult() {
+        WorkflowResult result = new WorkflowResult();
+        result.setProcessInfo(
+                ProcessResponse.builder()
+                        .id(12)
+                        .name("NEW_GROUP_PROCESS")
+                        .displayName("New-Group")
+                        .type("New-Group")
+                        .applicant("admin")
+                        .status(PROCESSING)
+                        .startTime(new Date())
+                        .formData(JsonUtils.parseTree(
+                                "{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\",\"id\":6,"
+                                        + "\"inlongGroupId\":\"test_group009\",\"name\":null,\"description\":null,"
+                                        + "\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,"
+                                        + "\"enableCreateResource\":1,\"lightweight\":1,"
+                                        + "\"inlongClusterTag\":\"default_cluster\",\"dailyRecords\":10000000,"
+                                        + "\"dailyStorage\":10000,\"peakRecords\":100000,\"maxLength\":10000,"
+                                        + "\"inCharges\":\"test_inCharges,admin\",\"followers\":null,\"status\":101,"
+                                        + "\"creator\":\"admin\",\"modifier\":\"admin\","
+                                        + "\"createTime\":\"2022-06-06 09:59:10\","
+                                        + "\"modifyTime\":\"2022-06-06 02:24:50\",\"extList\":[],\"tenant\":null,"
+                                        + "\"adminUrl\":null,\"serviceUrl\":null,\"queueModule\":\"parallel\","
+                                        + "\"partitionNum\":3,\"ensemble\":3,\"writeQuorum\":3,\"ackQuorum\":2,"
+                                        + "\"ttl\":24,\"ttlUnit\":\"hours\",\"retentionTime\":72,"
+                                        + "\"retentionTimeUnit\":\"hours\",\"retentionSize\":-1,"
+                                        + "\"retentionSizeUnit\":\"MB\"},\"streamInfoList\":[{\"id\":6,"
+                                        + "\"inlongGroupId\":\"test_group009\",\"inlongStreamId\":\"test_stream009\","
+                                        + "\"name\":\"test_stream009\",\"sinkList\":[{\"id\":6,"
+                                        + "\"inlongGroupId\":\"test_group009\",\"inlongStreamId\":\"test_stream009\","
+                                        + "\"sinkType\":\"HIVE\",\"sinkName\":\"{hive.sink.name}\",\"clusterId\":null,"
+                                        + "\"clusterUrl\":null}],\"modifyTime\":\"2022-06-06 02:11:03\"}]}"
+                        ))
+                        .build()
+        );
+        result.setNewTasks(
+                Lists.newArrayList(
+                        TaskResponse.builder()
+                                .id(12)
+                                .type("UserTask")
+                                .processId(12)
+                                .processName("NEW_GROUP_PROCESS")
+                                .processDisplayName("New-Group")
+                                .name("ut_admin")
+                                .displayName("SystemAdmin")
+                                .applicant("admin")
+                                .approvers(Lists.newArrayList("admin"))
+                                .status(TaskStatus.PENDING)
+                                .startTime(new Date())
+                                .build()
+                )
+        );
+        return result;
+    }
+
+    /**
+     * Builds the workflow result returned for "approve": process 12 in COMPLETED state with no new tasks.
+     */
+    private static WorkflowResult buildApprovedWorkflowResult() {
+        WorkflowResult result = new WorkflowResult();
+        result.setProcessInfo(
+                ProcessResponse.builder()
+                        .id(12)
+                        .name("NEW_GROUP_PROCESS")
+                        .displayName("New-Group")
+                        .type("New-Group")
+                        .applicant("admin")
+                        .status(ProcessStatus.COMPLETED)
+                        .startTime(new Date())
+                        .endTime(new Date())
+                        // NOTE(review): this form data is passed as a plain string (the init result parses it
+                        // into a tree) and names test_group011/test_stream011 rather than the 009 ids used
+                        // elsewhere; kept as-is, confirm whether that is intentional.
+                        .formData("{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\","
+                                + "\"id\":8,\"inlongGroupId\":\"test_group011\",\"name\":null,\"description\":null,"
+                                + "\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,\"enableCreateResource\":1,"
+                                + "\"lightweight\":1,\"inlongClusterTag\":\"default_cluster\","
+                                + "\"dailyRecords\":10000000,\"dailyStorage\":10000,\"peakRecords\":100000,"
+                                + "\"maxLength\":10000,\"inCharges\":\"test_inCharges,admin\",\"followers\":null,"
+                                + "\"status\":101,\"creator\":\"admin\",\"modifier\":\"admin\","
+                                + "\"createTime\":\"2022-06-06 16:36:35\",\"modifyTime\":\"2022-06-06 08:37:04\","
+                                + "\"extList\":[],\"tenant\":null,\"adminUrl\":null,\"serviceUrl\":null,"
+                                + "\"queueModule\":\"parallel\",\"partitionNum\":3,\"ensemble\":3,\"writeQuorum\":3,"
+                                + "\"ackQuorum\":2,\"ttl\":24,\"ttlUnit\":\"hours\",\"retentionTime\":72,"
+                                + "\"retentionTimeUnit\":\"hours\",\"retentionSize\":-1,\"retentionSizeUnit\":\"MB\"},"
+                                + "\"streamInfoList\":[{\"id\":8,\"inlongGroupId\":\"test_group011\","
+                                + "\"inlongStreamId\":\"test_stream011\",\"name\":\"test_stream011\","
+                                + "\"sinkList\":[{\"id\":8,\"inlongGroupId\":\"test_group011\","
+                                + "\"inlongStreamId\":\"test_stream011\",\"sinkType\":\"HIVE\","
+                                + "\"sinkName\":\"{hive.sink.name}\",\"clusterId\":null,\"clusterUrl\":null}],"
+                                + "\"modifyTime\":\"2022-06-06 08:36:38\"}]}")
+                        .build()
+        );
+        result.setNewTasks(new ArrayList<>());
+        return result;
+    }
+
+    /**
+     * Builds the group returned by the "group/get" stub (id 8, status 120).
+     */
+    private static InlongPulsarInfo buildSavedGroupInfo() {
+        return InlongPulsarInfo.builder()
+                .id(8)
+                .inlongGroupId("test_group009")
+                .mqType("PULSAR")
+                .mqResource("test_namespace")
+                .enableZookeeper(0)
+                .enableCreateResource(1)
+                .lightweight(1)
+                .inlongClusterTag("default_cluster")
+                .inCharges("test_inCharges,admin")
+                .dailyRecords(10000000)
+                .dailyStorage(10000)
+                .peakRecords(100000)
+                .maxLength(10000)
+                .status(120)
+                .creator("admin")
+                .modifier("admin")
+                .createTime(new Date())
+                .modifyTime(new Date())
+                .extList(new ArrayList<>())
+                .queueModule("parallel")
+                .partitionNum(3)
+                .ensemble(3)
+                .writeQuorum(3)
+                .ttl(24)
+                .ttlUnit("hours")
+                .retentionTime(72)
+                .retentionTimeUnit("hours")
+                .retentionSize(-1)
+                .retentionSizeUnit("MB")
+                .build();
+    }
+
+    /**
+     * Builds the one-element page returned by the "stream/listAll" stub:
+     * the stream together with its Kafka source and Hive sink.
+     */
+    private static Response<PageInfo<FullStreamResponse>> buildFullStreamPage() {
+        return Response.success(
+                new PageInfo<>(
+                        Lists.newArrayList(FullStreamResponse.builder()
+                                .streamInfo(buildSavedStreamInfo())
+                                .sourceInfo(Lists.newArrayList(buildSavedKafkaSource()))
+                                .sinkInfo(Lists.newArrayList(buildSavedHiveSink()))
+                                .build())
+                )
+        );
+    }
+
+    /**
+     * Builds the stream part of the "stream/listAll" fixture (id 8, status 120).
+     */
+    private static InlongStreamInfo buildSavedStreamInfo() {
+        return InlongStreamInfo.builder()
+                .id(8)
+                .inlongGroupId(GROUP_ID)
+                .inlongStreamId(STREAM_ID)
+                .name(STREAM_ID)
+                .mqResource("test_topic")
+                .dataEncoding("UTF-8")
+                .dataSeparator("|")
+                .syncSend(1)
+                .dailyRecords(10)
+                .dailyStorage(10)
+                .peakRecords(1000)
+                .maxLength(10240)
+                .storagePeriod(1)
+                .status(120)
+                .creator("admin")
+                .modifier("admin")
+                .createTime(new Date())
+                .modifyTime(new Date())
+                .fieldList(createStreamFields())
+                .build();
+    }
+
+    /**
+     * Builds the Kafka source part of the "stream/listAll" fixture (id 6, status 110).
+     */
+    private static KafkaSource buildSavedKafkaSource() {
+        return KafkaSource.builder()
+                .id(6)
+                .topic(TOPIC)
+                .bootstrapServers("{kafka.bootstrap}")
+                .inlongGroupId(GROUP_ID)
+                .inlongStreamId(STREAM_ID)
+                .sourceType("KAFKA")
+                .sourceName("{kafka.source.name}")
+                .serializationType("json")
+                .version(1)
+                .status(110)
+                .creator("admin")
+                .modifier("admin")
+                .createTime(new Date())
+                .modifyTime(new Date())
+                .build();
+    }
+
+    /**
+     * Builds the Hive sink part of the "stream/listAll" fixture (id 6, status 110) with its two sink fields.
+     */
+    private static HiveSink buildSavedHiveSink() {
+        return HiveSink.builder()
+                .id(6)
+                .inlongStreamId(STREAM_ID)
+                .inlongGroupId(GROUP_ID)
+                .jdbcUrl("jdbc:hive2://{ip:port}")
+                .dbName("test_db")
+                .tableName("test_table")
+                .dataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}")
+                .fileFormat("TextFile")
+                .dataEncoding("UTF-8")
+                .dataSeparator("|")
+                .sinkType("HIVE")
+                .sinkName("sink_name")
+                .enableCreateResource(1)
+                .status(110)
+                .creator("admin")
+                .modifier("admin")
+                .dataFormat(DataFormat.NONE)
+                .fieldList(Lists.newArrayList(
+                        SinkField.builder()
+                                .id(17)
+                                .fieldName("age")
+                                .fieldType("INT")
+                                .fieldComment("age")
+                                .sourceFieldName("age")
+                                .sourceFieldType("INT")
+                                .build(),
+                        SinkField.builder()
+                                .id(18)
+                                .fieldName("name")
+                                .fieldType("STRING")
+                                .fieldComment("name")
+                                .sourceFieldName("name")
+                                .sourceFieldType("STRING")
+                                .build()
+                ))
+                .build();
+    }
+
+    /**
+     * Builds the two event-log entries returned by the "workflow/event/list" stub:
+     * a FAIL process event followed by a COMPLETE task event, both for process 12.
+     */
+    private static List<EventLogView> buildEventLogs() {
+        return Lists.newArrayList(
+                EventLogView.builder()
+                        .id(39)
+                        .processId(12)
+                        .processName("CREATE_LIGHT_GROUP_PROCESS")
+                        .processDisplayName("Create-Light-Group")
+                        .inlongGroupId(GROUP_ID)
+                        .taskId(12)
+                        .elementName("initSort")
+                        .elementDisplayName("Group-InitSort")
+                        .eventType("ProcessEvent")
+                        .event("FAIL")
+                        .listener("LightGroupFailedListener")
+                        .status(-1)
+                        .ip("127.0.0.1")
+                        .build(),
+                EventLogView.builder()
+                        .id(38)
+                        .processId(12)
+                        .processName("CREATE_LIGHT_GROUP_PROCESS")
+                        .processDisplayName("Create-Light-Group")
+                        .inlongGroupId(GROUP_ID)
+                        .taskId(12)
+                        .elementName("initSort")
+                        .elementDisplayName("Group-InitSort")
+                        .eventType("TaskEvent")
+                        .event("COMPLETE")
+                        .listener("LightGroupSortListener")
+                        .ip("127.0.0.1")
+                        .build()
+        );
+    }
+
+    /**
+     * Creates a group with one stream (stream fields, Kafka source, Hive sink) through the client,
+     * initializes the group, and checks that nothing throws and a group context is returned.
+     */
+    @Test
+    void testCreateGroupForHive() {
+        Assertions.assertDoesNotThrow(() -> {
+            InlongGroup inlongGroup = inlongClient.forGroup(groupInfo);
+
+            // describe the stream, then push it to the (mocked) manager
+            InlongStreamBuilder builder = inlongGroup.createStream(createStreamInfo());
+            builder.fields(createStreamFields());
+            builder.source(createKafkaSource());
+            builder.sink(createHiveSink());
+            builder.initOrUpdate();
+
+            // start the group and make sure a context comes back
+            InlongGroupContext context = inlongGroup.init();
+            Assertions.assertNotNull(context);
+        });
+    }
+
+    /**
+     * Builds the Kafka source attached to the test stream: JSON-serialized records
+     * read from "test_topic" on 127.0.0.1.
+     */
+    private static KafkaSource createKafkaSource() {
+        KafkaSource source = new KafkaSource();
+        source.setSourceName("kafka_source_name");
+        source.setBootstrapServers("127.0.0.1");
+        source.setTopic("test_topic");
+        source.setSerializationType(DataFormat.JSON.getName());
+        return source;
+    }
+
+    /**
+     * Builds the stream field list used by both the test and the stubbed stream:
+     * "name" (STRING) at index 0 followed by "age" (INT) at index 1.
+     */
+    private static List<StreamField> createStreamFields() {
+        StreamField nameField = new StreamField(0, FieldType.STRING.toString(), "name", null, null);
+        StreamField ageField = new StreamField(1, FieldType.INT.toString(), "age", null, null);
+        return Lists.newArrayList(nameField, ageField);
+    }
+}

Reply via email to