Ron Dagostino created KAFKA-15495:
-------------------------------------

             Summary: KRaft partition truncated when the only ISR member 
restarts with and empty disk
                 Key: KAFKA-15495
                 URL: https://issues.apache.org/jira/browse/KAFKA-15495
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 3.5.1, 3.4.1, 3.3.2, 3.6.0
            Reporter: Ron Dagostino


Assume a topic-partition in KRaft has just a single leader replica in the ISR.  
Assume next that this replica goes offline.  This replica's log will define the 
contents of that partition when the replica restarts, which is correct 
behavior.  However, assume now that the replica has a disk failure, and we then 
replace the failed disk with a new, empty disk that we also format with the 
storage tool so it has the correct cluster ID.  If we then restart the broker, 
the topic-partition will have no data in it, and any other replicas that might 
exist will truncate their logs to match.  See below for a step-by-step demo of 
how to reproduce this.

[KIP-858: Handle JBOD broker disk failure in 
KRaft|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]
 introduces the concept of a Disk UUID that we can use to solve this problem.  
Specifically, when the leader restarts with an empty (but correctly-formatted) 
disk, the actual UUID associated with the disk will be different.  The 
controller will notice upon broker re-registration that its disk UUID differs 
from what was previously registered.  Right now we have no way of detecting 
this situation, but the disk UUID gives us that capability.


STEPS TO REPRODUCE:

Create a single broker cluster with single controller.  The standard files 
under config/kraft work well:

bin/kafka-storage.sh random-uuid
J8qXRwI-Qyi2G0guFTiuYw

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs /tmp/kraft-controller-logs

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/controller.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker.properties

bin/kafka-server-start.sh config/kraft/controller.properties
bin/kafka-server-start.sh config/kraft/broker.properties

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo1 
--partitions 1 --replication-factor 1

# create __consumer-offsets topics
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo1 
--from-beginning
^C

# confirm that __consumer_offsets topic partitions are all created and on 
broker with node id 2
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Now create 2 more brokers, with node IDs 3 and 4

cat config/kraft/broker.properties | sed 's/node.id=2/node.id=11/' | sed 
's/localhost:9092/localhost:9011/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs11#' > 
config/kraft/broker11.properties
cat config/kraft/broker.properties | sed 's/node.id=2/node.id=12/' | sed 
's/localhost:9092/localhost:9012/g' |  sed 
's#log.dirs=/tmp/kraft-broker-logs#log.dirs=/tmp/kraft-broker-logs12#' > 
config/kraft/broker12.properties

# ensure we start clean
/bin/rm -rf /tmp/kraft-broker-logs11 /tmp/kraft-broker-logs12

bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker11.properties
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker12.properties

bin/kafka-server-start.sh config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker12.properties

# create a topic with a single partition replicated on two brokers
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic foo2 
--partitions 1 --replication-factor 2

# reassign partitions onto brokers with Node IDs 11 and 12
cat > /tmp/reassign.json <<DONE
{"partitions":[{"topic": "foo2","partition": 0,"replicas": [11,12]}], 
"version":1}
DONE

bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
--reassignment-json-file /tmp/reassign.json --execute
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 
--reassignment-json-file /tmp/reassign.json --verify

# make preferred leader 11 the actual leader if it not
bin/kafka-leader-election.sh --bootstrap-server localhost:9092 
--all-topic-partitions --election-type preferred

# Confirm both brokers are in ISR and 11 is the leader
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2     TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
12,11


# Emit some messages to the topic
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
2
3
4
5
^C

# confirm we see the messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
--from-beginning
1
2
3
4
5
^C

# Again confirm both brokers are in ISR, leader is 11
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2     TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
12,11

# kill non-leader broker
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2     TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 11

# kill leader broker
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2     TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: foo2     Partition: 0    Leader: none    Replicas: 11,12 Isr: 11

# Note that bringing the non-leader broker 12 back up at this point has no 
effect: it is offline with no leader, only node 11 is in the ISR, and the 
partition cannot return until node 11 returns
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2     TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: foo2     Partition: 0    Leader: none    Replicas: 11,12 Isr: 11

# erase and reformat leader broker’s disk, and then restart the leader with 
that empty disk.  (Note that follower broker remains untouched/unchanged if it 
wasn’t started, or it is started and is waiting for 11 to come back)
/bin/rm -rf /tmp/kraft-broker-logs11
bin/kafka-storage.sh format --cluster-id J8qXRwI-Qyi2G0guFTiuYw --config 
config/kraft/broker11.properties
bin/kafka-server-start.sh config/kraft/broker11.properties

# if node 12 was running it will emit log messages indicating truncation
# INFO [ReplicaFetcher replicaId=12, leaderId=11, fetcherId=0] Truncating 
partition foo2-0 with TruncationState(offset=0, completed=true) due to leader 
epoch and offset EpochEndOffset(errorCode=0, partition=0, leaderEpoch=3, 
endOffset=0) (kafka.server.ReplicaFetcherThread)

# Leader broker is the leader again (the below output either will or won’t show 
node 12 as being in the ISR depending on whether it had been running or not, 
respectively)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2     TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 11

# read from topic-partition: it is now empty
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
--from-beginning

# produce a message to it, message will appear on console consumer
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic foo2
1
^C

# restart follower broker if it had not already been restarted, and will emit a 
log message indicating the log was truncated:
bin/kafka-server-start.sh config/kraft/broker12.properties
# WARN [UnifiedLog partition=foo2-0, dir=/tmp/kraft-broker-logs12] 
Non-monotonic update of high watermark from (offset=5, segment=[0:165]) to 
(offset=0, segment=[0:0]) (kafka.log.UnifiedLog)

# follower is back in the ISR
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic foo2
Topic: foo2     TopicId: pbbQZ23UQ5mQqmZpoSRCLQ PartitionCount: 1       
ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: foo2     Partition: 0    Leader: 11      Replicas: 11,12 Isr: 
11,12

# can redo consumer to again show data is gone
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo2 
--from-beginning




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to