[ 
https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14357366#comment-14357366
 ] 

Guozhang Wang commented on KAFKA-1910:
--------------------------------------

Got some problems with RB, uploading the patch here for a quick review:

{code}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index e972efb..436f9b2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -129,7 +129,7 @@ public final class Coordinator {
 
         // process the response
         JoinGroupResponse response = new 
JoinGroupResponse(resp.responseBody());
-        // TODO: needs to handle disconnects and errors
+        // TODO: needs to handle disconnects and errors, should not just throw 
exceptions
         Errors.forCode(response.errorCode()).maybeThrow();
         this.consumerId = response.consumerId();
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 27c78b8..8b71fba 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -231,11 +231,12 @@ public class Fetcher<K, V> {
                         log.debug("Fetched offset {} for partition {}", 
offset, topicPartition);
                         return offset;
                     } else if (errorCode == 
Errors.NOT_LEADER_FOR_PARTITION.code()
-                        || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) {
+                        || errorCode == 
Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
                         log.warn("Attempt to fetch offsets for partition {} 
failed due to obsolete leadership information, retrying.",
                             topicPartition);
                         awaitMetadataUpdate();
                     } else {
+                        // TODO: we should not just throw exceptions but 
should handle and log it.
                         Errors.forCode(errorCode).maybeThrow();
                     }
                 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index af704f3..f706086 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -45,7 +45,9 @@ public class ListOffsetResponse extends 
AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     * TODO
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)
+     *  NOT_LEADER_FOR_PARTITION (6)
+     *  UNKNOWN (-1)
      */
 
     private static final String OFFSETS_KEY_NAME = "offsets";
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index fed37e3..8eae1ab 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -260,8 +260,10 @@ class ConsumerTest extends IntegrationTestHarness with 
Logging {
     var iter: Int = 0
 
     override def doWork(): Unit = {
-      killRandomBroker()
+      info("Killed broker %d".format(killRandomBroker()))
+      Thread.sleep(500)
       restartDeadBrokers()
+      info("Restarted all brokers")
 
       iter += 1
       if (iter == numIters)
{code}

> Refactor KafkaConsumer
> ----------------------
>
>                 Key: KAFKA-1910
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1910
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>         Attachments: KAFKA-1910.patch, KAFKA-1910_2015-03-05_14:55:33.patch
>
>
> KafkaConsumer now contains all the logic on the consumer side, making it a 
> very huge class file, better re-factoring it to have multiple layers on top 
> of KafkaClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to