[ https://issues.apache.org/jira/browse/KAFKA-6014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson reassigned KAFKA-6014:
--------------------------------------
Assignee: Jason Gustafson
> new consumer mirror maker halts after committing offsets to a deleted topic
> ---------------------------------------------------------------------------
>
> Key: KAFKA-6014
> URL: https://issues.apache.org/jira/browse/KAFKA-6014
> Project: Kafka
> Issue Type: Bug
> Reporter: Onur Karaman
> Assignee: Jason Gustafson
>
> The new consumer throws an unexpected KafkaException when trying to commit
> offsets for a topic that has been deleted. MirrorMaker.commitOffsets doesn't
> catch the KafkaException, so the exception kills the process. We didn't see
> this with the old consumer because it silently drops failed offset commits.
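The catch-clause gap can be illustrated with a minimal, self-contained Java sketch. It deliberately avoids kafka-clients: `KafkaException`, `WakeupException` and `CommitFailedException` below are hypothetical stand-ins that mirror the real hierarchy (in the client, both subclasses extend `KafkaException`), and `commitOffsets` is a simplified stand-in for MirrorMaker's method, not its actual code.

```java
// Hypothetical stand-ins so the sketch compiles without kafka-clients.
// In the real client, WakeupException and CommitFailedException both extend KafkaException.
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class WakeupException extends KafkaException {
    WakeupException() { super("wakeup"); }
}

class CommitFailedException extends KafkaException {
    CommitFailedException() { super("commit failed"); }
}

final class CommitOffsetsSketch {
    // Simplified stand-in for MirrorMaker.commitOffsets: it handles only
    // WakeupException and CommitFailedException, so any other
    // KafkaException propagates to the caller.
    static String commitOffsets(Runnable commit) {
        try {
            commit.run();
            return "committed";
        } catch (WakeupException e) {
            return "handled wakeup";
        } catch (CommitFailedException e) {
            return "handled commit failure";
        }
    }

    public static void main(String[] args) {
        System.out.println(commitOffsets(() -> { }));
        System.out.println(commitOffsets(() -> { throw new CommitFailedException(); }));
        // Simulates the deleted-topic case: the plain KafkaException is not caught,
        // so it escapes main and terminates the program with a stack trace.
        commitOffsets(() -> {
            throw new KafkaException("Partition t-0 may not exist or user may not have Describe access to topic");
        });
    }
}
```

Running `main` prints `committed` and `handled commit failure`, then dies with an uncaught `KafkaException`, the same shape as the failure reported here.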
> I ran a quick experiment locally to reproduce the behavior. The experiment:
> 1. start up a single broker
> 2. create a single-partition topic t
> 3. create a new consumer that consumes topic t
> 4. make the consumer commit every few seconds
> 5. delete topic t
> 6. expect a KafkaException that kills the process.
> Here's my script:
> {code}
> package org.apache.kafka.clients.consumer;
>
> import org.apache.kafka.common.TopicPartition;
>
> import java.util.Collections;
> import java.util.List;
> import java.util.Properties;
>
> public class OffsetCommitTopicDeletionTest {
>     public static void main(String[] args) throws InterruptedException {
>         Properties props = new Properties();
>         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
>         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
>         props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
>         TopicPartition partition = new TopicPartition("t", 0);
>         List<TopicPartition> partitions = Collections.singletonList(partition);
>         kafkaConsumer.assign(partitions);
>         while (true) {
>             kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
>             Thread.sleep(1000);
>         }
>     }
> }
> {code}
> Here are the other commands:
> {code}
> > rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> > ./gradlew clean jar
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
> > export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 1
> > ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
> {code}
> Here is the output:
> {code}
> [2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] Offset commit failed on partition t-0 at offset 0: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> Exception in thread "main" org.apache.kafka.common.KafkaException: Partition t-0 may not exist or user may not have Describe access to topic
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
>     at org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)
> {code}
> There are a couple of ways we could fix this:
> 1. make OffsetCommitResponseHandler throw a more specific exception and make
> MirrorMaker.commitOffsets catch it. MirrorMaker.commitOffsets currently
> catches only WakeupException and CommitFailedException.
> 2. make OffsetCommitResponseHandler log the error and move on. This is
> probably the simpler option: just delete these lines:
> {code}
> - future.raise(new KafkaException("Partition " + tp +
> -         " may not exist or user may not have Describe access to topic"));
> - return;
> {code}
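Both proposals can be sketched in a self-contained stand-in style. Everything here is hypothetical: the per-partition error codes are plain strings rather than Kafka's error enum (the string `UNKNOWN_TOPIC_OR_PARTITION` stands in for the broker error behind the "This server does not host this topic-partition." log line), `handleOption1`/`handleOption2` only loosely model OffsetCommitResponseHandler, and using `UnknownTopicOrPartitionException` (named after the existing class in `org.apache.kafka.common.errors`, simplified here to extend `KafkaException` directly) as the "more specific exception" for option 1 is an assumption, not something this report specifies.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins so the sketch compiles without kafka-clients.
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

// Named after org.apache.kafka.common.errors.UnknownTopicOrPartitionException,
// but simplified to extend KafkaException directly.
class UnknownTopicOrPartitionException extends KafkaException {
    UnknownTopicOrPartitionException(String message) { super(message); }
}

final class CommitFixSketch {
    static final String NONE = "NONE";
    static final String UNKNOWN = "UNKNOWN_TOPIC_OR_PARTITION";

    // Option 1: raise a specific exception instead of a bare KafkaException.
    // Other error codes are omitted for brevity.
    static void handleOption1(Map<String, String> partitionErrors) {
        for (Map.Entry<String, String> entry : partitionErrors.entrySet()) {
            if (UNKNOWN.equals(entry.getValue())) {
                throw new UnknownTopicOrPartitionException(
                        "Partition " + entry.getKey() + " may not exist");
            }
        }
    }

    // Caller side of option 1: catch the specific exception and keep running
    // instead of letting it kill the process.
    static boolean commitOffsetsOption1(Map<String, String> partitionErrors) {
        try {
            handleOption1(partitionErrors);
            return true;
        } catch (UnknownTopicOrPartitionException e) {
            System.err.println("Offset commit failed: " + e.getMessage());
            return false;
        }
    }

    // Option 2: log the unknown-partition error and move on to the next
    // partition; returns the partitions whose commit succeeded.
    // Other error codes are omitted for brevity.
    static List<String> handleOption2(Map<String, String> partitionErrors) {
        List<String> committed = new ArrayList<>();
        for (Map.Entry<String, String> entry : partitionErrors.entrySet()) {
            if (NONE.equals(entry.getValue())) {
                committed.add(entry.getKey());
            } else if (UNKNOWN.equals(entry.getValue())) {
                System.err.println("Offset commit failed on partition " + entry.getKey() + "; ignoring");
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        Map<String, String> errors = new LinkedHashMap<>();
        errors.put("t-0", UNKNOWN); // deleted topic
        errors.put("u-0", NONE);    // healthy topic
        System.out.println(commitOffsetsOption1(errors));
        System.out.println(handleOption2(errors));
    }
}
```

`main` prints `false` (option 1: the caller caught the specific exception and kept running) and `[u-0]` (option 2: `t-0` was logged and skipped while `u-0` was still recorded as committed). Option 2 needs no caller changes, which is why it is the simpler option; option 1 keeps the failure visible to callers such as MirrorMaker.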
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)