[
https://issues.apache.org/jira/browse/KAFKA-16790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Luke Chen resolved KAFKA-16790.
-------------------------------
Fix Version/s: 3.8.0
3.7.1
Resolution: Fixed
> Calls to RemoteLogManager are made before it is configured
> ----------------------------------------------------------
>
> Key: KAFKA-16790
> URL: https://issues.apache.org/jira/browse/KAFKA-16790
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.8.0
> Reporter: Christo Lolov
> Assignee: Muralidhar Basani
> Priority: Major
> Fix For: 3.8.0, 3.7.1
>
>
> BrokerMetadataPublisher#onMetadataUpdate calls ReplicaManager#applyDelta (1),
> which in turn calls RemoteLogManager#onLeadershipChange (2). However, the
> RemoteLogManager is configured only after the BrokerMetadataPublisher starts
> running (3, 4). This is incorrect: we need to either initialise the
> RemoteLogManager before we start the BrokerMetadataPublisher, or skip calls
> to onLeadershipChange while the RemoteLogManager is not yet initialised.
> (1)
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala#L151]
> (2)
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2737]
> (3)
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L432]
> (4)
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/BrokerServer.scala#L515]
> The problem can be reproduced with the following changes:
> {code:java}
> config/kraft/broker.properties | 10 ++++++++++
> .../main/java/kafka/log/remote/RemoteLogManager.java | 8 +++++++-
> core/src/main/scala/kafka/server/ReplicaManager.scala | 6 +++++-
> 3 files changed, 22 insertions(+), 2 deletions(-)
> diff --git a/config/kraft/broker.properties b/config/kraft/broker.properties
> index 2d15997f28..39d126cf87 100644
> --- a/config/kraft/broker.properties
> +++ b/config/kraft/broker.properties
> @@ -127,3 +127,13 @@ log.segment.bytes=1073741824
> # The interval at which log segments are checked to see if they can be
> deleted according
> # to the retention policies
> log.retention.check.interval.ms=300000
> +
> +remote.log.storage.system.enable=true
> +remote.log.metadata.manager.listener.name=PLAINTEXT
> +remote.log.storage.manager.class.name=org.apache.kafka.server.log.remote.storage.LocalTieredStorage
> +remote.log.storage.manager.class.path=/home/ec2-user/kafka/storage/build/libs/kafka-storage-3.8.0-SNAPSHOT-test.jar
> +remote.log.storage.manager.impl.prefix=rsm.config.
> +remote.log.metadata.manager.impl.prefix=rlmm.config.
> +rsm.config.dir=/tmp/kafka-remote-storage
> +rlmm.config.remote.log.metadata.topic.replication.factor=1
> +log.retention.check.interval.ms=1000
> diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> index 6555b7c0cd..e84a072abc 100644
> --- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> +++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
> @@ -164,6 +164,7 @@ public class RemoteLogManager implements Closeable {
> // The endpoint for remote log metadata manager to connect to
> private Optional<EndPoint> endpoint = Optional.empty();
> private boolean closed = false;
> + private boolean up = false;
>
> /**
> * Creates RemoteLogManager instance with the given arguments.
> @@ -298,6 +299,7 @@ public class RemoteLogManager implements Closeable {
> // in connecting to the brokers or remote storages.
> configureRSM();
> configureRLMM();
> + up = true;
> }
>
> public RemoteStorageManager storageManager() {
> @@ -329,7 +331,11 @@ public class RemoteLogManager implements Closeable {
> public void onLeadershipChange(Set<Partition> partitionsBecomeLeader,
> Set<Partition> partitionsBecomeFollower,
> Map<String, Uuid> topicIds) {
> - LOGGER.debug("Received leadership changes for leaders: {} and
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
> + if (!up) {
> + LOGGER.error("NullPointerException");
> + return;
> + }
> + LOGGER.error("Received leadership changes for leaders: {} and
> followers: {}", partitionsBecomeLeader, partitionsBecomeFollower);
>
> Map<TopicIdPartition, Integer> leaderPartitionsWithLeaderEpoch =
> filterPartitions(partitionsBecomeLeader)
> .collect(Collectors.toMap(
> diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
> b/core/src/main/scala/kafka/server/ReplicaManager.scala
> index 35499430d6..bd3f41c3d6 100644
> --- a/core/src/main/scala/kafka/server/ReplicaManager.scala
> +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
> @@ -2688,6 +2688,7 @@ class ReplicaManager(val config: KafkaConfig,
> */
> def applyDelta(delta: TopicsDelta, newImage: MetadataImage): Unit = {
> // Before taking the lock, compute the local changes
> + stateChangeLogger.error("ROBIN")
> val localChanges = delta.localChanges(config.nodeId)
> val metadataVersion = newImage.features().metadataVersion()
>
> @@ -2734,7 +2735,10 @@ class ReplicaManager(val config: KafkaConfig,
> replicaFetcherManager.shutdownIdleFetcherThreads()
> replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
>
> - remoteLogManager.foreach(rlm =>
> rlm.onLeadershipChange(leaderChangedPartitions.asJava,
> followerChangedPartitions.asJava, localChanges.topicIds()))
> + remoteLogManager.foreach(rlm => {
> + stateChangeLogger.error("JOKER")
> + rlm.onLeadershipChange(leaderChangedPartitions.asJava,
> followerChangedPartitions.asJava, localChanges.topicIds())
> + })
> }
>
> if (metadataVersion.isDirectoryAssignmentSupported) {
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)