[ https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14357366#comment-14357366 ]
Guozhang Wang commented on KAFKA-1910: -------------------------------------- Got some problems with RB, uploading the patch here for a quick review: {code} diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java index e972efb..436f9b2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java @@ -129,7 +129,7 @@ public final class Coordinator { // process the response JoinGroupResponse response = new JoinGroupResponse(resp.responseBody()); - // TODO: needs to handle disconnects and errors + // TODO: needs to handle disconnects and errors, should not just throw exceptions Errors.forCode(response.errorCode()).maybeThrow(); this.consumerId = response.consumerId(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 27c78b8..8b71fba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -231,11 +231,12 @@ public class Fetcher<K, V> { log.debug("Fetched offset {} for partition {}", offset, topicPartition); return offset; } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || errorCode == Errors.LEADER_NOT_AVAILABLE.code()) { + || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition); awaitMetadataUpdate(); } else { + // TODO: we should not just throw exceptions but should handle and log it. Errors.forCode(errorCode).maybeThrow(); } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index af704f3..f706086 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -45,7 +45,9 @@ public class ListOffsetResponse extends AbstractRequestResponse { /** * Possible error code: * - * TODO + * UNKNOWN_TOPIC_OR_PARTITION (3) + * NOT_LEADER_FOR_PARTITION (6) + * UNKNOWN (-1) */ private static final String OFFSETS_KEY_NAME = "offsets"; diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala index fed37e3..8eae1ab 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala @@ -260,8 +260,10 @@ class ConsumerTest extends IntegrationTestHarness with Logging { var iter: Int = 0 override def doWork(): Unit = { - killRandomBroker() + info("Killed broker %d".format(killRandomBroker())) + Thread.sleep(500) restartDeadBrokers() + info("Restarted all brokers") iter += 1 if (iter == numIters) {code} > Refactor KafkaConsumer > ---------------------- > > Key: KAFKA-1910 > URL: https://issues.apache.org/jira/browse/KAFKA-1910 > Project: Kafka > Issue Type: Sub-task > Components: consumer > Reporter: Guozhang Wang > Assignee: Guozhang Wang > Attachments: KAFKA-1910.patch, KAFKA-1910_2015-03-05_14:55:33.patch > > > KafkaConsumer now contains all the logic on the consumer side, making it a > very huge class file, better re-factoring it to have multiple layers on top > of KafkaClient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)