This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
commit e85c72ec71e55e3fbe2a28c4c44b52d2da7d1846 Author: Kevin Wen <[email protected]> AuthorDate: Wed Feb 9 10:16:47 2022 +0800 [INLONG-2401][Bug][InLong-Sort] Fix bugs in kafka sink unit tests (#2401) (#2402) --- inlong-sort/sort-single-tenant/pom.xml | 14 ++-- .../flink/kafka/KafkaSinkTestBase.java | 60 +++++++++++++---- .../flink/kafka/RowToJsonKafkaSinkTest.java | 11 +--- .../flink/kafka/RowToStringKafkaSinkTest.java | 11 +--- .../sort/singletenant/flink/utils/NetUtils.java | 77 ++++++++++++++++++++++ 5 files changed, 136 insertions(+), 37 deletions(-) diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml index dce0bb0..7448a3f 100644 --- a/inlong-sort/sort-single-tenant/pom.xml +++ b/inlong-sort/sort-single-tenant/pom.xml @@ -54,12 +54,6 @@ </dependency> <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-connectors</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <scope>provided</scope> @@ -104,6 +98,14 @@ <scope>test</scope> </dependency> + <!-- kafka sink tests will be failed without it --> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.10.0</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-test-utils_${flink.scala.binary.version}</artifactId> diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java index aa075ea..e4ffcc8 100644 --- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java +++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/KafkaSinkTestBase.java @@ -31,6 +31,7 @@ import org.apache.inlong.sort.protocol.serialization.SerializationInfo; import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.network.ListenerName; @@ -42,11 +43,14 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; import scala.collection.mutable.ArraySeq; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -59,9 +63,13 @@ import java.util.concurrent.TimeoutException; import static org.apache.flink.util.NetUtils.hostAndPortToUrlString; import static org.apache.inlong.sort.singletenant.flink.kafka.KafkaSinkBuilder.buildKafkaSink; +import static org.apache.inlong.sort.singletenant.flink.utils.NetUtils.getUnusedLocalPort; +import static org.junit.Assert.assertNull; public abstract class KafkaSinkTestBase { + private static final Logger logger = LoggerFactory.getLogger(KafkaSinkTestBase.class); + @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder(); @@ -84,13 +92,20 @@ public abstract class KafkaSinkTestBase { @Before public void setup() throws Exception { prepareData(); + logger.info("Prepare data passed."); startZK(); + logger.info("ZK started."); startKafkaServer(); + logger.info("Kafka server started."); prepareKafkaClientProps(); + logger.info("Kafka client properties prepared."); kafkaAdmin = AdminClient.create(kafkaClientProperties); + logger.info("Kafka admin started."); addTopic(); + logger.info("Topic added to kafka server."); kafkaConsumer = new KafkaConsumer<>(kafkaClientProperties); + logger.info("Kafka consumer started."); } private void startZK() throws Exception { @@ -102,6 +117,7 @@ public abstract class KafkaSinkTestBase { Properties kafkaProperties = new Properties(); final String KAFKA_HOST = "localhost"; kafkaProperties.put("advertised.host.name", KAFKA_HOST); + kafkaProperties.put("port", Integer.toString(getUnusedLocalPort(1024))); kafkaProperties.put("broker.id", "1"); kafkaProperties.put("log.dir", tempFolder.newFolder().getAbsolutePath()); kafkaProperties.put("zookeeper.connect", zkServer.getConnectString()); @@ -120,6 +136,7 @@ public abstract class KafkaSinkTestBase { ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) ) ); + logger.info("Kafka broker conn str = " + brokerConnStr); } private void prepareKafkaClientProps() { @@ -149,17 +166,29 @@ public abstract class KafkaSinkTestBase { @After public void clean() throws IOException { - kafkaConsumer.close(); - kafkaConsumer = null; + if (kafkaConsumer != null) { + kafkaConsumer.close(); + kafkaConsumer = null; + logger.info("Kafka consumer closed."); + } - kafkaAdmin.close(); - kafkaAdmin = null; + if (kafkaAdmin != null) { + kafkaAdmin.close(); + kafkaAdmin = null; + logger.info("Kafka admin closed."); + } - kafkaServer.shutdown(); - kafkaServer = null; + if (kafkaServer != null) { + kafkaServer.shutdown(); + kafkaServer = null; + logger.info("Kafka server closed."); + } - zkServer.close(); - zkServer = null; + if (zkServer != null) { + zkServer.close(); + zkServer = null; + logger.info("ZK closed."); + } } @Test(timeout = 3 * 60 * 1000) @@ -192,21 +221,30 @@ public abstract class KafkaSinkTestBase { private void verify() throws InterruptedException { kafkaConsumer.subscribe(Collections.singleton(topic)); + List<String> results = new ArrayList<>(); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1)); - if (records.isEmpty() || records.count() != testRows.size()) { + if (!records.isEmpty()) { + for (ConsumerRecord<String, String> record : records) { + assertNull(record.key()); + results.add(record.value()); + } + } + + if (results.size() != testRows.size()) { //noinspection BusyWait Thread.sleep(1000); + logger.info("for topic " + topic + ", record size = " + results.size()); continue; } - verifyData(records); + verifyData(results); break; } } - protected abstract void verifyData(ConsumerRecords<String, String> records); + protected abstract void verifyData(List<String> results); private TestingSource createTestingSource() { TestingSource testingSource = new TestingSource(); diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java index f92025c..7bade20 100644 --- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java +++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToJsonKafkaSinkTest.java @@ -26,8 +26,6 @@ import org.apache.inlong.sort.formats.common.MapFormatInfo; import org.apache.inlong.sort.formats.common.StringFormatInfo; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.serialization.JsonSerializationInfo; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import java.util.ArrayList; import java.util.HashMap; @@ -35,7 +33,6 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase { @Override @@ -72,13 +69,7 @@ public class RowToJsonKafkaSinkTest extends KafkaSinkTestBase { } @Override - protected void verifyData(ConsumerRecords<String, String> records) { - List<String> results = new ArrayList<>(); - for (ConsumerRecord<String, String> record : records) { - assertNull(record.key()); - results.add(record.value()); - } - + protected void verifyData(List<String> results) { List<String> expectedData = new ArrayList<>(); expectedData.add("{\"f1\":\"zhangsan\",\"f2\":{\"high\":170.5},\"f3\":[123]}"); expectedData.add("{\"f1\":\"lisi\",\"f2\":{\"high\":180.5},\"f3\":[1234]}"); diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java index 3617571..fc10f1f 100644 --- a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java +++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/kafka/RowToStringKafkaSinkTest.java @@ -22,14 +22,11 @@ import org.apache.flink.types.Row; import org.apache.inlong.sort.formats.common.DoubleFormatInfo; import org.apache.inlong.sort.formats.common.StringFormatInfo; import org.apache.inlong.sort.protocol.FieldInfo; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; import java.util.ArrayList; import java.util.List; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; public class RowToStringKafkaSinkTest extends KafkaSinkTestBase { @@ -56,13 +53,7 @@ public class RowToStringKafkaSinkTest extends KafkaSinkTestBase { } @Override - protected void verifyData(ConsumerRecords<String, String> records) { - List<String> results = new ArrayList<>(); - for (ConsumerRecord<String, String> record : records) { - assertNull(record.key()); - results.add(record.value()); - } - + protected void verifyData(List<String> results) { List<String> expectedData = new ArrayList<>(); testRows.forEach(row -> expectedData.add(row.toString())); diff --git a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java new file mode 100644 index 0000000..66f53c4 --- /dev/null +++ b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/utils/NetUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.singletenant.flink.utils; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; + +public class NetUtils { + /** + * Looking for an unused local port starting from given port. + * If the given port is less than or equal to 0, then start from 1024. + * + * @param start the given start port + * @return an unused local port, -1 if not found + */ + public static int getUnusedLocalPort(int start) { + for (int port = Math.max(start, 1024); port <= 65535; port++) { + if (!isLocalPortInUse(port)) { + return port; + } + } + return -1; + } + + /** + * Check whether the given port is in use at local. + * + * @param port port to be checked + * @return true if in use, otherwise false + */ + public static boolean isLocalPortInUse(int port) { + boolean flag = true; + try { + flag = isPortInUse("127.0.0.1", port); + } catch (Exception ignored) { + // ignored + } + return flag; + } + + /** + * Check whether the given port is in use online. + * + * @param host IP + * @param port port + * @return true if in use, otherwise false + * @throws UnknownHostException thrown if the given IP is unknown + */ + public static boolean isPortInUse(String host, int port) throws UnknownHostException { + InetAddress theAddress = InetAddress.getByName(host); + try { + new Socket(theAddress, port); + return true; + } catch (IOException ignored) { + // ignored + } + return false; + } +}
