luocooong commented on a change in pull request #2456:
URL: https://github.com/apache/drill/pull/2456#discussion_r803786802



##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
     }
   }
 
+
+  /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.
+   * It can be replaced with Kafka implementation once it will be introduced.
+   * @param consumer Kafka consumer whom need to get assignments
+   * @return
+   * @throws InterruptedException
+   */
+  private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer) throws InterruptedException {
+    Set<TopicPartition> assignments = consumer.assignment();
+
+    long waitingForAssigmentTimeout = kafkaStoragePlugin.getContext().getOptionManager().getLong(ExecConstants.KAFKA_POLL_TIMEOUT);
+    long timeout = 0;
+
+    while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+      Thread.sleep(500);
+      timeout += 500;
+      assignments = consumer.assignment();
+    }
+
+    if (timeout >= waitingForAssigmentTimeout) {
+      logger.error("Consumer assignment wasn't completed within the timeout {}", waitingForAssigmentTimeout);
+      throw UserException.dataReadError().build(logger);

Review comment:
```
throw UserException.dataReadError()
  .message("Consumer assignment wasn't completed within the timeout %s", waitingForAssigmentTimeout)
  .build(logger);
```
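
For context, here is a minimal, self-contained sketch of the same bounded-wait pattern with the timeout carried in the exception message. It is not Drill code: `AssignmentWaitSketch`, `waitForAssignment`, and the `Supplier` parameter are hypothetical names, the supplier stands in for `consumer.assignment()`, and `IllegalStateException` stands in for `UserException.dataReadError()`. The sketch decides failure by checking whether the set is still empty rather than by elapsed time, which is one possible choice and not necessarily what the PR should do.

```java
import java.util.Set;
import java.util.function.Supplier;

/** Hypothetical sketch of a bounded wait; none of these names exist in Drill. */
class AssignmentWaitSketch {

  /**
   * Polls the supplier until it returns a non-empty set or the timeout elapses.
   * The supplier is an assumed stand-in for consumer.assignment().
   */
  static Set<String> waitForAssignment(Supplier<Set<String>> assignmentSource,
                                       long timeoutMs,
                                       long pollIntervalMs) throws InterruptedException {
    Set<String> assignments = assignmentSource.get();
    long waited = 0;

    while (assignments.isEmpty() && waited < timeoutMs) {
      Thread.sleep(pollIntervalMs);
      waited += pollIntervalMs;
      assignments = assignmentSource.get();
    }

    if (assignments.isEmpty()) {
      // IllegalStateException is a stand-in for UserException.dataReadError();
      // the point is that the message itself names the timeout that expired.
      throw new IllegalStateException(String.format(
          "Consumer assignment wasn't completed within the timeout %s ms", timeoutMs));
    }
    return assignments;
  }
}
```

Putting the timeout into the exception message, as suggested above, means the user sees the cause directly instead of having to find the matching log line.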

##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
##########
@@ -227,6 +229,33 @@ private void init() {
     }
   }
 
+
+  /** Workaround for Kafka > 2.0 version due to <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-505%3A+Add+new+public+method+to+only+update+assignment+metadata+in+consumer">KIP-505</a>.

Review comment:
```suggestion
  /**
   * Workaround for Kafka versions > 2.0 due to KIP-505. It can be replaced with the Kafka implementation once it is introduced.
```

##########
File path: contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
##########
@@ -156,6 +157,29 @@ public void testInformationSchema() throws Exception {
     }
   }
 
+  private Set<TopicPartition> waitForConsumerAssignment(Consumer consumer)  {
+    Set<TopicPartition> assignments = consumer.assignment();
+
+    long waitingForAssigmentTimeout = 5000;
+    long timeout = 0;
+
+    while (assignments.isEmpty() && timeout < waitingForAssigmentTimeout) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();

Review comment:
       Printing the stack trace with `e.printStackTrace()` is not recommended; consider logging the exception through the logger instead.
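
One common alternative, sketched below under stated assumptions: log the exception and restore the thread's interrupt flag instead of printing the stack trace. `InterruptHandlingSketch` and `sleepQuietly` are hypothetical names, and `java.util.logging` is used only to keep the example self-contained; the test class would presumably use its existing logger.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch; not part of the PR. */
class InterruptHandlingSketch {
  // java.util.logging is an assumption made to keep the sketch dependency-free.
  private static final Logger logger =
      Logger.getLogger(InterruptHandlingSketch.class.getName());

  /** Sleeps for the given time; returns false if the thread was interrupted. */
  static boolean sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
      return true;
    } catch (InterruptedException e) {
      // Report through the logger rather than e.printStackTrace().
      logger.log(Level.WARNING, "Interrupted while waiting for consumer assignment", e);
      // Restore the interrupt flag so callers can still observe the interruption.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A polling loop could then stop as soon as `sleepQuietly` returns false, rather than continuing to spin after an interrupt.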



