This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 328c00542939dacc23fcde030eaf78b1916770e0 Author: yanghua <[email protected]> AuthorDate: Sat Oct 13 14:33:45 2018 +0800 [FLINK-9697] Remove usage of deprecated code in KafkaConsumerTestBase The readTopicToList method was unused, and it relied on Kafka classes (kafka.consumer.*, kafka.javaapi.consumer.ConsumerConnector, kafka.message.MessageAndMetadata) that are deprecated and removed in Kafka 2.0 --- .../connectors/kafka/KafkaConsumerTestBase.java | 46 ++-------------------- 1 file changed, 4 insertions(+), 42 deletions(-) diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 7d88f0d..4ce8e62 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -78,12 +78,6 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; import kafka.server.KafkaServer; import org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.kafka.clients.producer.ProducerConfig; @@ -184,7 +178,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { stream.print(); see.execute("No broker test"); } catch (JobExecutionException jee) { - if (kafkaServer.getVersion().equals("0.9") || kafkaServer.getVersion().equals("0.10") || kafkaServer.getVersion().equals("0.11")) { + if (kafkaServer.getVersion().equals("0.9") || + kafkaServer.getVersion().equals("0.10") || + kafkaServer.getVersion().equals("0.11") || + 
kafkaServer.getVersion().equals("2.0")) { assertTrue(jee.getCause() instanceof TimeoutException); TimeoutException te = (TimeoutException) jee.getCause(); @@ -2115,41 +2112,6 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase { // Debugging utilities // ------------------------------------------------------------------------ - /** - * Read topic to list, only using Kafka code. - */ - private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) { - ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config); - // we request only one stream per consumer instance. Kafka will make sure that each consumer group - // will see each message only once. - Map<String, Integer> topicCountMap = Collections.singletonMap(topicName, 1); - Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumerConnector.createMessageStreams(topicCountMap); - if (streams.size() != 1) { - throw new RuntimeException("Expected only one message stream but got " + streams.size()); - } - List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName); - if (kafkaStreams == null) { - throw new RuntimeException("Requested stream not available. 
Available streams: " + streams.toString()); - } - if (kafkaStreams.size() != 1) { - throw new RuntimeException("Requested 1 stream from Kafka, bot got " + kafkaStreams.size() + " streams"); - } - LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, config.groupId()); - ConsumerIterator<byte[], byte[]> iteratorToRead = kafkaStreams.get(0).iterator(); - - List<MessageAndMetadata<byte[], byte[]>> result = new ArrayList<>(); - int read = 0; - while (iteratorToRead.hasNext()) { - read++; - result.add(iteratorToRead.next()); - if (read == stopAfter) { - LOG.info("Read " + read + " elements"); - return result; - } - } - return result; - } - private static class BrokerKillingMapper<T> extends RichMapFunction<T, T> implements ListCheckpointed<Integer>, CheckpointListener {
