[
https://issues.apache.org/jira/browse/DRILL-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490328#comment-17490328
]
ASF GitHub Bot commented on DRILL-8122:
---------------------------------------
luocooong commented on a change in pull request #2456:
URL: https://github.com/apache/drill/pull/2456#discussion_r803786802
##########
File path:
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
}
}
+
+ /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.
+ * It can be replaced with Kafka implementation once it will be introduced.
+ * @param consumer Kafka consumer whom need to get assignments
+ * @return
+ * @throws InterruptedException
+ */
+ private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) throws InterruptedException {
+ Set<TopicPartition> assignments = consumer.assignment();
+
+ long waitingForAssigmentTimeout = kafkaStoragePlugin.getContext().getOptionManager().getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+ long timeout = 0;
+
+ while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+ Thread.sleep(500);
+ timeout += 500;
+ assignments = consumer.assignment();
+ }
+
+ if (timeout >= waitingForAssigmentTimeout) {
+ logger.error("Consumer assignment wasn't completed within the timeout {}", waitingForAssigmentTimeout);
+ throw UserException.dataReadError().build(logger);
Review comment:
```
throw UserException.dataReadError()
  .message("Consumer assignment wasn't completed within the timeout %s", waitingForAssigmentTimeout)
  .build(logger);
```
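For illustration, the wait loop in the hunk above can be sketched without any Kafka or Drill dependency. This is a minimal sketch under stated assumptions, not the code from the pull request: `AssignmentWaiter`, `AssignmentTimeoutException`, and the `Supplier` parameter are hypothetical stand-ins for `consumer.assignment()` and `UserException`. The sketch carries the timeout in the exception message, as suggested in the comment above, and measures elapsed time against a monotonic deadline instead of summing sleep intervals.

```java
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper for illustration only; not part of Drill or Kafka. */
final class AssignmentWaiter {

  /** Hypothetical stand-in for the UserException built in the pull request. */
  static final class AssignmentTimeoutException extends RuntimeException {
    AssignmentTimeoutException(String message) {
      super(message);
    }
  }

  /**
   * Polls the supplier (a stand-in for consumer.assignment()) until it returns
   * a non-empty set or the timeout elapses.
   */
  static <T> Set<T> waitForAssignment(Supplier<Set<T>> assignment,
                                      long timeoutMs,
                                      long pollIntervalMs) throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    Set<T> current = assignment.get();
    while (current.isEmpty()) {
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        // The message carries the timeout, so the caller sees why the read failed.
        throw new AssignmentTimeoutException(String.format(
            "Consumer assignment wasn't completed within the timeout %s ms", timeoutMs));
      }
      // Never sleep past the deadline, but sleep at least 1 ms to avoid busy-waiting.
      long remainingMs = Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
      Thread.sleep(Math.min(pollIntervalMs, remainingMs));
      current = assignment.get();
    }
    return current;
  }
}
```

The sketch decides on the emptiness of the returned set rather than on the elapsed time alone, so an assignment that arrives on the final poll is returned instead of being reported as a timeout.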
##########
File path:
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
}
}
+
+ /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.
Review comment:
```suggestion
/**
Workaround for Kafka > 2.0 version due to KIP-505. It can be replaced with
Kafka implementation once it will be introduced.
```
##########
File path:
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
##########
@@ -156,6 +157,29 @@ public void testInformationSchema() throws Exception {
}
}
+ private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) {
+ Set<TopicPartition> assignments = consumer.assignment();
+
+ long waitingForAssigmentTimeout = 5000;
+ long timeout = 0;
+
+ while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
Review comment:
Printing the stack trace with e.printStackTrace() is not recommended. Handle the exception or report it through a logger instead.
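One common alternative, sketched here as an assumption rather than as the change adopted in the pull request, is to restore the thread's interrupt flag and let the caller stop waiting. `InterruptAwareSleep` and `sleepQuietly` are hypothetical names. A logger call could be added in the catch block, but is omitted to keep the sketch dependency-free.

```java
/** Hypothetical helper illustrating interrupt handling; not Drill code. */
final class InterruptAwareSleep {

  /**
   * Sleeps for the given number of milliseconds.
   * Returns false if the thread was interrupted; in that case the interrupt
   * flag is restored so that callers further up the stack can still see it.
   */
  static boolean sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
      return true;
    } catch (InterruptedException e) {
      // Re-assert the interrupt instead of printing the stack trace.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A polling loop could then exit as soon as `sleepQuietly` returns false instead of continuing to wait.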
> Change kafka metadata obtaining due to KAFKA-5697
> -------------------------------------------------
>
> Key: DRILL-8122
> URL: https://issues.apache.org/jira/browse/DRILL-8122
> Project: Apache Drill
> Issue Type: Bug
> Affects Versions: 1.17.0, 1.18.0, 1.19.0
> Reporter: Maksym Rymar
> Assignee: Maksym Rymar
> Priority: Major
> Fix For: 1.20.0
>
>
> Starting with Kafka 2.0, the behavior of
> [KafkaConsumer#poll|https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll-long-]
> changed. Drill relied on a hack in which poll(0) is called only to update the
> consumer metadata. After the following change:
> [https://github.com/apache/kafka/pull/4855] this hack no longer works, because
> poll() no longer blocks until the metadata is updated.
> Unfortunately, Kafka has no public method that only updates the assignment
> metadata in the consumer, and whether to add one is still under discussion:
> [KIP-505|https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer]
> . Once such a method is introduced we can use it; until then, a workaround
> needs to be implemented.
> Code to change:
> [https://github.com/apache/drill/blob/15b2f52260e4f0026f2dfafa23c5d32e0fb66502/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java#L185]
>
>
>
>