This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 62c001c  DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697 (#2456)
62c001c is described below

commit 62c001c71822a77512da14de033759872647ed65
Author: Rymar Maksym <[email protected]>
AuthorDate: Wed Feb 16 08:33:29 2022 +0200

    DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697 (#2456)
---
 .../drill/exec/store/kafka/KafkaGroupScan.java     | 36 ++++++++++++++++++++--
 .../drill/exec/store/kafka/MessageIterator.java    |  3 +-
 .../drill/exec/store/kafka/KafkaQueriesTest.java   | 28 ++++++++++++++---
 3 files changed, 59 insertions(+), 8 deletions(-)
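
Background, as a minimal sketch rather than part of the commit: KafkaConsumer.poll(long) was deprecated in Kafka 2.0 (KIP-266) because it could block indefinitely while fetching cluster metadata. Its replacement, poll(Duration), applies the timeout to the metadata fetch as well, so the call may return before the group rebalance completes, leaving assignment() empty. That is why the patch pairs poll(Duration) with an explicit wait for a non-empty assignment. The broker address, group id, and topic below are hypothetical:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PollMigrationSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
    props.put("group.id", "drill-sketch");            // hypothetical group id
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
        props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
      consumer.subscribe(Collections.singletonList("some-topic")); // hypothetical topic
      // Pre-2.0, consumer.poll(0) returned no records but blocked until the
      // assignment was ready; with poll(Duration) the same call can time out
      // first, so assignment() may still be empty here.
      consumer.poll(Duration.ofSeconds(5));
      System.out.println("assignment after poll: " + consumer.assignment());
    }
  }
}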

diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index 58c2f9f..e025e65 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -26,6 +27,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -44,6 +46,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -176,14 +179,13 @@ public class KafkaGroupScan extends AbstractGroupScan {
             .message("Table '%s' does not exist", topicName)
             .build(logger);
       }
-
       kafkaConsumer.subscribe(Collections.singletonList(topicName));
       // based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
      // evaluates lazily, seeking to the first/last offset in all partitions only
       // when poll(long) or
       // position(TopicPartition) are called
-      kafkaConsumer.poll(0);
-      Set<TopicPartition> assignments = kafkaConsumer.assignment();
+      kafkaConsumer.poll(Duration.ofSeconds(5));
+      Set<TopicPartition> assignments = waitForConsumerAssignment(kafkaConsumer);
       topicPartitions = kafkaConsumer.partitionsFor(topicName);
 
       // fetch start offsets for each topicPartition
@@ -227,6 +229,34 @@ public class KafkaGroupScan extends AbstractGroupScan {
     }
   }
 
+
+  /** Workaround for Kafka versions > 2.0, needed until KIP-505 is implemented.
+   * It can be replaced with the Kafka implementation once that API is introduced.
+   * @param consumer the Kafka consumer whose partition assignment to wait for
+   * @return the consumer's topic-partition assignments once they are available
+   * @throws InterruptedException if interrupted while waiting for the assignment
+   */
+  private Set<TopicPartition> waitForConsumerAssignment(Consumer<?, ?> consumer) throws InterruptedException {
+    Set<TopicPartition> assignments = consumer.assignment();
+
+    long waitingForAssignmentTimeout = kafkaStoragePlugin.getContext().getOptionManager().getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+    long waitedMs = 0;
+
+    while (assignments.isEmpty() && waitedMs < waitingForAssignmentTimeout) {
+      Thread.sleep(500);
+      waitedMs += 500;
+      assignments = consumer.assignment();
+    }
+
+    if (waitedMs >= waitingForAssignmentTimeout) {
+      throw UserException.dataReadError()
+        .message("Consumer assignment wasn't completed within the timeout of %s ms", waitingForAssignmentTimeout)
+        .build(logger);
+    }
+
+    return assignments;
+  }
+
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
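
The helper above re-checks assignment() every 500 ms, up to the limit read from the option behind ExecConstants.KAFKA_POLL_TIMEOUT. For comparison, the Kafka-native way to observe assignment completion is a ConsumerRebalanceListener passed to subscribe(): Kafka invokes onPartitionsAssigned() on the polling thread once the rebalance finishes. A sketch of that alternative, illustrative only and not what the commit does (the 5-second timeouts are assumptions):

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class AssignmentListenerSketch {
  static Set<TopicPartition> awaitAssignment(Consumer<?, ?> consumer, String topic)
      throws InterruptedException {
    CountDownLatch assigned = new CountDownLatch(1);
    consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        assigned.countDown(); // runs inside poll() once the rebalance completes
      }
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // nothing to clean up in this sketch
      }
    });
    consumer.poll(Duration.ofSeconds(5)); // drives the rebalance
    assigned.await(5, TimeUnit.SECONDS);  // may return before assignment on timeout
    return consumer.assignment();
  }
}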
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
index 68855ce..9827298 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.kafka;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -79,7 +80,7 @@ public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>>
     ConsumerRecords<byte[], byte[]> consumerRecords;
     Stopwatch stopwatch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     try {
-      consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
+      consumerRecords = kafkaConsumer.poll(Duration.ofMillis(kafkaPollTimeOut));
     } catch (KafkaException ke) {
      throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
     } finally {
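
A behavioral note on this hunk: unlike the old poll(0), poll(Duration) returns an empty ConsumerRecords when the timeout elapses, so an empty batch can mean either "no new messages within the timeout" or "metadata not ready yet" rather than end of data. A caller-side sketch, where the kafkaPollTimeOut parameter mirrors the field MessageIterator uses:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class PollResultSketch {
  // Returns true only if the poll produced records; an empty batch is not
  // proof that the topic is exhausted, just that the timeout elapsed.
  static boolean pollOnce(Consumer<byte[], byte[]> consumer, long kafkaPollTimeOut) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(kafkaPollTimeOut));
    return !records.isEmpty();
  }
}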
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index e04012c..e86423e 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runners.MethodSorters;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -120,7 +121,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
     queryBuilder().sql(query).run();
   }
 
-  private Map<TopicPartition, Long> fetchOffsets(int flag) {
+  private Map<TopicPartition, Long> fetchOffsets(int flag) throws InterruptedException {
     Consumer<byte[], byte[]> kafkaConsumer = null;
     try {
      kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
@@ -132,8 +133,8 @@ public class KafkaQueriesTest extends KafkaTestBase {
       // evaluates lazily, seeking to the
       // first/last offset in all partitions only when poll(long) or
       // position(TopicPartition) are called
-      kafkaConsumer.poll(0);
-      Set<TopicPartition> assignments = kafkaConsumer.assignment();
+      kafkaConsumer.poll(Duration.ofSeconds(5));
+      Set<TopicPartition> assignments = waitForConsumerAssignment(kafkaConsumer);
 
       if (flag == -2) {
         // fetch start offsets for each topicPartition
@@ -156,6 +157,25 @@ public class KafkaQueriesTest extends KafkaTestBase {
     }
   }
 
+  private Set<TopicPartition> waitForConsumerAssignment(Consumer<?, ?> consumer) throws InterruptedException {
+    Set<TopicPartition> assignments = consumer.assignment();
+
+    long waitingForAssignmentTimeout = 5000;
+    long waitedMs = 0;
+
+    while (assignments.isEmpty() && waitedMs < waitingForAssignmentTimeout) {
+      Thread.sleep(500);
+      timeout += 500;
+      assignments = consumer.assignment();
+    }
+
+    if (waitedMs >= waitingForAssignmentTimeout) {
+      fail("Consumer assignment wasn't completed within the timeout of " + waitingForAssignmentTimeout + " ms");
+    }
+
+    return assignments;
+  }
+
   @Test
   public void testPhysicalPlanSubmission() throws Exception {
     String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
@@ -281,4 +301,4 @@ public class KafkaQueriesTest extends KafkaTestBase {
       client.resetSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
     }
   }
-}
\ No newline at end of file
+}
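
A side note on the test's fetchOffsets(int flag): the -2 value (and presumably -1 for the other branch) follows Kafka's earliest/latest offset convention. Since Kafka 0.10.1 the consumer also exposes beginningOffsets() and endOffsets(), which return the same information without subscribing or seeking; a sketch of that alternative, not what the test uses:

import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetsSketch {
  // flag follows the test's convention: -2 = earliest offsets, otherwise latest.
  static Map<TopicPartition, Long> fetchOffsets(Consumer<?, ?> consumer,
      Collection<TopicPartition> partitions, int flag) {
    return flag == -2
        ? consumer.beginningOffsets(partitions)
        : consumer.endOffsets(partitions);
  }
}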
