This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1962917436f KAFKA-17674: Fix bug on update positions of newly added partitions (#17342)
1962917436f is described below
commit 1962917436f463541f9bb63791b7ed55c23ce8c1
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Oct 3 07:10:54 2024 -0400
KAFKA-17674: Fix bug on update positions of newly added partitions (#17342)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../consumer/internals/OffsetsRequestManager.java | 27 ++++++++------
.../consumer/internals/SubscriptionState.java | 24 ++++++-------
.../internals/OffsetsRequestManagerTest.java | 42 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 23 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
index 7ce9811d414..5942bedfb0c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java
@@ -283,14 +283,15 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
cacheExceptionIfEventExpired(result, deadlineMs);
CompletableFuture<Void> updatePositions;
+ final Set<TopicPartition> initializingPartitions =
subscriptionState.initializingPartitions();
if (commitRequestManager != null) {
- CompletableFuture<Void> refreshWithCommittedOffsets =
initWithCommittedOffsetsIfNeeded(deadlineMs);
+ CompletableFuture<Void> refreshWithCommittedOffsets =
initWithCommittedOffsetsIfNeeded(initializingPartitions, deadlineMs);
// Reset positions for all partitions that may still require it
(or that are awaiting reset)
- updatePositions = refreshWithCommittedOffsets.thenCompose(__ ->
initWithPartitionOffsetsIfNeeded());
+ updatePositions = refreshWithCommittedOffsets.thenCompose(__ ->
initWithPartitionOffsetsIfNeeded(initializingPartitions));
} else {
- updatePositions = initWithPartitionOffsetsIfNeeded();
+ updatePositions =
initWithPartitionOffsetsIfNeeded(initializingPartitions);
}
updatePositions.whenComplete((__, resetError) -> {
@@ -324,19 +325,21 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
}
/**
- * If there are partitions still needing a position and a reset policy is
defined, request reset using the
- * default policy.
+ * If there are partitions still needing a position and a reset policy is
defined, request reset using the default policy.
*
+ * @param initializingPartitions Set of partitions that should be
initialized. This won't reset positions for
+ * partitions that may have been added to
the subscription state, but that are not
+ * included in this set.
* @return Future that will complete when the reset operation completes
retrieving the offsets and setting
* positions in the subscription state using them.
* @throws NoOffsetForPartitionException If no reset strategy is
configured.
*/
- private CompletableFuture<Void> initWithPartitionOffsetsIfNeeded() {
+ private CompletableFuture<Void>
initWithPartitionOffsetsIfNeeded(Set<TopicPartition> initializingPartitions) {
CompletableFuture<Void> result = new CompletableFuture<>();
try {
// Mark partitions that need reset, using the configured reset
strategy. If no
// strategy is defined, this will raise a
NoOffsetForPartitionException exception.
- subscriptionState.resetInitializingPositions();
+
subscriptionState.resetInitializingPositions(initializingPartitions::contains);
} catch (Exception e) {
result.completeExceptionally(e);
return result;
@@ -351,11 +354,15 @@ public class OffsetsRequestManager implements
RequestManager, ClusterResourceLis
* Fetch the committed offsets for partitions that require initialization.
This will trigger an OffsetFetch
* request and update positions in the subscription state once a response
is received.
*
+ * @param initializingPartitions Set of partitions to update with a
position. This same set will be kept
+ * throughout the whole process (considered
when fetching committed offsets, and
+ * when resetting positions for partitions
that may not have committed offsets).
+ * @param deadlineMs Deadline of the application event that
triggered this operation. Used to
+ * determine how much time to allow for the
reused offset fetch to complete.
* @throws TimeoutException If offsets could not be retrieved within the
timeout
*/
- private CompletableFuture<Void> initWithCommittedOffsetsIfNeeded(long
deadlineMs) {
- final Set<TopicPartition> initializingPartitions =
subscriptionState.initializingPartitions();
-
+ private CompletableFuture<Void>
initWithCommittedOffsetsIfNeeded(Set<TopicPartition> initializingPartitions,
+ long
deadlineMs) {
if (initializingPartitions.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index bc7d20d11e0..b7d90431166 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -20,7 +20,6 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -32,7 +31,6 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -773,21 +771,17 @@ public class SubscriptionState {
}
/**
- * Note: this will not attempt to reset partitions that are in the process
of being assigned
- * and are pending the completion of any {@link
ConsumerRebalanceListener#onPartitionsAssigned(Collection)}
- * callbacks.
+ * Request reset for partitions that require a position, using the
configured reset strategy.
*
- * <p/>
- *
- * This method only appears to be invoked the by the {@link KafkaConsumer}
during its
- * {@link KafkaConsumer#poll(Duration)} logic. <em>Direct</em> calls to
methods like
- * {@link #requestOffsetReset(TopicPartition)}, {@link
#requestOffsetResetIfPartitionAssigned(TopicPartition)},
- * etc. do <em>not</em> skip partitions pending assignment.
+ * @param initPartitionsToInclude Initializing partitions to include in
the reset. Assigned partitions that
+ * require a position but are not included
in this set won't be reset.
+ * @throws NoOffsetForPartitionException If there are partitions assigned
that require a position but
+ * there is no reset strategy
configured.
*/
- public synchronized void resetInitializingPositions() {
+ public synchronized void
resetInitializingPositions(Predicate<TopicPartition> initPartitionsToInclude) {
final Set<TopicPartition> partitionsWithNoOffsets = new HashSet<>();
assignment.forEach((tp, partitionState) -> {
- if (partitionState.shouldInitialize()) {
+ if (partitionState.shouldInitialize() &&
initPartitionsToInclude.test(tp)) {
if (defaultResetStrategy == OffsetResetStrategy.NONE)
partitionsWithNoOffsets.add(tp);
else
@@ -799,6 +793,10 @@ public class SubscriptionState {
throw new NoOffsetForPartitionException(partitionsWithNoOffsets);
}
+ public synchronized void resetInitializingPositions() {
+ resetInitializingPositions(tp -> true);
+ }
+
public synchronized Set<TopicPartition> partitionsNeedingReset(long nowMs)
{
return collectPartitions(state -> state.awaitingReset() &&
!state.awaitingRetryBackoff(nowMs));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
index 503878affa0..996ae05feb9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java
@@ -52,6 +52,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -76,6 +77,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -747,6 +749,46 @@ public class OffsetsRequestManagerTest {
verify(subscriptionState, never()).seekUnvalidated(any(), any());
}
+ // This test ensures that we don't reset positions to the partition
offsets for a partition assigned while the
+ // updateFetchPositions is running (after the OffsetFetch request has been
sent).
+ @Test
+ public void
testUpdatePositionsDoesNotResetPositionBeforeRetrievingOffsetsForNewlyAddedPartition()
{
+ long internalFetchCommittedTimeout = time.milliseconds() +
DEFAULT_API_TIMEOUT_MS;
+ TopicPartition tp1 = new TopicPartition("topic1", 1);
+ Set<TopicPartition> initPartitions1 = Collections.singleton(tp1);
+ Metadata.LeaderAndEpoch leaderAndEpoch = testLeaderEpoch(LEADER_1,
Optional.of(1));
+
+ // tp1 assigned and requires a position
+ mockAssignedPartitionsMissingPositions(initPartitions1,
initPartitions1, leaderAndEpoch);
+
+ // call to updateFetchPositions will trigger an OffsetFetch request
for tp1 (won't complete just yet)
+ CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult
= new CompletableFuture<>();
+ when(commitRequestManager.fetchOffsets(initPartitions1,
internalFetchCommittedTimeout)).thenReturn(fetchResult);
+ CompletableFuture<Boolean> updatePositions1 =
requestManager.updateFetchPositions(time.milliseconds());
+ assertFalse(updatePositions1.isDone());
+ verify(commitRequestManager).fetchOffsets(initPartitions1,
internalFetchCommittedTimeout);
+ clearInvocations(commitRequestManager);
+
+ // tp2 added to the assignment when the Offset Fetch request is
already sent including tp1 only
+ TopicPartition tp2 = new TopicPartition("topic2", 2);
+ Set<TopicPartition> initPartitions2 = new HashSet<>(Arrays.asList(tp1,
tp2));
+ mockAssignedPartitionsMissingPositions(initPartitions2,
initPartitions2, leaderAndEpoch);
+
+ // tp2 requires a position, but shouldn't be reset after receiving the
offset fetch response that will only
+ // include the requested partition tp1
+
when(subscriptionState.initializingPartitions()).thenReturn(initPartitions2);
+ OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(10,
Optional.empty(), "");
+ fetchResult.complete(Collections.singletonMap(tp1, offsetAndMetadata));
+
+ // Position should have been updated for tp1 using the committed offset
+ SubscriptionState.FetchPosition expectedPosition = new
SubscriptionState.FetchPosition(
+ offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(),
leaderAndEpoch);
+ verify(subscriptionState).seekUnvalidated(tp1, expectedPosition);
+
+ // Reset positions shouldn't include tp2
+ verify(subscriptionState).resetInitializingPositions(argThat(p ->
!p.test(tp2)));
+ }
+
@Test
public void testRemoteListOffsetsRequestTimeoutMs() {
int requestTimeoutMs = 100;