[
https://issues.apache.org/jira/browse/KAFKA-15495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Ron Dagostino updated KAFKA-15495:
----------------------------------
Summary: KRaft partition truncated when the only ISR member restarts with
an empty disk (was: KRaft partition truncated when the only ISR member
restarts with and empty disk)
> KRaft partition truncated when the only ISR member restarts with an empty disk
> ------------------------------------------------------------------------------
>
> Key: KAFKA-15495
> URL: https://issues.apache.org/jira/browse/KAFKA-15495
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.3.2, 3.4.1, 3.6.0, 3.5.1
> Reporter: Ron Dagostino
> Priority: Critical
>
> Assume a topic-partition in KRaft has just a single leader replica in the
> ISR. Assume next that this replica goes offline. This replica's log will
> define the contents of that partition when the replica restarts, which is
> correct behavior. However, assume now that the replica has a disk failure,
> and we then replace the failed disk with a new, empty disk that we also
> format with the storage tool so it has the correct cluster ID. If we then
> restart the broker, the topic-partition will have no data in it, and any
> other replicas that might exist will truncate their logs to match. See below
> for a step-by-step demo of how to reproduce this.
> [KIP-858: Handle JBOD broker disk failure in KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
> introduces the concept of a Disk UUID that we can use to solve this problem.
> Specifically, when the leader restarts with an empty (but
> correctly-formatted) disk, the actual UUID associated with the disk will be
> different. The controller will notice upon broker re-registration that its
> disk UUID differs from what was previously registered. Right now we have no
> way of detecting this situation, but the disk UUID gives us that capability.
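> The detection idea can be sketched as follows. This is a hypothetical
> illustration, not actual Kafka code: the UUID values are invented, and in
> KIP-858 the equivalent comparison happens inside the controller when the
> broker re-registers.

```shell
# Hypothetical sketch of the KIP-858 check (illustrative only).
# The controller remembers the directory UUID from the broker's previous
# registration; if the UUID presented on re-registration differs, the disk
# was replaced or reformatted and its (empty) log must not be trusted.

registered_uuid="vZ5eJqn0TnCzt0FPJB1N7Q"   # UUID recorded at first registration (invented)
presented_uuid="mK9dXw2rRmSzC4aQeT8s1g"    # UUID presented after wipe + re-format (invented)

if [ "$registered_uuid" != "$presented_uuid" ]; then
  echo "disk UUID changed: treat replica log as stale, do not truncate other replicas to it"
fi
```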
> STEPS TO REPRODUCE:
> Create a single-broker cluster with a single controller. The standard files
> under config/kraft work well:
> bin/kafka-storage.sh random-uuid
> J8qXRwI-Qyi2G0guFTiuYw
> # ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/controller.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker.properties
> bin/kafka-server-start.sh config/kraft/controller.properties
> bin/kafka-server-start.sh config/kraft/broker.properties
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 --partitions 1 --replication-factor 1
> # create the __consumer_offsets topic
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 --from-beginning
> ^C
> # confirm that the __consumer_offsets topic partitions are all created and on the broker with node ID 2
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
> Now create 2 more brokers, with node IDs 11 and 12
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 's/localhost:9092/localhost:9011/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > config/kraft/broker11.properties
> cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 's/localhost:9092/localhost:9012/g' | sed 's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > config/kraft/broker12.properties
> # ensure we start clean
> /bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker12.properties
> bin/kafka-server-start.sh config/kraft/broker11.properties
> bin/kafka-server-start.sh config/kraft/broker12.properties
> # create a topic with a single partition replicated on two brokers
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 --partitions 1 --replication-factor 2
> # reassign partitions onto brokers with Node IDs 11 and 12
> cat > /tmp/reassign.json <<DONE
> {"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}],
> "version":1}
> DONE
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --execute
> bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file /tmp/reassign.json --verify
> # make preferred leader 11 the actual leader if it is not
> bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --all-topic-partitions --election-type preferred
> # Confirm both brokers are in ISR and 11 is the leader
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
>     Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 12,11
> # Emit some messages to the topic
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
> 1
> 2
> 3
> 4
> 5
> ^C
> # confirm we see the messages
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
> 1
> 2
> 3
> 4
> 5
> ^C
> # Again confirm both brokers are in ISR, leader is 11
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
>     Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 12,11
> # kill non-leader broker 12, then confirm it has dropped out of the ISR
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
>     Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 11
> # kill leader broker 11, then confirm the partition has no leader
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
>     Topic: foo2   Partition: 0   Leader: none   Replicas: 11,12   Isr: 11
> # Note that bringing the non-leader broker 12 back up at this point has no
> effect: the partition is offline with no leader, only node 11 is in the ISR,
> and the partition cannot come back online until node 11 returns
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
>     Topic: foo2   Partition: 0   Leader: none   Replicas: 11,12   Isr: 11
> # erase and reformat the leader broker’s disk, then restart the leader with
> that empty disk. (Note that the follower broker remains untouched/unchanged
> whether it was left down, or was started and is waiting for 11 to come back)
> /bin/rm -rf /tmp/kraft-broker-logs11
> bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config config/kraft/broker11.properties
> bin/kafka-server-start.sh config/kraft/broker11.properties
> # if node 12 was running it will emit log messages indicating truncation:
> # INFO [ReplicaFetcher replicaId=12, leaderId=11, fetcherId=0] Truncating partition foo2-0 with TruncationState(offset=0, completed=true) due to leader epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=3, endOffset=0) (kafka.server.ReplicaFetcherThread)
> # Broker 11 is the leader again (the output below will show node 12 in the
> ISR only if it had been running)
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
>     Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 11
> # read from topic-partition: it is now empty
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
> # produce a message to it; the message will appear on the console consumer
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
> 1
> ^C
> # restart the follower broker if it had not already been restarted; it will
> emit a log message indicating the log was truncated:
> bin/kafka-server-start.sh config/kraft/broker12.properties
> # WARN [UnifiedLog partition=foo2-0, dir=/tmp/kraft-broker-logs12] Non-monotonic update of high watermark from (offset=5, segment=[0:165]) to (offset=0, segment=[0:0]) (kafka.log.UnifiedLog)
> # follower is back in the ISR
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
> Topic: foo2   TopicId: pbbQZ23UQ5mQqmZpoSRCLQ   PartitionCount: 1   ReplicationFactor: 2   Configs: segment.bytes=1073741824
>     Topic: foo2   Partition: 0   Leader: 11   Replicas: 11,12   Isr: 11,12
> # re-run the consumer to confirm the data is gone
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 --from-beginning
--
This message was sent by Atlassian Jira
(v8.20.10#820010)