This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e8638b614c438e879435819d1a993ec65e39fd44
Author: Fabian Paul <[email protected]>
AuthorDate: Fri Oct 1 15:06:09 2021 +0200

    [FLINK-24405][tests] Harden kafka tests based on KafkaTestBase
---
 .../flink/connector/kafka/sink/KafkaUtil.java      | 40 +++++++++----------
 .../connectors/kafka/FlinkKafkaProducerITCase.java | 19 +++++----
 .../connectors/kafka/KafkaMigrationTestBase.java   |  2 +-
 .../connectors/kafka/KafkaProducerTestBase.java    |  3 +-
 .../streaming/connectors/kafka/KafkaTestBase.java  | 45 ++++++----------------
 .../connectors/kafka/KafkaTestEnvironment.java     |  2 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 36 +++--------------
 .../kafka/shuffle/KafkaShuffleITCase.java          | 21 ++++++----
 8 files changed, 62 insertions(+), 106 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
index d460919..5aa5a67 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.slf4j.Logger;
@@ -30,11 +29,10 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.function.Function;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /** Collection of methods to interact with a Kafka cluster. */
@@ -88,34 +86,33 @@ public class KafkaUtil {
         consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
         consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
-            Map<Integer, TopicPartition> assignments = getAllPartitions(consumer, topic);
-            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments.values());
-            consumer.assign(assignments.values());
-            consumer.seekToBeginning(assignments.values());
+            Set<TopicPartition> topicPartitions = getAllPartitions(consumer, topic);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+            consumer.assign(topicPartitions);
+            consumer.seekToBeginning(topicPartitions);
             final List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
-            while (!assignments.isEmpty()) {
-                consumer.assign(assignments.values());
+            while (!topicPartitions.isEmpty()) {
                 ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
-                LOG.info("Fetched {} records from topic {}.", records.count(), topic);
+                LOG.debug("Fetched {} records from topic {}.", records.count(), topic);
                 // Remove partitions from polling which have reached its end.
-                final Iterator<Map.Entry<Integer, TopicPartition>> assignmentIterator =
-                        assignments.entrySet().iterator();
-                while (assignmentIterator.hasNext()) {
-                    final Map.Entry<Integer, TopicPartition> assignment = assignmentIterator.next();
-                    final TopicPartition topicPartition = assignment.getValue();
+                final List<TopicPartition> finishedPartitions = new ArrayList<>();
+                for (final TopicPartition topicPartition : topicPartitions) {
                     final long position = consumer.position(topicPartition);
                     final long endOffset = endOffsets.get(topicPartition);
-                    LOG.info(
+                    LOG.debug(
                             "Endoffset {} and current position {} for partition {}",
                             endOffset,
                             position,
-                            assignment.getKey());
+                            topicPartition.partition());
                     if (endOffset - position > 0) {
                         continue;
                     }
-                    assignmentIterator.remove();
+                    finishedPartitions.add(topicPartition);
+                }
+                if (topicPartitions.removeAll(finishedPartitions)) {
+                    consumer.assign(topicPartitions);
                 }
                 for (ConsumerRecord<byte[], byte[]> r : records) {
                     consumerRecords.add(r);
@@ -125,11 +122,10 @@ public class KafkaUtil {
         }
     }
 
-    private static Map<Integer, TopicPartition> getAllPartitions(
+    private static Set<TopicPartition> getAllPartitions(
            KafkaConsumer<byte[], byte[]> consumer, String topic) {
-        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
-        return partitionInfos.stream()
+        return consumer.partitionsFor(topic).stream()
                 .map(info -> new TopicPartition(info.topic(), info.partition()))
-                .collect(Collectors.toMap(TopicPartition::partition, Function.identity()));
+                .collect(Collectors.toSet());
     }
 }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
index 022fbe2..3194d49 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerITCase.java
@@ -156,7 +156,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 testHarness2.open();
             }
 
-            assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
+            assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42));
             deleteTestTopic(topic);
         } catch (Exception ex) {
             // testHarness1 will be fenced off after creating and closing testHarness2
@@ -202,7 +202,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         testHarness.initializeState(snapshot);
         testHarness.close();
 
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43));
         deleteTestTopic(topic);
 
         checkProducerLeak();
@@ -250,7 +250,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         // - aborted transactions with records 44 and 45
         // - committed transaction with record 46
         // - pending transaction with record 47
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 46));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43, 46));
 
         try {
             testHarness1.close();
@@ -313,7 +313,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         // now we should have:
         // - records 42 and 43 in committed transactions
         // - aborted transactions with records 44 and 45
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43));
         deleteTestTopic(topic);
         checkProducerLeak();
     }
@@ -369,7 +369,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         // - records 42, 43, 44 and 45 in aborted transactions
         // - committed transaction with record 46
         // - pending transaction with record 47
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(46));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(46));
 
         postScaleDownOperator1.close();
         // ignore ProducerFencedExceptions, because postScaleDownOperator1 could reuse transactional
@@ -454,7 +454,6 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
         assertExactlyOnceForTopic(
                 createProperties(),
                 topic,
-                0,
                 IntStream.range(0, parallelism1 + parallelism2 + parallelism3)
                         .boxed()
                         .collect(Collectors.toList()));
@@ -548,7 +547,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 checkpoint0); // recover state 0 - producerA recover and commit txn 0
         testHarness.close();
 
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42));
         deleteTestTopic(topic);
 
         checkProducerLeak();
@@ -584,7 +583,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 topic,
                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE,
                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 44, 45));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43, 44, 45));
 
         deleteTestTopic(topic);
     }
@@ -595,7 +594,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
                 topic,
                 FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                 FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 43, 45, 46, 47));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 43, 45, 46, 47));
 
         deleteTestTopic(topic);
     }
@@ -717,7 +716,7 @@ public class FlinkKafkaProducerITCase extends KafkaTestBase {
             testHarness2.processElement(46, 6);
         }
 
-        assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(42, 44));
+        assertExactlyOnceForTopic(createProperties(), topic, Arrays.asList(42, 44));
 
         checkProducerLeak();
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
index aa52b95..9549b3d 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaMigrationTestBase.java
@@ -151,7 +151,7 @@ public abstract class KafkaMigrationTestBase extends KafkaTestBase {
                 // - transaction 43 aborted
                 // - committed transaction 44
                 // - transaction 45 pending
-                assertExactlyOnceForTopic(createProperties(), TOPIC, 0, Arrays.asList(42, 44));
+                assertExactlyOnceForTopic(createProperties(), TOPIC, Arrays.asList(42, 44));
             }
         } finally {
             shutdownClusters();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
index 2267cc3..155ea7e 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java
@@ -418,8 +418,7 @@ public abstract class KafkaProducerTestBase extends KafkaTestBaseWithFlink {
 
         for (int i = 0; i < sinksCount; i++) {
             // assert that before failure we successfully snapshot/flushed all expected elements
-            assertExactlyOnceForTopic(
-                    properties, topic + i, partition, expectedElements, KAFKA_READ_TIMEOUT);
+            assertExactlyOnceForTopic(properties, topic + i, expectedElements);
             deleteTestTopic(topic + i);
         }
     }
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index da41553..9a38990 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -42,6 +42,7 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -280,7 +281,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
             // query kafka for new records ...
             Collection<ConsumerRecord<Integer, Integer>> records =
-                    kafkaServer.getAllRecordsFromTopic(properties, topic, partition, 100);
+                    kafkaServer.getAllRecordsFromTopic(properties, topic);
 
             for (ConsumerRecord<Integer, Integer> record : records) {
                 actualElements.add(record.value());
@@ -299,23 +300,8 @@ public abstract class KafkaTestBase extends TestLogger {
     }
 
     public void assertExactlyOnceForTopic(
-            Properties properties, String topic, int partition, List<Integer> expectedElements) {
-        assertExactlyOnceForTopic(properties, topic, partition, expectedElements, 30_000L);
-    }
-
-    /**
-     * We manually handle the timeout instead of using JUnit's timeout to return failure instead of
-     * timeout error. After timeout we assume that there are missing records and there is a bug, not
-     * that the test has run out of time.
-     */
-    public void assertExactlyOnceForTopic(
-            Properties properties,
-            String topic,
-            int partition,
-            List<Integer> expectedElements,
-            long timeoutMillis) {
+            Properties properties, String topic, List<Integer> expectedElements) {
 
-        long startMillis = System.currentTimeMillis();
         List<Integer> actualElements = new ArrayList<>();
 
         Properties consumerProperties = new Properties();
@@ -326,24 +312,17 @@ public abstract class KafkaTestBase extends TestLogger {
                 "value.deserializer",
                 "org.apache.kafka.common.serialization.IntegerDeserializer");
         consumerProperties.put("isolation.level", "read_committed");
-        // until we timeout...
-        while (System.currentTimeMillis() < startMillis + timeoutMillis) {
-            // query kafka for new records ...
-            Collection<ConsumerRecord<Integer, Integer>> records =
-                    kafkaServer.getAllRecordsFromTopic(consumerProperties, topic, partition, 1000);
+        // query kafka for new records ...
+        Collection<ConsumerRecord<byte[], byte[]>> records =
+                kafkaServer.getAllRecordsFromTopic(consumerProperties, topic);
 
-            for (ConsumerRecord<Integer, Integer> record : records) {
-                actualElements.add(record.value());
-            }
+        for (ConsumerRecord<byte[], byte[]> record : records) {
+            actualElements.add(ByteBuffer.wrap(record.value()).getInt());
+        }
 
-            // succeed if we got all expectedElements
-            if (actualElements.equals(expectedElements)) {
-                return;
-            }
-            // fail early if we already have too many elements
-            if (actualElements.size() > expectedElements.size()) {
-                break;
-            }
+        // succeed if we got all expectedElements
+        if (actualElements.equals(expectedElements)) {
+            return;
         }
 
         fail(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index e4e3c6b..c6fc932 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -170,7 +170,7 @@ public abstract class KafkaTestEnvironment {
             List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props);
 
     public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic, int partition, long timeout);
+            Properties properties, String topic);
 
     public abstract <T> StreamSink<T> getProducerSink(
             String topic,
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index bdfa662..95e3241 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.kafka.sink.KafkaUtil;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
@@ -54,11 +55,9 @@ import java.io.File;
 import java.net.BindException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -108,11 +107,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         this.config = config;
         File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
-        tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
+        tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID()));
         assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
-        tmpKafkaParent =
-                new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
+        tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID()));
         assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
         tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
@@ -279,32 +277,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
-            Properties properties, String topic, int partition, long timeout) {
-        List<ConsumerRecord<K, V>> result = new ArrayList<>();
-
-        try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
-            consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-            while (true) {
-                boolean processedAtLeastOneRecord = false;
-
-                // wait for new records with timeout and break the loop if we didn't get any
-                Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-                while (iterator.hasNext()) {
-                    ConsumerRecord<K, V> record = iterator.next();
-                    result.add(record);
-                    processedAtLeastOneRecord = true;
-                }
-
-                if (!processedAtLeastOneRecord) {
-                    break;
-                }
-            }
-            consumer.commitSync();
-        }
-
-        return UnmodifiableList.decorate(result);
+            Properties properties, String topic) {
+        return UnmodifiableList.decorate(KafkaUtil.drainAllRecordsFromTopic(topic, properties));
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
index ce9249d..31b34ea 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/shuffle/KafkaShuffleITCase.java
@@ -34,8 +34,8 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaShuffleFetcher.KafkaShuffleWatermark;
 import org.apache.flink.util.PropertiesUtil;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.Rule;
@@ -507,16 +507,23 @@ public class KafkaShuffleITCase extends KafkaShuffleTestBase {
         FlinkKafkaShuffle.writeKeyBy(input, topic, kafkaProperties, 0);
         env.execute("Write to " + topic);
 
-        ImmutableMap.Builder<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results =
-                ImmutableMap.builder();
-        for (int p = 0; p < numberOfPartitions; p++) {
-            results.put(p, kafkaServer.getAllRecordsFromTopic(kafkaProperties, topic, p, 5000));
-        }
+        Map<Integer, Collection<ConsumerRecord<byte[], byte[]>>> results = new HashMap<>();
+
+        kafkaServer
+                .<byte[], byte[]>getAllRecordsFromTopic(kafkaProperties, topic)
+                .forEach(
+                        r -> {
+                            final int partition = r.partition();
+                            if (!results.containsKey(partition)) {
+                                results.put(partition, Lists.newArrayList());
+                            }
+                            results.get(partition).add(r);
+                        });
 
         deleteTestTopic(topic);
 
-        return results.build();
+        return results;
     }
 
     private StreamExecutionEnvironment createEnvironment(
