This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1962917436f KAFKA-17674: Fix bug on update positions of newly added 
partitions (#17342)
1962917436f is described below

commit 1962917436f463541f9bb63791b7ed55c23ce8c1
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Oct 3 07:10:54 2024 -0400

    KAFKA-17674: Fix bug on update positions of newly added partitions (#17342)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../consumer/internals/OffsetsRequestManager.java  | 27 ++++++++------
 .../consumer/internals/SubscriptionState.java      | 24 ++++++-------
 .../internals/OffsetsRequestManagerTest.java       | 42 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 23 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 7ce9811d414..5942bedfb0c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -283,14 +283,15 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
         cacheExceptionIfEventExpired(result, deadlineMs);
 
         CompletableFuture<Void> updatePositions;
+        final Set<TopicPartition> initializingPartitions = 
subscriptionState.initializingPartitions();
         if (commitRequestManager != null) {
-            CompletableFuture<Void> refreshWithCommittedOffsets = 
initWithCommittedOffsetsIfNeeded(deadlineMs);
+            CompletableFuture<Void> refreshWithCommittedOffsets = 
initWithCommittedOffsetsIfNeeded(initializingPartitions, deadlineMs);
 
             // Reset positions for all partitions that may still require it 
(or that are awaiting reset)
-            updatePositions = refreshWithCommittedOffsets.thenCompose(__ -> 
initWithPartitionOffsetsIfNeeded());
+            updatePositions = refreshWithCommittedOffsets.thenCompose(__ -> 
initWithPartitionOffsetsIfNeeded(initializingPartitions));
 
         } else {
-            updatePositions = initWithPartitionOffsetsIfNeeded();
+            updatePositions = 
initWithPartitionOffsetsIfNeeded(initializingPartitions);
         }
 
         updatePositions.whenComplete((__, resetError) -> {
@@ -324,19 +325,21 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
     }
 
     /**
-     * If there are partitions still needing a position and a reset policy is 
defined, request reset using the
-     * default policy.
+     * If there are partitions still needing a position and a reset policy is 
defined, request reset using the default policy.
      *
+     * @param initializingPartitions Set of partitions that should be 
initialized. This won't reset positions for
+     *                               partitions that may have been added to 
the subscription state, but that are not
+     *                               included in this set.
      * @return Future that will complete when the reset operation completes 
retrieving the offsets and setting
      * positions in the subscription state using them.
      * @throws NoOffsetForPartitionException If no reset strategy is 
configured.
      */
-    private CompletableFuture<Void> initWithPartitionOffsetsIfNeeded() {
+    private CompletableFuture<Void> 
initWithPartitionOffsetsIfNeeded(Set<TopicPartition> initializingPartitions) {
         CompletableFuture<Void> result = new CompletableFuture<>();
         try {
             // Mark partitions that need reset, using the configured reset 
strategy. If no
             // strategy is defined, this will raise a 
NoOffsetForPartitionException exception.
-            subscriptionState.resetInitializingPositions();
+            
subscriptionState.resetInitializingPositions(initializingPartitions::contains);
         } catch (Exception e) {
             result.completeExceptionally(e);
             return result;
@@ -351,11 +354,15 @@ public class OffsetsRequestManager implements 
RequestManager, ClusterResourceLis
      * Fetch the committed offsets for partitions that require initialization. 
This will trigger an OffsetFetch
      * request and update positions in the subscription state once a response 
is received.
      *
+     * @param initializingPartitions Set of partitions to update with a 
position. This same set will be kept
+     *                               throughout the whole process (considered 
when fetching committed offsets, and
+     *                               when resetting positions for partitions 
that may not have committed offsets).
+     * @param deadlineMs             Deadline of the application event that 
triggered this operation. Used to
+     *                               determine how much time to allow for the 
reused offset fetch to complete.
      * @throws TimeoutException If offsets could not be retrieved within the 
timeout
      */
-    private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long 
deadlineMs) {
-        final Set<TopicPartition> initializingPartitions = 
subscriptionState.initializingPartitions();
-
+    private CompletableFuture<Void> 
initWithCommittedOffsetsIfNeeded(Set<TopicPartition> initializingPartitions,
+                                                                     long 
deadlineMs) {
         if (initializingPartitions.isEmpty()) {
             return CompletableFuture.completedFuture(null);
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index bc7d20d11e0..b7d90431166 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -32,7 +31,6 @@ import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -773,21 +771,17 @@ public class SubscriptionState {
     }
 
     /**
-     * Note: this will not attempt to reset partitions that are in the process 
of being assigned
-     * and are pending the completion of any {@link 
ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
-     * callbacks.
+     * Request reset for partitions that require a position, using the 
configured reset strategy.
      *
-     * <p/>
-     *
-     * This method only appears to be invoked the by the {@link KafkaConsumer} 
during its
-     * {@link KafkaConsumer#poll(Duration)} logic. <em>Direct</em> calls to 
methods like
-     * {@link #requestOffsetReset(TopicPartition)}, {@link 
#requestOffsetResetIfPartitionAssigned(TopicPartition)},
-     * etc. do <em>not</em> skip partitions pending assignment.
+     * @param initPartitionsToInclude Initializing partitions to include in 
the reset. Assigned partitions that
+     *                                require a position but are not included 
in this set won't be reset.
+     * @throws NoOffsetForPartitionException If there are partitions assigned 
that require a position but
+     *                                       there is no reset strategy 
configured.
      */
-    public synchronized void resetInitializingPositions() {
+    public synchronized void 
resetInitializingPositions(Predicate<TopicPartition> initPartitionsToInclude) {
         final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
         assignment.forEach((tp, partitionState) -> {
-            if (partitionState.shouldInitialize()) {
+            if (partitionState.shouldInitialize() && 
initPartitionsToInclude.test(tp)) {
                 if (defaultResetStrategy == OffsetResetStrategy.NONE)
                     partitionsWithNoOffsets.add(tp);
                 else
@@ -799,6 +793,10 @@ public class SubscriptionState {
             throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
     }
 
+    public synchronized void resetInitializingPositions() {
+        resetInitializingPositions(tp -> true);
+    }
+
     public synchronized Set<TopicPartition> partitionsNeedingReset(long nowMs) 
{
         return collectPartitions(state -> state.awaitingReset() && 
!state.awaitingRetryBackoff(nowMs));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 503878affa0..996ae05feb9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -52,6 +52,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -76,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -747,6 +749,46 @@ public class OffsetsRequestManagerTest {
         verify(subscriptionState, never()).seekUnvalidated(any(), any());
     }
 
+    // This test ensures that we don't reset positions to the partition 
offsets for a partition assigned while
+    // updateFetchPositions is running (after the OffsetFetch request has been 
sent).
+    @Test
+    public void 
testUpdatePositionsDoesNotResetPositionBeforeRetrievingOffsetsForNewlyAddedPartition()
 {
+        long internalFetchCommittedTimeout = time.milliseconds() + 
DEFAULT_API_TIMEOUT_MS;
+        TopicPartition tp1 = new TopicPartition("topic1", 1);
+        Set<TopicPartition> initPartitions1 = Collections.singleton(tp1);
+        Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1, 
Optional.of(1));
+
+        // tp1 assigned and requires a position
+        mockAssignedPartitionsMissingPositions(initPartitions1, 
initPartitions1, leaderAndEpoch);
+
+        // call to updateFetchPositions will trigger an OffsetFetch request 
for tp1 (won't complete just yet)
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult 
= new CompletableFuture<>();
+        when(commitRequestManager.fetchOffsets(initPartitions1, 
internalFetchCommittedTimeout)).thenReturn(fetchResult);
+        CompletableFuture<Boolean> updatePositions1 = 
requestManager.updateFetchPositions(time.milliseconds());
+        assertFalse(updatePositions1.isDone());
+        verify(commitRequestManager).fetchOffsets(initPartitions1, 
internalFetchCommittedTimeout);
+        clearInvocations(commitRequestManager);
+
+        // tp2 added to the assignment when the Offset Fetch request is 
already sent including tp1 only
+        TopicPartition tp2 = new TopicPartition("topic2", 2);
+        Set<TopicPartition> initPartitions2 = new HashSet<>(Arrays.asList(tp1, 
tp2));
+        mockAssignedPartitionsMissingPositions(initPartitions2, 
initPartitions2, leaderAndEpoch);
+
+        // tp2 requires a position, but shouldn't be reset after receiving the 
offset fetch response that will only
+        // include the requested partition tp1
+        
when(subscriptionState.initializingPartitions()).thenReturn(initPartitions2);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10, 
Optional.empty(), "");
+        fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+
+        // Position should have been updated for tp1 using the committed offset
+        SubscriptionState.FetchPosition expectedPosition = new 
SubscriptionState.FetchPosition(
+            offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), 
leaderAndEpoch);
+        verify(subscriptionState).seekUnvalidated(tp1, expectedPosition);
+
+        // Reset positions shouldn't include tp2
+        verify(subscriptionState).resetInitializingPositions(argThat(p -> 
!p.test(tp2)));
+    }
+
     @Test
     public void testRemoteListOffsetsRequestTimeoutMs() {
         int requestTimeoutMs = 100;

Reply via email to