[ 
https://issues.apache.org/jira/browse/KAFKA-5238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16247762#comment-16247762
 ] 

Edoardo Comar commented on KAFKA-5238:
--------------------------------------

Hi [~ijuma] we may come up with a PR on this issue which is hitting us in a 
slightly different scenario,
but with the same root cause :
topic being deleted just after creation, which means there is a follower first 
fetch - delayed fetch request - still in the purgatory
while in your test case there is a delayed fetch because the consumer has 
issued a prefetch.

our fix consists in checking in
{{ReplicaManager.readFromLocalLog}}
{{if(allPartitions.contains(tp))}}
and if not throw an UnknownTopicOrPartitionException
which will be checked in {{KafkaApi.fetchResponseCallback}} as a guard on 
invoking {{brokerTopicStats.updateBytesOut}}

does this approach makes sense?
Hope so, in that case will make the PR on Monday






> BrokerTopicMetrics can be recreated after topic is deleted
> ----------------------------------------------------------
>
>                 Key: KAFKA-5238
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5238
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Ismael Juma
>            Assignee: Edoardo Comar
>
> As part of KAFKA-3258, we added code to remove metrics during topic deletion. 
> This works fine as long as there are no fetch requests in the purgatory. If 
> there are, however, we'll recreate the metrics when we call 
> `ReplicaManager.appendToLocalLog`.
> This can be reproduced by updating 
> MetricsTest.testBrokerTopicMetricsUnregisteredAfterDeletingTopic() in the 
> following way:
> {code}
> @Test
>   def testBrokerTopicMetricsUnregisteredAfterDeletingTopic() {
>     val topic = "test-broker-topic-metric"
>     AdminUtils.createTopic(zkUtils, topic, 2, 1)
>     // Produce a few messages and consume them to create the metrics
>     TestUtils.produceMessages(servers, topic, nMessages)
>     TestUtils.consumeTopicRecords(servers, topic, nMessages)
>     assertTrue("Topic metrics don't exist", topicMetricGroups(topic).nonEmpty)
>     assertNotNull(BrokerTopicStats.getBrokerTopicStats(topic))
>     AdminUtils.deleteTopic(zkUtils, topic)
>     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
>     Thread.sleep(10000)
>     assertEquals("Topic metrics exists after deleteTopic", Set.empty, 
> topicMetricGroups(topic))
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to