showuon commented on PR #14051: URL: https://github.com/apache/kafka/pull/14051#issuecomment-1646779495
@vamossagar12 , sorry for late response. For your suggestion: > Remove the canAddReplicaToIsr call from [here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L956C56-L956C56)? I think we should keep it because it can quickly check it before acquiring the lock, and have a short circuit here. For the test, I just wrote a test to reproduce this NPE issue under `PartitionTest.scala`. For your refrence: ``` @Test def testIsReplicaIsrEligibleWithEmptyReplicaMap(): Unit = { val mockMetadataCache = mock(classOf[KRaftMetadataCache]) val partition = new Partition(topicPartition, replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs, interBrokerProtocolVersion = interBrokerProtocolVersion, localBrokerId = brokerId, () => defaultBrokerEpoch(brokerId), time, alterPartitionListener, delayedOperations, mockMetadataCache, logManager, alterPartitionManager) val spyPartition = spy(partition) when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition))) .thenReturn(None) val log = logManager.getOrCreateLog(topicPartition, topicId = None) seedLogData(log, numRecords = 6, leaderEpoch = 4) val controllerEpoch = 0 val leaderEpoch = 5 val remoteBrokerId = brokerId + 1 val replicas = List[Integer](brokerId, remoteBrokerId).asJava spyPartition.createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val initializeTimeMs = time.milliseconds() assertTrue(spyPartition.makeLeader( new LeaderAndIsrPartitionState() .setControllerEpoch(controllerEpoch) .setLeader(brokerId) .setLeaderEpoch(leaderEpoch) .setIsr(List[Integer](brokerId).asJava) .setPartitionEpoch(1) .setReplicas(replicas) .setIsNew(true), offsetCheckpoints, None), "Expected become leader transition to succeed") doAnswer(_ => { // simulate topic is deleted at the moment spyPartition.delete() val replica = new Replica(remoteBrokerId, topicPartition) spyPartition.updateFollowerFetchState(replica, mock(classOf[LogOffsetMetadata]), 0, initializeTimeMs, 0, 0) mock(classOf[LogReadInfo]) }).when(spyPartition).fetchRecords(any(), any(), anyLong(), anyInt(), anyBoolean(), anyBoolean()) fetchFollower(spyPartition, replicaId = remoteBrokerId, fetchOffset = 3L) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org