This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 328c00542939dacc23fcde030eaf78b1916770e0
Author: yanghua <[email protected]>
AuthorDate: Sat Oct 13 14:33:45 2018 +0800

    [FLINK-9697] Remove usage of deprecated code in KafkaConsumerTestBase
    
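    The private readTopicToList() helper was unused, and it relied on the old
    kafka.consumer.* / kafka.javaapi.consumer.* API, which is deprecated and
    removed in Kafka 2.0. The no-broker test's check on the Kafka server
    version now also accepts "2.0".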
---
 .../connectors/kafka/KafkaConsumerTestBase.java    | 46 ++--------------------
 1 file changed, 4 insertions(+), 42 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
index 7d88f0d..4ce8e62 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
@@ -78,12 +78,6 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
 import kafka.server.KafkaServer;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -184,7 +178,10 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
                        stream.print();
                        see.execute("No broker test");
                } catch (JobExecutionException jee) {
-                       if (kafkaServer.getVersion().equals("0.9") || 
kafkaServer.getVersion().equals("0.10") || 
kafkaServer.getVersion().equals("0.11")) {
+                       if (kafkaServer.getVersion().equals("0.9") ||
+                               kafkaServer.getVersion().equals("0.10") ||
+                               kafkaServer.getVersion().equals("0.11") ||
+                               kafkaServer.getVersion().equals("2.0")) {
                                assertTrue(jee.getCause() instanceof 
TimeoutException);
 
                                TimeoutException te = (TimeoutException) 
jee.getCause();
@@ -2115,41 +2112,6 @@ public abstract class KafkaConsumerTestBase extends 
KafkaTestBase {
        //  Debugging utilities
        // 
------------------------------------------------------------------------
 
-       /**
-        * Read topic to list, only using Kafka code.
-        */
-       private static List<MessageAndMetadata<byte[], byte[]>> 
readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
-               ConsumerConnector consumerConnector = 
Consumer.createJavaConsumerConnector(config);
-               // we request only one stream per consumer instance. Kafka will 
make sure that each consumer group
-               // will see each message only once.
-               Map<String, Integer> topicCountMap = 
Collections.singletonMap(topicName, 1);
-               Map<String, List<KafkaStream<byte[], byte[]>>> streams = 
consumerConnector.createMessageStreams(topicCountMap);
-               if (streams.size() != 1) {
-                       throw new RuntimeException("Expected only one message 
stream but got " + streams.size());
-               }
-               List<KafkaStream<byte[], byte[]>> kafkaStreams = 
streams.get(topicName);
-               if (kafkaStreams == null) {
-                       throw new RuntimeException("Requested stream not 
available. Available streams: " + streams.toString());
-               }
-               if (kafkaStreams.size() != 1) {
-                       throw new RuntimeException("Requested 1 stream from 
Kafka, bot got " + kafkaStreams.size() + " streams");
-               }
-               LOG.info("Opening Consumer instance for topic '{}' on group 
'{}'", topicName, config.groupId());
-               ConsumerIterator<byte[], byte[]> iteratorToRead = 
kafkaStreams.get(0).iterator();
-
-               List<MessageAndMetadata<byte[], byte[]>> result = new 
ArrayList<>();
-               int read = 0;
-               while (iteratorToRead.hasNext()) {
-                       read++;
-                       result.add(iteratorToRead.next());
-                       if (read == stopAfter) {
-                               LOG.info("Read " + read + " elements");
-                               return result;
-                       }
-               }
-               return result;
-       }
-
        private static class BrokerKillingMapper<T> extends RichMapFunction<T, 
T>
                        implements ListCheckpointed<Integer>, 
CheckpointListener {
 

Reply via email to