This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d7c68f9b5 [INLONG-4534][Manager] Add complete unit test to create data
flow (#4539)
d7c68f9b5 is described below
commit d7c68f9b58d4ec64f2940f8f6e265b3093a862c4
Author: leosanqing <[email protected]>
AuthorDate: Wed Jun 8 14:45:22 2022 +0800
[INLONG-4534][Manager] Add complete unit test to create data flow (#4539)
---
inlong-manager/manager-client-examples/pom.xml | 7 +
.../apache/inlong/manager/client/ut/BaseTest.java | 170 +++++++++
.../inlong/manager/client/ut/Kafka2HiveTest.java | 416 +++++++++++++++++++++
3 files changed, 593 insertions(+)
diff --git a/inlong-manager/manager-client-examples/pom.xml
b/inlong-manager/manager-client-examples/pom.xml
index 7bcfed214..e392d0da7 100644
--- a/inlong-manager/manager-client-examples/pom.xml
+++ b/inlong-manager/manager-client-examples/pom.xml
@@ -34,5 +34,12 @@
<artifactId>manager-client</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!--
https://mvnrepository.com/artifact/com.github.tomakehurst/wiremock-jre8 -->
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-jre8</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
new file mode 100644
index 000000000..fc5d8c933
--- /dev/null
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.ut;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.google.common.collect.Lists;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.InlongClient;
+import org.apache.inlong.manager.common.auth.DefaultAuthentication;
+import org.apache.inlong.manager.common.enums.DataSeparator;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.FileFormat;
+import org.apache.inlong.manager.common.enums.GlobalConstants;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.common.pojo.sort.FlinkSortConf;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static
com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
+
+public class BaseTest {
+
+ // Manager web url
+ public static final String SERVICE_URL = "127.0.0.1:8084";
+ // Inlong user && passwd
+ public static DefaultAuthentication inlongAuth = new
DefaultAuthentication("admin", "inlong");
+ // Inlong group ID
+ public static final String GROUP_ID = "test_group009";
+ // Inlong stream ID
+ public static final String STREAM_ID = "test_stream009";
+ // Flink cluster url
+ public static final String FLINK_URL = "127.0.0.1";
+ // Pulsar cluster admin url
+ public static final String PULSAR_ADMIN_URL = "127.0.0.1";
+ // Pulsar cluster service url
+ public static final String PULSAR_SERVICE_URL = "127.0.0.1";
+ // Pulsar tenant
+ public static final String TENANT = "tenant";
+ // Pulsar namespace
+ public static final String NAMESPACE = "test_namespace";
+ // Pulsar topic
+ public static final String TOPIC = "test_topic";
+ public static final String IN_CHARGES = "test_inCharges,admin";
+
+ public static WireMockServer wireMockServer;
+ public static final String MANAGER_URL_PREFIX = "/api/inlong/manager";
+
+ public static InlongGroupInfo groupInfo;
+ public static InlongClient inlongClient;
+
+ @BeforeAll
+ static void setup() {
+ // start a WireMock server on port 8084 (the port in SERVICE_URL) and point the WireMock client at it
+ wireMockServer = new WireMockServer(options().port(8084));
+ wireMockServer.start();
+ WireMock.configureFor(wireMockServer.port());
+
+ ClientConfiguration configuration = new ClientConfiguration();
+ configuration.setWriteTimeout(1000);
+ configuration.setReadTimeout(1000);
+ configuration.setConnectTimeout(1000);
+ configuration.setTimeUnit(TimeUnit.SECONDS); // NOTE(review): 1000 + SECONDS looks like ms was intended — confirm
+ configuration.setAuthentication(inlongAuth);
+
+ inlongClient = InlongClient.create(SERVICE_URL, configuration);
+ groupInfo = createGroupInfo();
+ }
+
+ @AfterAll
+ static void teardown() {
+ wireMockServer.stop();
+ }
+
+ /**
+ * Create the Pulsar-backed inlong group info, filled from this class's constants and fixed sample values
+ */
+ public static InlongGroupInfo createGroupInfo() {
+ InlongPulsarInfo pulsarInfo = new InlongPulsarInfo();
+ pulsarInfo.setInlongGroupId(GROUP_ID);
+ pulsarInfo.setInCharges(IN_CHARGES);
+
+ // Pulsar connection settings; the namespace is passed as the MQ resource
+ pulsarInfo.setServiceUrl(PULSAR_SERVICE_URL);
+ pulsarInfo.setAdminUrl(PULSAR_ADMIN_URL);
+ pulsarInfo.setTenant(TENANT);
+ pulsarInfo.setMqResource(NAMESPACE);
+
+ // ZooKeeper flag 0, create-resource flag 1, lightweight flag 1, plus the cluster tag
+ pulsarInfo.setEnableZookeeper(0);
+ pulsarInfo.setEnableCreateResource(1);
+ pulsarInfo.setLightweight(1);
+ pulsarInfo.setInlongClusterTag("default_cluster");
+
+ pulsarInfo.setDailyRecords(10000000);
+ pulsarInfo.setDailyStorage(10000);
+ pulsarInfo.setPeakRecords(100000);
+ pulsarInfo.setMaxLength(10000);
+
+ // Flink sort configuration with an empty properties map
+ FlinkSortConf sortConf = new FlinkSortConf();
+ sortConf.setServiceUrl(FLINK_URL);
+ Map<String, String> map = new HashMap<>(16);
+ sortConf.setProperties(map);
+ pulsarInfo.setSortConf(sortConf);
+
+ return pulsarInfo;
+ }
+
+ /**
+ * Create the inlong stream info for STREAM_ID: UTF-8 encoded, vertical-bar separated, using TOPIC as MQ resource
+ */
+ protected InlongStreamInfo createStreamInfo() {
+ InlongStreamInfo streamInfo = new InlongStreamInfo();
+ streamInfo.setInlongStreamId(STREAM_ID);
+ streamInfo.setName(STREAM_ID);
+ streamInfo.setDataEncoding(StandardCharsets.UTF_8.toString());
+ streamInfo.setDataSeparator(DataSeparator.VERTICAL_BAR.getSeparator());
+ // set syncSend to 1 if the data must be strictly ordered
+ streamInfo.setSyncSend(GlobalConstants.SYNC_SEND);
+ streamInfo.setMqResource(TOPIC);
+ return streamInfo;
+ }
+
+ /**
+ * Create a Hive sink with placeholder connection values and two sink fields (age: INT, name: STRING)
+ */
+ protected static HiveSink createHiveSink() {
+ HiveSink hiveSink = new HiveSink();
+ hiveSink.setDbName("{db.name}");
+ hiveSink.setJdbcUrl("jdbc:hive2://{ip:port}");
+ hiveSink.setAuthentication(new DefaultAuthentication("hive", "hive"));
+ hiveSink.setDataEncoding(StandardCharsets.UTF_8.toString());
+ hiveSink.setFileFormat(FileFormat.TextFile.name());
+ hiveSink.setDataSeparator(DataSeparator.VERTICAL_BAR.getSeparator());
+ hiveSink.setDataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}");
+ hiveSink.setFieldList(Lists.newArrayList(
+ new SinkField(0, FieldType.INT.toString(), "age",
FieldType.INT.toString(), "age"),
+ new SinkField(1, FieldType.STRING.toString(), "name",
FieldType.STRING.toString(), "name")
+ ));
+
+ hiveSink.setTableName("{table.name}");
+ hiveSink.setSinkName("{hive.sink.name}");
+ return hiveSink;
+ }
+}
diff --git
a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
new file mode 100644
index 000000000..4b9023482
--- /dev/null
+++
b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.ut;
+
+import com.github.pagehelper.PageInfo;
+import com.google.common.collect.Lists;
+import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.client.api.InlongGroupContext;
+import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.common.beans.Response;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.ProcessStatus;
+import org.apache.inlong.manager.common.enums.TaskStatus;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.sink.SinkField;
+import org.apache.inlong.manager.common.pojo.sink.hive.HiveSink;
+import org.apache.inlong.manager.common.pojo.source.kafka.KafkaSource;
+import org.apache.inlong.manager.common.pojo.stream.FullStreamResponse;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
+import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
+import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
+import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+import static org.apache.inlong.manager.common.enums.ProcessStatus.PROCESSING;
+
+class Kafka2HiveTest extends BaseTest {
+
+ @BeforeAll
+ static void createStub() {
+ stubFor(
+ get(urlMatching(MANAGER_URL_PREFIX +
"/group/exist/test_group009.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(false)))
+ )
+ );
+
+ stubFor(
+ post(urlMatching(MANAGER_URL_PREFIX + "/group/save.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success("test_group009")))
+ )
+ );
+
+ stubFor(
+ get(urlMatching(MANAGER_URL_PREFIX +
"/stream/exist/test_group009/test_stream009.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(false)))
+ )
+ );
+
+ stubFor(
+ get(urlMatching(MANAGER_URL_PREFIX +
"/stream/exist/test_group009/test_stream009.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(false)))
+ )
+ );
+
+ stubFor(
+ post(urlMatching(MANAGER_URL_PREFIX + "/stream/save.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(6)))
+ )
+ );
+
+ stubFor(
+ post(urlMatching(MANAGER_URL_PREFIX + "/source/save.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(6)))
+ )
+ );
+
+ stubFor(
+ post(urlMatching(MANAGER_URL_PREFIX + "/sink/save.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(6)))
+ )
+ );
+
+ WorkflowResult initWorkflowResult = new WorkflowResult();
+ initWorkflowResult.setProcessInfo(
+ ProcessResponse.builder()
+ .id(12)
+ .name("NEW_GROUP_PROCESS")
+ .displayName("New-Group")
+ .type("New-Group")
+ .applicant("admin")
+ .status(PROCESSING)
+ .startTime(new Date())
+ .formData(JsonUtils.parseTree(
+
"{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\",\"id\":6,"
+ +
"\"inlongGroupId\":\"test_group009\",\"name\":null,\"description\":null,"
+ +
"\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,"
+ +
"\"enableCreateResource\":1,\"lightweight\":1,"
+ +
"\"inlongClusterTag\":\"default_cluster\",\"dailyRecords\":10000000,"
+ +
"\"dailyStorage\":10000,\"peakRecords\":100000,\"maxLength\":10000,"
+ +
"\"inCharges\":\"test_inCharges,admin\",\"followers\":null,\"status\":101,"
+ +
"\"creator\":\"admin\",\"modifier\":\"admin\","
+ + "\"createTime\":\"2022-06-06
09:59:10\","
+ + "\"modifyTime\":\"2022-06-06
02:24:50\",\"extList\":[],\"tenant\":null,"
+ +
"\"adminUrl\":null,\"serviceUrl\":null,\"queueModule\":\"parallel\","
+ +
"\"partitionNum\":3,\"ensemble\":3,\"writeQuorum\":3,\"ackQuorum\":2,"
+ +
"\"ttl\":24,\"ttlUnit\":\"hours\",\"retentionTime\":72,"
+ +
"\"retentionTimeUnit\":\"hours\",\"retentionSize\":-1,"
+ +
"\"retentionSizeUnit\":\"MB\"},\"streamInfoList\":[{\"id\":6,"
+ +
"\"inlongGroupId\":\"test_group009\",\"inlongStreamId\":\"test_stream009\","
+ +
"\"name\":\"test_stream009\",\"sinkList\":[{\"id\":6,"
+ +
"\"inlongGroupId\":\"test_group009\",\"inlongStreamId\":\"test_stream009\","
+ +
"\"sinkType\":\"HIVE\",\"sinkName\":\"{hive.sink.name}\",\"clusterId\":null,"
+ +
"\"clusterUrl\":null}],\"modifyTime\":\"2022-06-06 02:11:03\"}]}"
+ ))
+ .build()
+ );
+ initWorkflowResult.setNewTasks(
+ Lists.newArrayList(
+ TaskResponse.builder()
+ .id(12)
+ .type("UserTask")
+ .processId(12)
+ .processName("NEW_GROUP_PROCESS")
+ .processDisplayName("New-Group")
+ .name("ut_admin")
+ .displayName("SystemAdmin")
+ .applicant("admin")
+ .approvers(Lists.newArrayList("admin"))
+ .status(TaskStatus.PENDING)
+ .startTime(new Date())
+ .build()
+ )
+ );
+ stubFor(
+ post(urlMatching(MANAGER_URL_PREFIX +
"/group/startProcess/test_group009.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(initWorkflowResult)))
+ )
+ );
+
+ WorkflowResult startWorkflowResult = new WorkflowResult();
+ startWorkflowResult.setProcessInfo(
+ ProcessResponse.builder()
+ .id(12)
+ .name("NEW_GROUP_PROCESS")
+ .displayName("New-Group")
+ .type("New-Group")
+ .applicant("admin")
+ .status(ProcessStatus.COMPLETED)
+ .startTime(new Date())
+ .endTime(new Date())
+
.formData("{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\","
+ +
"\"id\":8,\"inlongGroupId\":\"test_group011\",\"name\":null,\"description\":null,"
+ +
"\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,\"enableCreateResource\":1,"
+ +
"\"lightweight\":1,\"inlongClusterTag\":\"default_cluster\","
+ +
"\"dailyRecords\":10000000,\"dailyStorage\":10000,\"peakRecords\":100000,"
+ +
"\"maxLength\":10000,\"inCharges\":\"test_inCharges,admin\",\"followers\":null,"
+ +
"\"status\":101,\"creator\":\"admin\",\"modifier\":\"admin\","
+ + "\"createTime\":\"2022-06-06
16:36:35\",\"modifyTime\":\"2022-06-06 08:37:04\","
+ +
"\"extList\":[],\"tenant\":null,\"adminUrl\":null,\"serviceUrl\":null,"
+ +
"\"queueModule\":\"parallel\",\"partitionNum\":3,\"ensemble\":3,\"writeQuorum\":3,"
+ +
"\"ackQuorum\":2,\"ttl\":24,\"ttlUnit\":\"hours\",\"retentionTime\":72,"
+ +
"\"retentionTimeUnit\":\"hours\",\"retentionSize\":-1,\"retentionSizeUnit\":\"MB\"},"
+ +
"\"streamInfoList\":[{\"id\":8,\"inlongGroupId\":\"test_group011\","
+ +
"\"inlongStreamId\":\"test_stream011\",\"name\":\"test_stream011\","
+ +
"\"sinkList\":[{\"id\":8,\"inlongGroupId\":\"test_group011\","
+ +
"\"inlongStreamId\":\"test_stream011\",\"sinkType\":\"HIVE\","
+ +
"\"sinkName\":\"{hive.sink.name}\",\"clusterId\":null,\"clusterUrl\":null}],"
+ + "\"modifyTime\":\"2022-06-06 08:36:38\"}]}")
+ .build()
+ );
+ startWorkflowResult.setNewTasks(new ArrayList<>());
+ stubFor(
+ post(urlMatching(MANAGER_URL_PREFIX +
"/workflow/approve/12.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(startWorkflowResult)))
+ )
+ );
+
+ stubFor(
+ get(urlMatching(MANAGER_URL_PREFIX +
"/group/get/test_group009.*"))
+ .willReturn(
+ okJson(JsonUtils.toJsonString(Response.success(
+ InlongPulsarInfo.builder()
+ .id(8)
+ .inlongGroupId("test_group009")
+ .mqType("PULSAR")
+ .mqResource("test_namespace")
+ .enableZookeeper(0)
+ .enableCreateResource(1)
+ .lightweight(1)
+
.inlongClusterTag("default_cluster")
+
.inCharges("test_inCharges,admin")
+ .dailyRecords(10000000)
+ .dailyStorage(10000)
+ .peakRecords(100000)
+ .maxLength(10000)
+ .status(120)
+ .creator("admin")
+ .modifier("admin")
+ .createTime(new Date())
+ .modifyTime(new Date())
+ .extList(new ArrayList<>())
+ .queueModule("parallel")
+ .partitionNum(3)
+ .ensemble(3)
+ .writeQuorum(3)
+ .ttl(24)
+ .ttlUnit("hours")
+ .retentionTime(72)
+ .retentionTimeUnit("hours")
+ .retentionSize(-1)
+ .retentionSizeUnit("MB")
+ .build()
+ )))
+ )
+ );
+
+ Response<PageInfo<FullStreamResponse>> fullStreamResponsePage =
Response.success(
+ new PageInfo<>(
+ Lists.newArrayList(FullStreamResponse.builder()
+ .streamInfo(InlongStreamInfo.builder()
+ .id(8)
+ .inlongGroupId(GROUP_ID)
+ .inlongStreamId(STREAM_ID)
+ .name(STREAM_ID)
+ .mqResource("test_topic")
+ .dataEncoding("UTF-8")
+ .dataSeparator("|")
+ .syncSend(1)
+ .dailyRecords(10)
+ .dailyStorage(10)
+ .peakRecords(1000)
+ .maxLength(10240)
+ .storagePeriod(1)
+ .status(120)
+ .creator("admin")
+ .modifier("admin")
+ .createTime(new Date())
+ .modifyTime(new Date())
+ .fieldList(createStreamFields())
+ .build()
+ )
+ .sourceInfo(Lists.newArrayList(
+ KafkaSource.builder()
+ .id(6)
+ .topic(TOPIC)
+
.bootstrapServers("{kafka.bootstrap}")
+ .inlongGroupId(GROUP_ID)
+ .inlongStreamId(STREAM_ID)
+ .sourceType("KAFKA")
+
.sourceName("{kafka.source.name}")
+ .serializationType("json")
+ .version(1)
+ .status(110)
+ .creator("admin")
+ .modifier("admin")
+ .createTime(new Date())
+ .modifyTime(new Date())
+ .build()
+ ))
+ .sinkInfo(Lists.newArrayList(
+ HiveSink.builder()
+ .id(6)
+ .inlongStreamId(STREAM_ID)
+ .inlongGroupId(GROUP_ID)
+
.jdbcUrl("jdbc:hive2://{ip:port}")
+ .dbName("test_db")
+ .tableName("test_table")
+
.dataPath("hdfs://{ip:port}/usr/hive/warehouse/{db.name}")
+ .fileFormat("TextFile")
+ .dataEncoding("UTF-8")
+ .dataSeparator("|")
+ .sinkType("HIVE")
+ .sinkName("sink_name")
+ .enableCreateResource(1)
+ .status(110)
+ .creator("admin")
+ .modifier("admin")
+ .dataFormat(DataFormat.NONE)
+ .fieldList(Lists.newArrayList(
+ SinkField.builder()
+ .id(17)
+
.fieldName("age")
+
.fieldType("INT")
+
.fieldComment("age")
+
.sourceFieldName("age")
+
.sourceFieldType("INT")
+ .build(),
+ SinkField.builder()
+ .id(18)
+
.fieldName("name")
+
.fieldType("STRING")
+
.fieldComment("name")
+
.sourceFieldName("name")
+
.sourceFieldType("STRING")
+ .build()
+ ))
+ .build()
+ ))
+ .build())
+ )
+ );
+ stubFor(
+ post(urlMatching(MANAGER_URL_PREFIX + "/stream/listAll.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(fullStreamResponsePage))
+ )
+ );
+
+ stubFor(
+ get(urlMatching(MANAGER_URL_PREFIX + "/workflow/event/list.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(new PageInfo<>(
+ Lists.newArrayList(
+ EventLogView.builder()
+ .id(39)
+ .processId(12)
+
.processName("CREATE_LIGHT_GROUP_PROCESS")
+
.processDisplayName("Create-Light-Group")
+
.inlongGroupId(GROUP_ID)
+ .taskId(12)
+
.elementName("initSort")
+
.elementDisplayName("Group-InitSort")
+
.eventType("ProcessEvent")
+ .event("FAIL")
+
.listener("LightGroupFailedListener")
+ .status(-1)
+ .ip("127.0.0.1")
+ .build(),
+ EventLogView.builder()
+ .id(38)
+ .processId(12)
+
.processName("CREATE_LIGHT_GROUP_PROCESS")
+
.processDisplayName("Create-Light-Group")
+
.inlongGroupId(GROUP_ID)
+ .taskId(12)
+
.elementName("initSort")
+
.elementDisplayName("Group-InitSort")
+ .eventType("TaskEvent")
+ .event("COMPLETE")
+
.listener("LightGroupSortListener")
+ .ip("127.0.0.1")
+ .build()
+ )
+ ))))
+ )
+ );
+
+ stubFor(
+ get(urlMatching(MANAGER_URL_PREFIX +
"/stream/config/log/list.*"))
+ .willReturn(
+
okJson(JsonUtils.toJsonString(Response.success(new PageInfo<>())))
+ )
+ );
+ }
+
+ @Test
+ void testCreateGroupForHive() {
+ Assertions.assertDoesNotThrow(() -> {
+ InlongGroup group = inlongClient.forGroup(groupInfo);
+ InlongStreamBuilder streamBuilder =
group.createStream(createStreamInfo());
+ streamBuilder.fields(createStreamFields());
+ streamBuilder.source(createKafkaSource());
+ streamBuilder.sink(createHiveSink());
+ streamBuilder.initOrUpdate();
+ // initialise the group and make sure a context comes back
+ InlongGroupContext inlongGroupContext = group.init();
+ Assertions.assertNotNull(inlongGroupContext);
+ });
+
+ }
+
+ private static KafkaSource createKafkaSource() {
+ KafkaSource kafkaSource = new KafkaSource();
+ kafkaSource.setBootstrapServers("127.0.0.1");
+ kafkaSource.setTopic("test_topic");
+ kafkaSource.setSourceName("kafka_source_name");
+ kafkaSource.setSerializationType(DataFormat.JSON.getName());
+ return kafkaSource;
+ }
+
+ private static List<StreamField> createStreamFields() {
+ return Lists.newArrayList(
+ new StreamField(0, FieldType.STRING.toString(), "name", null,
null),
+ new StreamField(1, FieldType.INT.toString(), "age", null, null)
+ );
+ }
+}