Yang Ling created KAFKA-14106:
---------------------------------
Summary: Fetcher thread was shut down and all fetched partitions are lost.
Key: KAFKA-14106
URL: https://issues.apache.org/jira/browse/KAFKA-14106
Project: Kafka
Issue Type: Bug
Components: replication
Affects Versions: 3.0.0, 2.2.2
Reporter: Yang Ling
Dynamically changing the listeners causes follower replicas to fall out of sync. Our steps were as follows:
# Start the broker with its listener on an IP address.
# Create some topics.
# For some reason, change the listener to a domain name via dynamic configuration.
# Create some new topics.
# Produce messages to any of the older topics.
# All topics that were produced to in step 5 are now out of sync.
The key log lines are as follows:
{panel}
[2022-07-23 15:30:53,282] INFO [ReplicaFetcherManager on broker 0] Added fetcher to broker BrokerEndPoint(id=2, host=168.1.3.88:9092) for partitions Map(test-11 -> (offset=0, leaderEpoch=0), test-5 -> (offset=0, leaderEpoch=0), test-8 -> (offset=0, leaderEpoch=0), test-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
[2022-07-25 15:01:51,581] INFO [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
[2022-07-25 18:14:05,297] INFO [ReplicaFetcherManager on broker 0]Added fetcher to broker BrokerEndPoint(id=2, host=kafka-server-1:9092) for partitions Map(test2-6 -> (offset=0, leaderEpoch=0), test2-0 -> (offset=0, leaderEpoch=0), test2-3 -> (offset=0, leaderEpoch=0), test2-9 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
{panel}
After reading the source code, we found the following code in AbstractFetcherManager:
{code:scala}
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, InitialFetchState]) {
  ...
    for ((brokerAndFetcherId, initialFetchOffsets) <- partitionsPerFetcher) {
      val brokerIdAndFetcherId = BrokerIdAndFetcherId(brokerAndFetcherId.broker.id, brokerAndFetcherId.fetcherId)
      val fetcherThread = fetcherThreadMap.get(brokerIdAndFetcherId) match {
        case Some(currentFetcherThread) if currentFetcherThread.sourceBroker == brokerAndFetcherId.broker =>
          currentFetcherThread
        case Some(f) =>
          f.shutdown() // ----------------- marked
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
        case None =>
          addAndStartFetcherThread(brokerAndFetcherId, brokerIdAndFetcherId)
      }
    }
  }
  ...
}
{code}
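As the marked line shows, if sourceBroker has changed (in our case the listener moved from an IP address to a domain name while the broker id stayed 2), the old fetcher thread is shut down and a new fetcher thread is created for the new sourceBroker. The new thread is only given the partitions passed to this call (the test2 partitions in the log above), so all of the partitions the old thread was fetching (test-2, test-5, test-8 and test-11) are lost: no fetcher thread replicates them any more.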
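To make the failure mode concrete, here is a minimal, self-contained Scala model of the matching logic in the excerpt above. It is a sketch under simplifying assumptions, not Kafka's code: `BrokerEndPoint`, `FetcherThread`, `FetcherManager` and `FetcherDemo` are hypothetical stand-ins that keep only a broker id, host, port and a set of partition names. The `migrateOnEndpointChange` flag is likewise hypothetical; it illustrates one possible remedy (carrying the old thread's partitions over to the new thread), not necessarily how this issue was or should be fixed upstream.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins: only the fields relevant to this bug are modeled.
final case class BrokerEndPoint(id: Int, host: String, port: Int)
final case class BrokerIdAndFetcherId(brokerId: Int, fetcherId: Int)

final class FetcherThread(val sourceBroker: BrokerEndPoint) {
  val partitions: mutable.Set[String] = mutable.Set.empty
  var running: Boolean = true
  def shutdown(): Unit = { running = false }
}

// migrateOnEndpointChange = false mirrors the reported behavior;
// true is a hypothetical remedy that keeps the old thread's partitions.
final class FetcherManager(migrateOnEndpointChange: Boolean) {
  val fetcherThreadMap: mutable.Map[BrokerIdAndFetcherId, FetcherThread] = mutable.Map.empty

  private def addAndStart(key: BrokerIdAndFetcherId, leader: BrokerEndPoint): FetcherThread = {
    val fresh = new FetcherThread(leader)
    fetcherThreadMap(key) = fresh
    fresh
  }

  def addFetcherForPartitions(leader: BrokerEndPoint, fetcherId: Int, newPartitions: Set[String]): FetcherThread = {
    // The map key uses only the broker id, so an endpoint change keeps the same key.
    val key = BrokerIdAndFetcherId(leader.id, fetcherId)
    val thread = fetcherThreadMap.get(key) match {
      case Some(current) if current.sourceBroker == leader =>
        current // same endpoint: reuse the existing thread
      case Some(old) =>
        // Same broker id but a different host/port: the old thread is replaced.
        old.shutdown()
        val fresh = addAndStart(key, leader)
        if (migrateOnEndpointChange) fresh.partitions ++= old.partitions
        fresh
      case None =>
        addAndStart(key, leader)
    }
    // Only the partitions passed to this call are added to the chosen thread.
    thread.partitions ++= newPartitions
    thread
  }
}

object FetcherDemo {
  // Replays the two "Added fetcher" log lines and returns the partitions
  // held by the thread that is left running for broker 2.
  def run(migrate: Boolean): Set[String] = {
    val manager = new FetcherManager(migrate)
    val byIp = BrokerEndPoint(2, "168.1.3.88", 9092)
    val byName = BrokerEndPoint(2, "kafka-server-1", 9092)
    manager.addFetcherForPartitions(byIp, 0, Set("test-2", "test-5", "test-8", "test-11"))
    val current = manager.addFetcherForPartitions(byName, 0, Set("test2-0", "test2-3", "test2-6", "test2-9"))
    current.partitions.toSet
  }

  def main(args: Array[String]): Unit = {
    println(run(migrate = false).toList.sorted)
    println(run(migrate = true).toList.sorted)
  }
}
```

With `migrate = false` the surviving thread holds only the four test2 partitions, matching the log above; with `migrate = true` it also keeps test-2, test-5, test-8 and test-11.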
--
This message was sent by Atlassian Jira
(v8.20.10#820010)