This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 62c001c  DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697 (#2456)
62c001c is described below
commit 62c001c71822a77512da14de033759872647ed65
Author: Rymar Maksym <[email protected]>
AuthorDate: Wed Feb 16 08:33:29 2022 +0200
DRILL-8122: Change kafka metadata obtaining due to KAFKA-5697 (#2456)
---
.../drill/exec/store/kafka/KafkaGroupScan.java | 36 ++++++++++++++++++++--
.../drill/exec/store/kafka/MessageIterator.java | 3 +-
.../drill/exec/store/kafka/KafkaQueriesTest.java | 28 ++++++++++++++---
3 files changed, 59 insertions(+), 8 deletions(-)
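
For context: since Kafka 2.0 the KafkaConsumer.poll(long) overload is deprecated in favour of poll(Duration), and the Duration-based overload may return before the consumer group assignment has completed, so code that previously relied on poll(0) to force an assignment now has to wait for it explicitly. A minimal sketch of that pattern, assuming an already configured consumer (the class and method names below are placeholders, not part of this patch):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class AssignmentWaitSketch {
      // Subscribe, trigger the group join with poll(Duration), then wait until
      // assignment() is non-empty or the timeout elapses.
      static Set<TopicPartition> pollUntilAssigned(KafkaConsumer<byte[], byte[]> consumer,
          String topic, long timeoutMs) throws InterruptedException {
        consumer.subscribe(Collections.singletonList(topic));
        consumer.poll(Duration.ofSeconds(5)); // may return before partitions are assigned
        long waited = 0;
        Set<TopicPartition> assignment = consumer.assignment();
        while (assignment.isEmpty() && waited < timeoutMs) {
          Thread.sleep(500); // give the group coordinator time to complete the assignment
          waited += 500;
          assignment = consumer.assignment();
        }
        return assignment; // empty if the timeout elapsed without an assignment
      }
    }

The commit below applies this idea in KafkaGroupScan, with the wait bounded by Drill's Kafka poll timeout option, and in a test helper in KafkaQueriesTest.
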
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index 58c2f9f..e025e65 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.kafka;
+import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -26,6 +27,7 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -44,6 +46,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -176,14 +179,13 @@ public class KafkaGroupScan extends AbstractGroupScan {
.message("Table '%s' does not exist", topicName)
.build(logger);
}
-
kafkaConsumer.subscribe(Collections.singletonList(topicName));
// based on KafkaConsumer JavaDoc, seekToBeginning/seekToEnd functions
// evaluates lazily, seeking to the first/last offset in all partitions only
// when poll(long) or
// position(TopicPartition) are called
- kafkaConsumer.poll(0);
- Set<TopicPartition> assignments = kafkaConsumer.assignment();
+ kafkaConsumer.poll(Duration.ofSeconds(5));
+ Set<TopicPartition> assignments = waitForConsumerAssignment(kafkaConsumer);
topicPartitions = kafkaConsumer.partitionsFor(topicName);
// fetch start offsets for each topicPartition
@@ -227,6 +229,34 @@ public class KafkaGroupScan extends AbstractGroupScan {
}
}
+
+ /** Workaround for Kafka versions newer than 2.0, see KIP-505.
+ * It can be replaced with the Kafka-provided implementation once KIP-505 is introduced.
+ * @param consumer the Kafka consumer whose partition assignment to wait for
+ * @return the partitions assigned to the consumer
+ * @throws InterruptedException if interrupted while waiting for the assignment
+ */
+ private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) throws InterruptedException {
+ Set<TopicPartition> assignments = consumer.assignment();
+
+ long waitingForAssigmentTimeout = kafkaStoragePlugin.getContext().getOptionManager().getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+ long timeout = 0;
+
+ while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+ Thread.sleep(500);
+ timeout += 500;
+ assignments = consumer.assignment();
+ }
+
+ if (assignments.isEmpty()) {
+ throw UserException.dataReadError()
+ .message("Consumer assignment was not completed within the timeout of %s ms", waitingForAssigmentTimeout)
+ .build(logger);
+ }
+
+ return assignments;
+ }
+
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
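
Once the assignment is known, KafkaGroupScan fetches start offsets for each topic partition (see the "fetch start offsets" comment above). For illustration only, one way to obtain per-partition start and end offsets with the plain consumer API is sketched below; this is an assumption for the example and not necessarily how KafkaGroupScan itself does it:

    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    class OffsetSketch {
      // Resolve the earliest and latest offset for each assigned partition.
      static void printOffsets(KafkaConsumer<byte[], byte[]> consumer, Set<TopicPartition> assignments) {
        Map<TopicPartition, Long> start = consumer.beginningOffsets(assignments);
        Map<TopicPartition, Long> end = consumer.endOffsets(assignments);
        for (TopicPartition tp : assignments) {
          System.out.printf("%s: [%d, %d)%n", tp, start.get(tp), end.get(tp));
        }
      }
    }
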
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
index 68855ce..9827298 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.kafka;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -79,7 +80,7 @@ public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>>
ConsumerRecords<byte[], byte[]> consumerRecords;
Stopwatch stopwatch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
try {
- consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
+ consumerRecords = kafkaConsumer.poll(Duration.ofMillis(kafkaPollTimeOut));
} catch (KafkaException ke) {
throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
} finally {
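
The MessageIterator hunk above is the same migration from the deprecated poll(long) overload to poll(Duration), shown in isolation below as a sketch (kafkaPollTimeOut is in milliseconds, as in the surrounding code; the wrapper class is a placeholder):

    import java.time.Duration;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class PollMigrationSketch {
      static ConsumerRecords<byte[], byte[]> pollOnce(KafkaConsumer<byte[], byte[]> kafkaConsumer, long kafkaPollTimeOut) {
        // Old overload (deprecated since Kafka 2.0): the long timeout did not bound
        // time spent waiting for metadata, so the call could block much longer.
        // return kafkaConsumer.poll(kafkaPollTimeOut);
        // New overload: the Duration bounds the whole call, including metadata updates.
        return kafkaConsumer.poll(Duration.ofMillis(kafkaPollTimeOut));
      }
    }
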
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index e04012c..e86423e 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runners.MethodSorters;
+import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -120,7 +121,7 @@ public class KafkaQueriesTest extends KafkaTestBase {
queryBuilder().sql(query).run();
}
- private Map<TopicPartition, Long> fetchOffsets(int flag) {
+ private Map<TopicPartition, Long> fetchOffsets(int flag) throws InterruptedException {
Consumer<byte[], byte[]> kafkaConsumer = null;
try {
kafkaConsumer = new KafkaConsumer<>(storagePluginConfig.getKafkaConsumerProps(),
@@ -132,8 +133,8 @@ public class KafkaQueriesTest extends KafkaTestBase {
// evaluates lazily, seeking to the
// first/last offset in all partitions only when poll(long) or
// position(TopicPartition) are called
- kafkaConsumer.poll(0);
- Set<TopicPartition> assignments = kafkaConsumer.assignment();
+ kafkaConsumer.poll(Duration.ofSeconds(5));
+ Set<TopicPartition> assignments = waitForConsumerAssignment(kafkaConsumer);
if (flag == -2) {
// fetch start offsets for each topicPartition
@@ -156,6 +157,25 @@ public class KafkaQueriesTest extends KafkaTestBase {
}
}
+ private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) throws InterruptedException {
+ Set<TopicPartition> assignments = consumer.assignment();
+
+ long waitingForAssigmentTimeout = 5000;
+ long timeout = 0;
+
+ while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+ Thread.sleep(500);
+ timeout += 500;
+ assignments = consumer.assignment();
+ }
+
+ if (assignments.isEmpty()) {
+ fail("Consumer assignment was not completed within the timeout of " + waitingForAssigmentTimeout + " ms");
+ }
+
+ return assignments;
+ }
+
@Test
public void testPhysicalPlanSubmission() throws Exception {
String query = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
@@ -281,4 +301,4 @@ public class KafkaQueriesTest extends KafkaTestBase {
client.resetSession(ExecConstants.KAFKA_READER_ESCAPE_ANY_CHAR);
}
}
-}
\ No newline at end of file
+}