[
https://issues.apache.org/jira/browse/KAFKA-14616?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Omnia Ibrahim resolved KAFKA-14616.
-----------------------------------
Fix Version/s: 3.7.0
Assignee: Colin McCabe
Resolution: Fixed
> Topic recreation with offline broker causes permanent URPs
> ----------------------------------------------------------
>
> Key: KAFKA-14616
> URL: https://issues.apache.org/jira/browse/KAFKA-14616
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.3.1
> Reporter: Omnia Ibrahim
> Assignee: Colin McCabe
> Priority: Major
> Fix For: 3.7.0
>
>
> We are facing an odd situation when we delete and recreate a topic while a
> broker is offline in KRaft mode.
> Here’s what we saw, step by step (a minimal Admin-client sketch of these
> steps follows the {{kafka-topics.sh --describe}} output below):
> # Created topic {{foo.test}} with 10 partitions and 4 replicas — Topic
> {{foo.test}} was created with topic ID {{MfuZbwdmSMaiSa0g6__TPg}}
> # Took broker 4 offline — which held replicas for partitions {{0, 3, 4, 5,
> 7, 8, 9}}
> # Deleted topic {{foo.test}} — The deletion process was successful, despite
> the fact that broker 4 still held replicas for partitions {{0, 3, 4, 5, 7, 8,
> 9}} on local disk.
> # Recreated topic {{foo.test}} with 10 partitions and 4 replicas. — Topic
> {{foo.test}} was created with topic ID {{RzalpqQ9Q7ub2M2afHxY4Q}} and
> partitions {{0, 1, 2, 7, 8, 9}} were assigned to broker 4 (which was still
> offline). Notice here that partitions {{0, 7, 8, 9}} are common between the
> assignment of the deleted topic ({{topic_id: MfuZbwdmSMaiSa0g6__TPg}})
> and the recreated topic ({{topic_id: RzalpqQ9Q7ub2M2afHxY4Q}}).
> # Brought broker 4 back online.
> # Broker 4 started to create new partition replicas for the recreated topic
> {{foo.test}} ({{topic_id: RzalpqQ9Q7ub2M2afHxY4Q}}).
> # The broker hit the following error: {{Tried to assign topic ID
> RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition foo.test-9,but log already
> contained topic ID MfuZbwdmSMaiSa0g6__TPg}}. As a result of this error, the
> broker renamed the log directories for partitions {{0, 3, 4, 5, 7, 8, 9}} to
> {{<topic>-<partition>.<uuid>-delete}}.
> # Ran {{ls <log.dir_path>}}
> {code:java}
> foo.test-0.658f87fb9a2e42a590b5d7dcc28862b5-delete/
> foo.test-1/
> foo.test-2/
> foo.test-3.a68f05d05bcc4e579087551b539af311-delete/
> foo.test-4.79ce30a5310d4950ad1b28f226f74895-delete/
> foo.test-5.76ed04da75bf46c3a63342be1eb44450-delete/
> foo.test-6/
> foo.test-7.c2d33db3bf844e9ebbcd9ef22f5270da-delete/
> foo.test-8.33836969ac714b41b69b5334a5068ce0-delete/
> foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/{code}
> 9. Waited until the deletion of the old topic was done and ran {{ls
> <log.dir_path>}} again. We expected to see log directories for partitions
> {{0, 1, 2, 7, 8, 9}}; however, the result was:
> {code:java}
> foo.test-1/
> foo.test-2/
> foo.test-6/{code}
> 10. Ran {{kafka-topics.sh --command-config cmd.properties
> --bootstrap-server <bootstrap> --describe --topic foo.test}}
> {code:java}
> Topic: foo.test TopicId: RzalpqQ9Q7ub2M2afHxY4Q PartitionCount: 10
> ReplicationFactor: 4 Configs:
> min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=3145728,unclean.leader.election.enable=false,retention.bytes=1000000000
> Topic: foo.test Partition: 0 Leader: 2 Replicas: 2,3,4,5 Isr: 2,3,5
> Topic: foo.test Partition: 1 Leader: 3 Replicas: 3,4,5,6 Isr: 3,5,6,4
> Topic: foo.test Partition: 2 Leader: 5 Replicas: 5,4,6,1 Isr: 5,6,1,4
> Topic: foo.test Partition: 3 Leader: 5 Replicas: 5,6,1,2 Isr: 5,6,1,2
> Topic: foo.test Partition: 4 Leader: 6 Replicas: 6,1,2,3 Isr: 6,1,2,3
> Topic: foo.test Partition: 5 Leader: 1 Replicas: 1,6,2,5 Isr: 1,6,2,5
> Topic: foo.test Partition: 6 Leader: 6 Replicas: 6,2,5,4 Isr: 6,2,5,4
> Topic: foo.test Partition: 7 Leader: 2 Replicas: 2,5,4,3 Isr: 2,5,3
> Topic: foo.test Partition: 8 Leader: 5 Replicas: 5,4,3,1 Isr: 5,3,1
> Topic: foo.test Partition: 9 Leader: 3 Replicas: 3,4,1,6 Isr: 3,1,6{code}
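> To make steps 1-5 above easier to follow, here is a minimal, hypothetical
> Admin-client sketch of the create / delete / recreate sequence (the bootstrap
> address is a placeholder, and stopping/restarting broker 4 happens out of
> band):
> {code:java}
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewTopic;
>
> public class RecreateTopicRepro {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>
>         try (Admin admin = Admin.create(props)) {
>             // Step 1: create foo.test with 10 partitions and replication factor 4
>             admin.createTopics(Collections.singletonList(new NewTopic("foo.test", 10, (short) 4)))
>                  .all().get();
>
>             // Step 2 happens out of band: stop broker 4 while it still holds replicas.
>
>             // Step 3: delete the topic; in KRaft this succeeds even with broker 4 offline
>             admin.deleteTopics(Collections.singletonList("foo.test")).all().get();
>
>             // Step 4: recreate the topic; it gets a new topic ID and the still-offline
>             // broker 4 is again assigned replicas
>             admin.createTopics(Collections.singletonList(new NewTopic("foo.test", 10, (short) 4)))
>                  .all().get();
>         }
>         // Step 5: bring broker 4 back online and watch its logs (sample below)
>     }
> }
> {code}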
> Here’s a sample of the broker logs:
>
> {code:java}
> {"timestamp":"2023-01-11T15:19:53,620Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
> log for partition foo.test-9 in
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete.","logger":"kafka.log.LogManager"}
> {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
> time index
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/00000000000000000000.timeindex.deleted.","logger":"kafka.log.LogSegment"}
> {"timestamp":"2023-01-11T15:19:53,617Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
> offset index
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/00000000000000000000.index.deleted.","logger":"kafka.log.LogSegment"}
> {"timestamp":"2023-01-11T15:19:53,615Z","level":"INFO","thread":"kafka-scheduler-8","message":"Deleted
> log
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete/00000000000000000000.log.deleted.","logger":"kafka.log.LogSegment"}
> {"timestamp":"2023-01-11T15:19:53,614Z","level":"INFO","thread":"kafka-scheduler-8","message":"[LocalLog
> partition=foo.test-9, dir=/kafka/d1/data] Deleting segment files
> LogSegment(baseOffset=0, size=0, lastModifiedTime=1673439574661,
> largestRecordTimestamp=None)","logger":"kafka.log.LocalLog$"}
> {"timestamp":"2023-01-11T15:19:53,612Z","level":"INFO","thread":"kafka-scheduler-8","message":"[LocalLog
> partition=foo.test-9, dir=/kafka/d1/data] Deleting segments as the log has
> been deleted: LogSegment(baseOffset=0, size=0,
> lastModifiedTime=1673439574661,
> largestRecordTimestamp=None)","logger":"kafka.log.LocalLog"}
> {"timestamp":"2023-01-11T15:18:53,611Z","level":"INFO","thread":"EventHandler","message":"Log
> for partition foo.test-9 is renamed to
> /kafka/d1/data/foo.test-9.48e1494f4fac48c8aec009bf77d5e4ee-delete and is
> scheduled for deletion","logger":"kafka.log.LogManager"}
> {"timestamp":"2023-01-11T15:18:53,596Z","level":"INFO","thread":"EventHandler","message":"Found
> stray log dir Log(dir=/kafka/d1/data/foo.test-4,
> topicId=MfuZbwdmSMaiSa0g6__TPg, topic=foo.test, partition=4, highWatermark=0,
> lastStableOffset=0, logStartOffset=0, logEndOffset=0): the topicId
> MfuZbwdmSMaiSa0g6__TPg does not exist in the metadata
> image","logger":"kafka.server.metadata.BrokerMetadataPublisher$"}
> {"timestamp":"2023-01-11T15:18:51,578Z","level":"ERROR","thread":"EventHandler","message":"[Broker
> id=4] Unable to start fetching foo.test-9 with topic ID
> RzalpqQ9Q7ub2M2afHxY4Q due to
> InconsistentTopicIdException","logger":"state.change.logger","throwable":{"class":"org.apache.kafka.common.errors.InconsistentTopicIdException","msg":"Tried
> to assign topic ID RzalpqQ9Q7ub2M2afHxY4Q to log for topic partition
> foo.test-9,but log already contained topic ID MfuZbwdmSMaiSa0g6__TPg"}}
> {"timestamp":"2023-01-11T15:18:51,577Z","level":"INFO","thread":"EventHandler","message":"[Broker
> id=4] Creating new partition foo.test-9 with topic id
> RzalpqQ9Q7ub2M2afHxY4Q.","logger":"state.change.logger"}
> {"timestamp":"2023-01-11T15:18:51,465Z","level":"INFO","thread":"log-recovery-/kafka/d1/data-0","message":"Completed
> load of Log(dir=/kafka/d1/data/foo.test-9, topicId=MfuZbwdmSMaiSa0g6__TPg,
> topic=foo.test, partition=9, highWatermark=0, lastStableOffset=0,
> logStartOffset=0, logEndOffset=0) with 1 segments in 4ms (41/46 completed in
> /kafka/d1/data)","logger":"kafka.log.LogManager"}
> {"timestamp":"2023-01-11T15:18:51,464Z","level":"INFO","thread":"log-recovery-/kafka/d1/data-0","message":"[LogLoader
> partition=foo.test-9, dir=/kafka/d1/data] Loading producer state till offset
> 0 with message format version 2","logger":"kafka.log.UnifiedLog$"}
>
> {code}
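> For context, the {{InconsistentTopicIdException}} above is raised because the
> on-disk log still carries the old topic ID in its {{partition.metadata}} file,
> while the metadata image now maps {{foo.test}} to the new ID. A small sketch
> (the path is hypothetical) that prints which topic ID a partition directory
> claims on disk:
> {code:java}
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.nio.file.Paths;
>
> public class PartitionMetadataCheck {
>     public static void main(String[] args) throws Exception {
>         // Hypothetical path; point this at a real partition directory
>         Path partitionDir = Paths.get(args.length > 0 ? args[0] : "/kafka/d1/data/foo.test-9");
>
>         // partition.metadata is a small key/value text file, e.g.
>         // "version: 0" and "topic_id: MfuZbwdmSMaiSa0g6__TPg"
>         for (String line : Files.readAllLines(partitionDir.resolve("partition.metadata"))) {
>             if (line.startsWith("topic_id:")) {
>                 String onDiskTopicId = line.substring("topic_id:".length()).trim();
>                 System.out.println(partitionDir.getFileName() + " belongs to topic ID " + onDiskTopicId);
>             }
>         }
>     }
> }
> {code}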
> So what we noticed here is that KRaft never recreated the log directories
> for partitions {{0, 7, 8, 9}} of the recreated topic after deleting them for
> the old topic, which is what leaves those partitions permanently
> under-replicated.
> There are a few behaviours here that differ from ZK mode (to the extent of
> our knowledge):
> # In ZK mode, deleting a topic is a blocking step for recreating it, so in
> our case the deletion of {{foo.test}} would wait for broker 4 to recover and
> complete the deletion before a user could recreate the topic. (KIP-516
> proposed changing the log.dir layout from the {{<topic>-<partition>}} format
> to {{<topicIdPrefix>/<topicId>-<partition>}}, but this has not been done
> yet.) KRaft, by contrast, is fire-and-forget.
> # In ZK mode, creating a new topic doesn't assign partitions to non-active
> brokers. In the scenario we faced, the behaviour of KRaft seems to be
> different: KRaft assigned partitions to broker 4 while broker 4 was down.
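>
> Until this is fixed, a possible (hedged) mitigation on the client side is to
> check, before recreating a deleted topic, that the topic name is no longer
> listed and that all expected brokers are registered again. A rough sketch with
> the Admin client (the bootstrap address and expected broker count are
> placeholders; note this still cannot prove that an offline broker has cleaned
> up its old log directories):
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
>
> public class PreRecreateCheck {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
>
>         try (Admin admin = Admin.create(props)) {
>             // The old topic name must no longer be listed before we recreate it
>             boolean stillListed = admin.listTopics().names().get().contains("foo.test");
>
>             // All expected brokers should be registered; 7 is a placeholder cluster size
>             int liveBrokers = admin.describeCluster().nodes().get().size();
>
>             if (stillListed || liveBrokers < 7) {
>                 System.out.println("Not safe to recreate foo.test yet (still listed=" + stillListed
>                         + ", live brokers=" + liveBrokers + ")");
>             } else {
>                 System.out.println("Topic is gone and all brokers are registered; recreating is less risky");
>             }
>         }
>     }
> }
> {code}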
--
This message was sent by Atlassian Jira
(v8.20.10#820010)