This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 43a25043dd0 KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete method (#20280)
43a25043dd0 is described below

commit 43a25043dd06458754e4eee767f93cfd406936ef
Author: Chirag Wadhwa <cwad...@confluent.io>
AuthorDate: Sun Aug 10 14:44:58 2025 +0530

    KAFKA-19567: Added the check for underlying partition being the leader in delayedShareFetch tryComplete method (#20280)

    In the current implementation, some delayed share fetch operations get trapped in the delayed share fetch
    purgatory when partition leadership changes during share consumption. This is because there is no check in the
    code to make sure the current broker is still the leader of the partitions corresponding to the share partitions.
    So, when leadership changes, the share partitions cannot be acquired, because they have already been fenced, and
    tryComplete returns false. Although the operation does get completed when its timer expires, by then it is too
    late, and the operation gets stuck in the watchers list waiting to be purged, which only happens once the
    estimated number of operations exceeds 1000. This PR resolves this by adding the required check so that, if
    partition leadership changes, the delayed share fetches waiting on it get completed immediately.

    Reviewers: Apoorv Mittal <apoorvmitta...@gmail.com>, Andrew Schofield <aschofi...@confluent.io>
---
 .../java/kafka/server/share/DelayedShareFetch.java |  24 ++-
 .../kafka/server/share/SharePartitionManager.java  |   1 +
 .../kafka/server/share/DelayedShareFetchTest.java  | 172 +++++++++++++++++++++
 .../server/share/SharePartitionManagerTest.java    |   3 +-
 4 files changed, 195 insertions(+), 5 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index aa619ee91a8..969029a6ea5 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.message.ShareFetchResponseData;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.server.LogReadResult;
 import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.server.purgatory.DelayedOperation;
@@ -368,6 +369,14 @@ public class DelayedShareFetch extends DelayedOperation {
                     "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), sharePartitions.keySet());
             }
+            // At this point, there could be delayed requests sitting in the purgatory which are waiting on
+            // DelayedShareFetchPartitionKeys corresponding to partitions whose leader has changed to a different broker.
+            // In that case, such partitions cannot be acquired, and tryComplete will keep returning false.
+            // Eventually the operation will get timed out and completed, but it might not get removed from the purgatory.
+            // It has been left like this because the purging mechanism triggers whenever the number of completed
+            // but still watched operations is larger than the purge interval. This purge interval is defined by the config
+            // share.fetch.purgatory.purge.interval.requests and is 1000 by default, thereby ensuring that such stale operations do not
+            // grow indefinitely.
             return false;
         } catch (Exception e) {
             log.error("Error processing delayed share fetch request", e);
@@ -757,7 +766,8 @@ public class DelayedShareFetch extends DelayedOperation {
      * Case a: The partition is in an offline log directory on this broker
      * Case b: This broker does not know the partition it tries to fetch
      * Case c: This broker is no longer the leader of the partition it tries to fetch
-     * Case d: All remote storage read requests completed
+     * Case d: This broker is no longer the leader or follower of the partition it tries to fetch
+     * Case e: All remote storage read requests completed
      * @return boolean representing whether the remote fetch is completed or not.
      */
     private boolean maybeCompletePendingRemoteFetch() {
@@ -765,14 +775,20 @@ public class DelayedShareFetch extends DelayedOperation {
         for (TopicIdPartition topicIdPartition : pendingRemoteFetchesOpt.get().fetchOffsetMetadataMap().keySet()) {
             try {
-                replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+                if (!partition.isLeader()) {
+                    throw new NotLeaderException("Broker is no longer the leader of topicPartition: " + topicIdPartition);
+                }
             } catch (KafkaStorageException e) { // Case a
                 log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
                 canComplete = true;
             } catch (UnknownTopicOrPartitionException e) { // Case b
                 log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
                 canComplete = true;
-            } catch (NotLeaderOrFollowerException e) { // Case c
+            } catch (NotLeaderException e) { // Case c
+                log.debug("Broker is no longer the leader of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+                canComplete = true;
+            } catch (NotLeaderOrFollowerException e) { // Case d
                 log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
                 canComplete = true;
             }
@@ -780,7 +796,7 @@ public class DelayedShareFetch extends DelayedOperation {
                 break;
         }

-        if (canComplete || pendingRemoteFetchesOpt.get().isDone()) { // Case d
+        if (canComplete || pendingRemoteFetchesOpt.get().isDone()) { // Case e
             return forceComplete();
         } else
             return false;
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 5f0bf1fa239..4cd7a61cf6b 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -774,6 +774,7 @@ public class SharePartitionManager implements AutoCloseable {
         if (sharePartition != null) {
             sharePartition.markFenced();
             replicaManager.removeListener(sharePartitionKey.topicIdPartition().topicPartition(), sharePartition.listener());
+            replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(sharePartitionKey.groupId(), sharePartitionKey.topicIdPartition()));
         }
     }
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 3b7701b7724..23f32659189 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -153,6 +153,16 @@ public class DelayedShareFetchTest {
         when(sp0.canAcquireRecords()).thenReturn(false);
         when(sp1.canAcquireRecords()).thenReturn(false);
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+
         ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(new MockTime());
         Uuid fetchId = Uuid.randomUuid();
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
@@ -160,6 +170,7 @@ public class DelayedShareFetchTest {
             .withSharePartitions(sharePartitions)
             .withShareGroupMetrics(shareGroupMetrics)
             .withFetchId(fetchId)
+            .withReplicaManager(replicaManager)
             .build());

         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -218,6 +229,15 @@ public class DelayedShareFetchTest {
         PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0));

+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+
         Time time = mock(Time.class);
         when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L);
         ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
@@ -287,6 +307,15 @@ public class DelayedShareFetchTest {
         mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);
         BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();

+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+
         Uuid fetchId = Uuid.randomUuid();
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
@@ -580,6 +609,19 @@ public class DelayedShareFetchTest {
         List<DelayedOperationKey> delayedShareFetchWatchKeys = new ArrayList<>();
         topicIdPartitions1.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));

+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        Partition p2 = mock(Partition.class);
+        when(p2.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+        when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenReturn(p2);
+
         Uuid fetchId1 = Uuid.randomUuid();
         DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch1)
@@ -737,6 +779,12 @@ public class DelayedShareFetchTest {
         when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L).thenReturn(170L);
         ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
         Uuid fetchId = Uuid.randomUuid();
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
@@ -881,10 +929,18 @@ public class DelayedShareFetchTest {
             BROKER_TOPIC_STATS);

         Uuid fetchId = Uuid.randomUuid();
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+
         DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
             .withFetchId(fetchId)
+            .withReplicaManager(replicaManager)
             .build();

         when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
@@ -1263,6 +1319,19 @@ public class DelayedShareFetchTest {
         when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class));
         when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        Partition p2 = mock(Partition.class);
+        when(p2.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+        when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenReturn(p2);
+
         Uuid fetchId = Uuid.randomUuid();
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
@@ -1288,6 +1357,70 @@ public class DelayedShareFetchTest {
         delayedShareFetch.lock().unlock();
     }

+    @Test
+    public void testRemoteStorageFetchPartitionLeaderChanged() {
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
+
+        SharePartition sp0 = mock(SharePartition.class);
+
+        when(sp0.canAcquireRecords()).thenReturn(true);
+
+        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
+        sharePartitions.put(tp0, sp0);
+
+        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS,
+            BROKER_TOPIC_STATS);
+
+        when(sp0.nextFetchOffset()).thenReturn(10L);
+
+        // Fetch offset does not match with the cached entry for sp0, hence, a replica manager fetch will happen for sp0.
+        when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty());
+
+        // Mocking remote storage read result for tp0.
+        doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock.
+        RemoteLogManager remoteLogManager = mock(RemoteLogManager.class);
+        when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class));
+        when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(false);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+
+        Uuid fetchId = Uuid.randomUuid();
+        DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetch)
+            .withSharePartitions(sharePartitions)
+            .withReplicaManager(replicaManager)
+            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0)))
+            .withFetchId(fetchId)
+            .build());
+
+        // All the topic partitions are acquirable.
+        when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
+
+        // Mock the behaviour of replica manager such that remote storage fetch completion timer task completes on adding it to the watch queue.
+        doAnswer(invocationOnMock -> {
+            TimerTask timerTask = invocationOnMock.getArgument(0);
+            timerTask.run();
+            return null;
+        }).when(replicaManager).addShareFetchTimerRequest(any());
+
+        assertFalse(delayedShareFetch.isCompleted());
+        assertTrue(delayedShareFetch.tryComplete());
+        assertTrue(delayedShareFetch.isCompleted());
+        // Remote fetch object gets created for delayed share fetch object.
+        assertNotNull(delayedShareFetch.pendingRemoteFetches());
+        // Verify the locks are released for local log read topic partitions tp0.
+        Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0));
+        assertTrue(delayedShareFetch.lock().tryLock());
+        delayedShareFetch.lock().unlock();
+    }
+
     @Test
     public void testRemoteStorageFetchTryCompleteThrowsException() {
         ReplicaManager replicaManager = mock(ReplicaManager.class);
@@ -1516,6 +1649,16 @@ public class DelayedShareFetchTest {
         when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

         Uuid fetchId = Uuid.randomUuid();
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
@@ -1586,6 +1729,12 @@ public class DelayedShareFetchTest {
         when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

         Uuid fetchId = Uuid.randomUuid();
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
@@ -1679,6 +1828,19 @@ public class DelayedShareFetchTest {
         }).when(remoteLogManager).asyncRead(any(), any());
         when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        Partition p2 = mock(Partition.class);
+        when(p2.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+        when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenReturn(p2);
+
         Uuid fetchId = Uuid.randomUuid();
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
@@ -1761,6 +1923,16 @@ public class DelayedShareFetchTest {
         when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

         Uuid fetchId = Uuid.randomUuid();
+
+        Partition p0 = mock(Partition.class);
+        when(p0.isLeader()).thenReturn(true);
+
+        Partition p1 = mock(Partition.class);
+        when(p1.isLeader()).thenReturn(true);
+
+        when(replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn(p0);
+        when(replicaManager.getPartitionOrException(tp1.topicPartition())).thenReturn(p1);
+
         DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetch)
             .withSharePartitions(sharePartitions)
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index 5d2d2e9a377..399a8dee761 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -2622,7 +2622,8 @@ public class SharePartitionManagerTest {
         assertEquals(Errors.FENCED_STATE_EPOCH.code(), partitionDataMap.get(tp2).errorCode());
         assertEquals("Fenced state epoch", partitionDataMap.get(tp2).errorMessage());

-        Mockito.verify(replicaManager, times(0)).completeDelayedShareFetchRequest(any());
+        Mockito.verify(replicaManager, times(1)).completeDelayedShareFetchRequest(
+            new DelayedShareFetchGroupKey(groupId, tp2));
         Mockito.verify(replicaManager, times(1)).readFromLog(
             any(), any(), any(ReplicaQuota.class), anyBoolean());
         // Should have 1 fetch recorded and 1 failure as single topic has multiple partition fetch
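
For readers skimming the diff, the core of the fix is the leadership check: tryComplete, via maybeCompletePendingRemoteFetch, now asks the ReplicaManager for the underlying Partition and treats lost leadership as a reason to force-complete the delayed operation right away instead of letting it idle until its timeout. The sketch below is not part of the commit; it is a minimal, self-contained Java illustration of that control flow, and the names PartitionView and shouldForceComplete are hypothetical stand-ins rather than Kafka APIs.

    import java.util.Collection;

    /**
     * Minimal sketch (not Kafka code) of the idea behind KAFKA-19567: a delayed share fetch
     * should be completed as soon as this broker stops being the leader of a partition it is
     * waiting on, rather than lingering in the purgatory until its timer expires.
     */
    public final class LeaderCheckSketch {

        /** Hypothetical stand-in for the broker-side partition view (compare Partition#isLeader in the diff). */
        public interface PartitionView {
            String name();
            boolean isLeader();
        }

        /**
         * Returns true when the delayed operation should be force-completed immediately because
         * this broker is no longer the leader of at least one partition it is waiting on.
         */
        public static boolean shouldForceComplete(Collection<PartitionView> partitions) {
            for (PartitionView partition : partitions) {
                if (!partition.isLeader()) {
                    // Leadership moved to another broker: complete now so the operation does not
                    // sit in the watchers list until the purge threshold (1000 by default) is hit.
                    System.out.println("Leadership lost for " + partition.name() + ", completing delayed share fetch");
                    return true;
                }
            }
            // All partitions are still locally led; completion is then governed by the usual
            // acquirability and remote-read checks, which this sketch does not model.
            return false;
        }
    }

The second half of the patch complements this check: when SharePartitionManager fences a share partition, it now also calls completeDelayedShareFetchRequest for the matching DelayedShareFetchGroupKey, so operations already parked in the purgatory are woken up as soon as leadership moves.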