This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8eaee0ffe47b21d01f5c013d0bf9beec98a33821
Author: Fabian Paul <[email protected]>
AuthorDate: Thu Sep 30 14:31:17 2021 +0200

    [FLINK-24405][tests] Introduce util to reliably drain all messages from a kafka topic
    
    The previous approach was to poll the topic with a given timeout and,
    if the poll returned no records, to treat that as a sign that all
    records had been read. Unfortunately, due to network issues or similar
    problems, a poll can easily return no records even though the end
    offset has not yet been reached, which made the tests unreliable.
    
    The new util always drains all messages until the end offset is
    reached.
---
 .../connector/kafka/sink/KafkaSinkITCase.java      |  34 ++----
 .../connector/kafka/sink/KafkaTransactionLog.java  |  60 +--------
 .../flink/connector/kafka/sink/KafkaUtil.java      | 135 +++++++++++++++++++++
 .../connector/kafka/sink/KafkaWriterITCase.java    |   8 +-
 4 files changed, 154 insertions(+), 83 deletions(-)
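Before the patch itself, a note on the core idea: rather than stopping at the first
empty poll, the new util snapshots the end offsets once and keeps polling until every
partition's position has reached its end offset. The following is a minimal,
self-contained sketch of that pattern, not the committed code; the bootstrap address,
group id, and topic name are placeholders, and the KafkaUtil introduced below is the
authoritative implementation (it additionally narrows the consumer assignment as
partitions finish and makes the isolation level configurable).

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class DrainUntilEndOffsetSketch {

    public static void main(String[] args) {
        // Placeholder client settings; adjust to the cluster under test.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "drain-sketch");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        List<ConsumerRecord<byte[], byte[]>> collected = new ArrayList<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Set<TopicPartition> partitions =
                    consumer.partitionsFor("example-topic").stream()
                            .map(info -> new TopicPartition(info.topic(), info.partition()))
                            .collect(Collectors.toSet());
            // Snapshot the end offsets once; they define when draining is complete.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            Set<TopicPartition> remaining = new HashSet<>(partitions);
            while (!remaining.isEmpty()) {
                // An empty poll does not mean "done"; only reaching the end offset does.
                consumer.poll(Duration.ofSeconds(1)).forEach(collected::add);
                remaining.removeIf(tp -> consumer.position(tp) >= endOffsets.get(tp));
            }
            // Note: records appended after the end-offset snapshot may still be collected
            // here; the committed KafkaUtil avoids this by dropping finished partitions
            // from the assignment.
        }
        System.out.println("Drained " + collected.size() + " records.");
    }
}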
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index 6fc7511..965eea0 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -59,7 +59,6 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.junit.After;
@@ -83,7 +82,6 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -116,8 +114,6 @@ public class KafkaSinkITCase extends TestLogger {
     private static final Network NETWORK = Network.newNetwork();
     private static final int ZK_TIMEOUT_MILLIS = 30000;
     private static final short TOPIC_REPLICATION_FACTOR = 1;
-    private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
-
     private static final RecordSerializer serializer = new RecordSerializer();
     private static AdminClient admin;
     private String topic;
@@ -255,7 +251,7 @@ public class KafkaSinkITCase extends TestLogger {
         executeWithMapper(
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix");
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic);
+                drainAllRecordsFromTopic(topic, true);
         assertThat(
                 deserializeValues(collectedRecords),
                 contains(
@@ -276,7 +272,7 @@ public class KafkaSinkITCase extends TestLogger {
                     e.getCause().getCause().getMessage(),
                     containsString("Exceeded checkpoint tolerable failure"));
         }
-        assertTrue(deserializeValues(drainAllRecordsFromTopic(topic)).isEmpty());
+        assertTrue(deserializeValues(drainAllRecordsFromTopic(topic, true)).isEmpty());
 
         // Second job aborts all transactions from previous runs with higher parallelism
         config.set(CoreOptions.DEFAULT_PARALLELISM, 1);
@@ -284,7 +280,7 @@ public class KafkaSinkITCase extends TestLogger {
         executeWithMapper(
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, null);
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic);
+                drainAllRecordsFromTopic(topic, true);
         assertThat(
                 deserializeValues(collectedRecords),
                 contains(
@@ -350,7 +346,7 @@ public class KafkaSinkITCase extends TestLogger {
         env.execute();
 
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic);
+                drainAllRecordsFromTopic(topic, guarantee == DeliveryGuarantee.EXACTLY_ONCE);
         recordsAssertion.accept(deserializeValues(collectedRecords));
         checkProducerLeak();
     }
@@ -380,7 +376,8 @@ public class KafkaSinkITCase extends TestLogger {
         env.execute();
 
         final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic);
+                drainAllRecordsFromTopic(
+                        topic, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
         final long recordsCount = expectedRecords.get().get();
         assertEquals(collectedRecords.size(), recordsCount);
         assertThat(
@@ -441,23 +438,10 @@ public class KafkaSinkITCase extends TestLogger {
         result.all().get(1, TimeUnit.MINUTES);
     }
 
-    private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic) {
+    private List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
+            String topic, boolean committed) {
         Properties properties = getKafkaClientConfiguration();
-        return drainAllRecordsFromTopic(topic, properties);
-    }
-
-    static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
-            String topic, Properties properties) {
-        final List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
-        try (Consumer<byte[], byte[]> consumer = createTestConsumer(topic, properties)) {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
-            // Drain the kafka topic till all records are consumed
-            while (!records.isEmpty()) {
-                records.records(topic).forEach(collectedRecords::add);
-                records = consumer.poll(CONSUMER_POLL_DURATION);
-            }
-        }
-        return collectedRecords;
+        return KafkaUtil.drainAllRecordsFromTopic(topic, properties, committed);
     }
 
     private static class RecordSerializer implements SerializationSchema<Long> {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
index 0526120..c6f715f 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
@@ -18,28 +18,22 @@ package org.apache.flink.connector.kafka.sink;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.connector.kafka.sink.KafkaUtil.drainAllRecordsFromTopic;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME;
@@ -49,7 +43,6 @@ import static org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NA
  */
 class KafkaTransactionLog {
 
-    private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
     private static final int SUPPORTED_KAFKA_SCHEMA_VERSION = 0;
 
     private final Properties consumerConfig;
@@ -74,52 +67,11 @@ class KafkaTransactionLog {
     /** Gets all {@link TransactionRecord} matching the given id filter. */
     public List<TransactionRecord> getTransactions(Predicate<String> transactionIdFilter)
             throws KafkaException {
-        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
-            Set<TopicPartition> assignments = getAllPartitions(consumer);
-            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments);
-            final Map<Integer, Long> endOffsetByPartition =
-                    endOffsets.entrySet().stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            e -> e.getKey().partition(), Map.Entry::getValue));
-            consumer.beginningOffsets(assignments)
-                    .forEach(
-                            (tp, offset) -> {
-                                if (endOffsets.get(tp) <= offset) {
-                                    assignments.remove(tp);
-                                }
-                            });
-            consumer.assign(assignments);
-            consumer.seekToBeginning(assignments);
-
-            final List<TransactionRecord> transactionRecords = new ArrayList<>();
-            while (!assignments.isEmpty()) {
-                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
-                boolean finishedPartition = false;
-                for (ConsumerRecord<byte[], byte[]> r : records) {
-                    long remainingRecords = endOffsetByPartition.get(r.partition()) - r.offset();
-                    if (remainingRecords >= 1) {
-                        parseTransaction(r, transactionIdFilter).ifPresent(transactionRecords::add);
-                    }
-                    if (remainingRecords <= 1) {
-                        assignments.remove(new TopicPartition(r.topic(), r.partition()));
-                        finishedPartition = true;
-                    }
-                }
-                if (finishedPartition) {
-                    consumer.assign(assignments);
-                }
-            }
-            return transactionRecords;
-        }
-    }
-
-    private Set<TopicPartition> getAllPartitions(KafkaConsumer<byte[], byte[]> consumer) {
-        final List<PartitionInfo> partitionInfos =
-                consumer.partitionsFor(TRANSACTION_STATE_TOPIC_NAME);
-        return partitionInfos.stream()
-                .map(info -> new TopicPartition(info.topic(), info.partition()))
-                .collect(Collectors.toCollection(HashSet::new));
+        return drainAllRecordsFromTopic(TRANSACTION_STATE_TOPIC_NAME, consumerConfig, true).stream()
+                .map(r -> parseTransaction(r, transactionIdFilter))
+                .filter(Optional::isPresent)
+                .map(Optional::get)
+                .collect(Collectors.toList());
     }
 
     private Optional<TransactionRecord> parseTransaction(
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
new file mode 100644
index 0000000..d460919
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.sink;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Collection of methods to interact with a Kafka cluster. */
+public class KafkaUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);
+    private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1);
+
+    private KafkaUtil() {}
+
+    /**
+     * Drain all records available from the given topic from the beginning until the current highest
+     * offset.
+     *
+     * <p>This method will fetch the latest offsets for the partitions once and only return records
+     * until that point.
+     *
+     * @param topic to fetch from
+     * @param properties used to configure the created {@link KafkaConsumer}
+     * @param committed determines the mode {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG} with which
+     *     the consumer reads the records.
+     * @return all {@link ConsumerRecord} in the topic
+     * @throws KafkaException
+     */
+    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
+            String topic, Properties properties, boolean committed) throws KafkaException {
+        final Properties consumerConfig = new Properties();
+        consumerConfig.putAll(properties);
+        consumerConfig.put(
+                ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                committed ? "read_committed" : "read_uncommitted");
+        return drainAllRecordsFromTopic(topic, consumerConfig);
+    }
+
+    /**
+     * Drain all records available from the given topic from the beginning until the current highest
+     * offset.
+     *
+     * <p>This method will fetch the latest offsets for the partitions once and only return records
+     * until that point.
+     *
+     * @param topic to fetch from
+     * @param properties used to configure the created {@link KafkaConsumer}
+     * @return all {@link ConsumerRecord} in the topic
+     * @throws KafkaException
+     */
+    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(
+            String topic, Properties properties) throws KafkaException {
+        final Properties consumerConfig = new Properties();
+        consumerConfig.putAll(properties);
+        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
+            Map<Integer, TopicPartition> assignments = getAllPartitions(consumer, topic);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignments.values());
+            consumer.assign(assignments.values());
+            consumer.seekToBeginning(assignments.values());
+
+            final List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
+            while (!assignments.isEmpty()) {
+                consumer.assign(assignments.values());
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
+                LOG.info("Fetched {} records from topic {}.", records.count(), topic);
+
+                // Remove partitions from polling which have reached its end.
+                final Iterator<Map.Entry<Integer, TopicPartition>> assignmentIterator =
+                        assignments.entrySet().iterator();
+                while (assignmentIterator.hasNext()) {
+                    final Map.Entry<Integer, TopicPartition> assignment = assignmentIterator.next();
+                    final TopicPartition topicPartition = assignment.getValue();
+                    final long position = consumer.position(topicPartition);
+                    final long endOffset = endOffsets.get(topicPartition);
+                    LOG.info(
+                            "Endoffset {} and current position {} for partition {}",
+                            endOffset,
+                            position,
+                            assignment.getKey());
+                    if (endOffset - position > 0) {
+                        continue;
+                    }
+                    assignmentIterator.remove();
+                }
+                for (ConsumerRecord<byte[], byte[]> r : records) {
+                    consumerRecords.add(r);
+                }
+            }
+            return consumerRecords;
+        }
+    }
+
+    private static Map<Integer, TopicPartition> getAllPartitions(
+            KafkaConsumer<byte[], byte[]> consumer, String topic) {
+        final List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
+        return partitionInfos.stream()
+                .map(info -> new TopicPartition(info.topic(), info.partition()))
+                .collect(Collectors.toMap(TopicPartition::partition, Function.identity()));
+    }
+}
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index a1be041..2ac3f62 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -66,7 +66,7 @@ import java.util.PriorityQueue;
 import java.util.Properties;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.connector.kafka.sink.KafkaSinkITCase.drainAllRecordsFromTopic;
+import static org.apache.flink.connector.kafka.sink.KafkaUtil.drainAllRecordsFromTopic;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -212,7 +212,7 @@ public class KafkaWriterITCase extends TestLogger {
             committables.get(0).getProducer().get().getObject().commitTransaction();
 
             List<ConsumerRecord<byte[], byte[]>> records =
-                    KafkaSinkITCase.drainAllRecordsFromTopic(topic, getKafkaClientConfiguration());
+                    drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
             assertThat(records, hasSize(1));
         }
 
@@ -291,7 +291,7 @@ public class KafkaWriterITCase extends TestLogger {
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) {
             writer.write(1, SINK_WRITER_CONTEXT);
-            assertThat(drainAllRecordsFromTopic(topic, properties), hasSize(0));
+            assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(0));
         }
 
         try (final KafkaWriter<Integer> writer =
@@ -310,7 +310,7 @@ public class KafkaWriterITCase extends TestLogger {
             producer.commitTransaction();
         }
 
-        assertThat(drainAllRecordsFromTopic(topic, properties), hasSize(1));
+        assertThat(drainAllRecordsFromTopic(topic, properties, true), hasSize(1));
     }
 }
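As a usage note, tests can now call the new helper directly and select the isolation
level through the boolean flag. The snippet below is a hypothetical example rather than
part of the commit; the topic name and client configuration are placeholders, and it
assumes the class sits on the same test classpath as KafkaUtil.

import org.apache.flink.connector.kafka.sink.KafkaUtil;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;

import java.util.List;
import java.util.Properties;

import static org.junit.Assert.assertTrue;

public class DrainAllRecordsUsageExample {

    // Placeholder: point this at the Kafka broker used by the test environment.
    private static Properties exampleClientConfiguration() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "drain-usage-example");
        return properties;
    }

    @Test
    public void committedViewIsNeverLargerThanUncommittedView() {
        // true -> isolation.level=read_committed, false -> read_uncommitted,
        // as implemented by the new KafkaUtil.drainAllRecordsFromTopic.
        List<ConsumerRecord<byte[], byte[]>> committed =
                KafkaUtil.drainAllRecordsFromTopic(
                        "example-topic", exampleClientConfiguration(), true);
        List<ConsumerRecord<byte[], byte[]>> all =
                KafkaUtil.drainAllRecordsFromTopic(
                        "example-topic", exampleClientConfiguration(), false);

        // Records of aborted or still-open transactions are only visible in the
        // read_uncommitted view.
        assertTrue(committed.size() <= all.size());
    }
}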
