This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 51a0d42  [FLINK-12030][connector/kafka] Check the topic existence 
after topic creation using KafkaConsumer.
51a0d42 is described below

commit 51a0d42ade8ee3789036ac1ee7c121133b58212a
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Tue May 19 23:44:17 2020 +0800

    [FLINK-12030][connector/kafka] Check the topic existence after topic 
creation using KafkaConsumer.
---
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++++++++++------------
 .../connectors/kafka/KafkaTestEnvironmentImpl.java | 43 ++++++++++------------
 2 files changed, 38 insertions(+), 48 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 322c3aa..9ae751b 100644
--- 
a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -322,32 +324,25 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
 
                // validate that the topic has been created
                final long deadline = System.nanoTime() + 30_000_000_000L;
-               do {
-                       try {
-                               if (config.isSecureMode()) {
-                                       //increase wait time since in Travis ZK 
timeout occurs frequently
-                                       int wait = zkTimeout / 100;
-                                       LOG.info("waiting for {} msecs before 
the topic {} can be checked", wait, topic);
-                                       Thread.sleep(wait);
-                               } else {
-                                       Thread.sleep(100);
+               boolean topicCreated = false;
+               Properties props = new Properties();
+               props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
getBrokerConnectionString());
+               props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+               
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+               props.putAll(getSecureProperties());
+               try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(props)) {
+                       do {
+                               topicCreated = 
!consumer.partitionsFor(topic).isEmpty();
+                               if (!topicCreated) {
+                                       Thread.sleep(10);
                                }
-                       } catch (InterruptedException e) {
-                               // restore interrupted state
-                       }
-                       // we could use AdminUtils.topicExists(zkUtils, topic) 
here, but it's results are
-                       // not always correct.
-
-                       // create a new ZK utils connection
-                       ZkUtils checkZKConn = getZkUtils();
-                       if (AdminUtils.topicExists(checkZKConn, topic)) {
-                               checkZKConn.close();
-                               return;
-                       }
-                       checkZKConn.close();
+                       } while (!topicCreated && System.nanoTime() < deadline);
+               } catch (InterruptedException e) {
+                       // do nothing.
+               }
+               if (!topicCreated) {
+                       fail("Test topic could not be created");
                }
-               while (System.nanoTime() < deadline);
-               fail("Test topic could not be created");
        }
 
        @Override
diff --git 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 478ce38..c846f16 100644
--- 
a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -36,6 +36,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.commons.collections.list.UnmodifiableList;
 import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -43,6 +44,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -157,32 +159,25 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
 
                // validate that the topic has been created
                final long deadline = System.nanoTime() + 30_000_000_000L;
-               do {
-                       try {
-                               if (config.isSecureMode()) {
-                                       //increase wait time since in Travis ZK 
timeout occurs frequently
-                                       int wait = zkTimeout / 100;
-                                       LOG.info("waiting for {} msecs before 
the topic {} can be checked", wait, topic);
-                                       Thread.sleep(wait);
-                               } else {
-                                       Thread.sleep(100);
+               boolean topicCreated = false;
+               Properties props = new Properties();
+               props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
getBrokerConnectionString());
+               props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+               
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+               props.putAll(getSecureProperties());
+               try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(props)) {
+                       do {
+                               topicCreated = 
!consumer.partitionsFor(topic).isEmpty();
+                               if (!topicCreated) {
+                                       Thread.sleep(10);
                                }
-                       } catch (InterruptedException e) {
-                               // restore interrupted state
-                       }
-                       // we could use AdminUtils.topicExists(zkUtils, topic) 
here, but it's results are
-                       // not always correct.
-
-                       // create a new ZK utils connection
-                       ZkUtils checkZKConn = getZkUtils();
-                       if (AdminUtils.topicExists(checkZKConn, topic)) {
-                               checkZKConn.close();
-                               return;
-                       }
-                       checkZKConn.close();
+                       } while (!topicCreated && System.nanoTime() < deadline);
+               } catch (InterruptedException e) {
+                       // do nothing.
+               }
+               if (!topicCreated) {
+                       fail("Test topic could not be created");
                }
-               while (System.nanoTime() < deadline);
-               fail("Test topic could not be created");
        }
 
        @Override

Reply via email to