This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ee3cea05aac KAFKA-16816: Remove unneeded FencedInstanceId support on
commit path for new consumer (#17559)
ee3cea05aac is described below
commit ee3cea05aac766c60cc858c33c7cd489ddcbdf9f
Author: TaiJuWu <[email protected]>
AuthorDate: Tue Nov 5 21:33:23 2024 +0800
KAFKA-16816: Remove unneeded FencedInstanceId support on commit path for
new consumer (#17559)
Reviewers: Lianet Magrans <[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 21 ++++--
.../consumer/internals/AsyncKafkaConsumer.java | 26 -------
.../consumer/internals/CommitRequestManager.java | 5 --
.../consumer/internals/AsyncKafkaConsumerTest.java | 87 ----------------------
.../internals/CommitRequestManagerTest.java | 25 -------
5 files changed, 14 insertions(+), 150 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 1b12ae3d4b4..619659f3274 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -873,7 +873,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout
specified by {@code default.api.timeout.ms} expires
* before successful completion of the offset commit
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer is using the classic group protocol
+ * and this instance gets fenced by broker.
*/
@Override
public void commitSync() {
@@ -916,7 +917,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout
expires before successful completion
* of the offset commit
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer is using the classic group protocol
+ * and this instance gets fenced by broker.
*/
@Override
public void commitSync(Duration timeout) {
@@ -964,7 +966,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout
expires before successful completion
* of the offset commit
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer is using the classic group protocol
+ * and this instance gets fenced by broker.
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata>
offsets) {
@@ -1012,7 +1015,8 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
* is too large or if the topic does not exist).
* @throws org.apache.kafka.common.errors.TimeoutException if the timeout
expires before successful completion
* of the offset commit
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer is using the classic group protocol
+ * and this instance gets fenced by broker.
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata>
offsets, final Duration timeout) {
@@ -1022,7 +1026,8 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
/**
* Commit offsets returned on the last {@link #poll(Duration)} for all the
subscribed list of topics and partition.
* Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer is using the classic group protocol
+ * and this instance gets fenced by broker.
*/
@Override
public void commitAsync() {
@@ -1045,7 +1050,8 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
* (and variants) returns.
*
* @param callback Callback to invoke when the commit completes
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer is using the classic group protocol
+ * and this instance gets fenced by broker.
*/
@Override
public void commitAsync(OffsetCommitCallback callback) {
@@ -1072,7 +1078,8 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
* @param offsets A map of offsets by partition with associate metadata.
This map will be copied internally, so it
* is safe to mutate the map after returning.
* @param callback Callback to invoke when the commit completes
- * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer instance gets fenced by broker.
+ * @throws org.apache.kafka.common.errors.FencedInstanceIdException if
this consumer is using the classic group protocol
+ * and this instance gets fenced by broker.
*/
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata>
offsets, OffsetCommitCallback callback) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index fccd69c86b8..f4787580361 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -75,7 +75,6 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
@@ -117,7 +116,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -249,7 +247,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
- private final AtomicBoolean asyncCommitFenced;
// Last triggered async commit future. Used to wait until all previous
async commits are completed.
// We only need to keep track of the last one, since they are guaranteed
to complete in order.
private CompletableFuture<Void> lastPendingAsyncCommit = null;
@@ -336,7 +333,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler);
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
- this.asyncCommitFenced = new AtomicBoolean(false);
this.groupMetadata.set(initializeGroupMetadata(config,
groupRebalanceConfig));
final Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(time,
logContext,
@@ -448,7 +444,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
- this.asyncCommitFenced = new AtomicBoolean(false);
}
AsyncKafkaConsumer(LogContext logContext,
@@ -511,7 +506,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
backgroundEventHandler
);
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
- this.asyncCommitFenced = new AtomicBoolean(false);
Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(
time,
logContext,
@@ -766,10 +760,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(offsets);
}
- if (t instanceof FencedInstanceIdException) {
- asyncCommitFenced.set(true);
- }
-
if (callback == null) {
if (t != null) {
log.error("Offset commit with offsets {} failed",
offsets, t);
@@ -786,7 +776,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
maybeThrowInvalidGroupIdException();
- maybeThrowFencedInstanceException();
offsetCommitCallbackInvoker.executeCallbacks();
Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
@@ -1657,7 +1646,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
- maybeThrowFencedInstanceException();
offsetCommitCallbackInvoker.executeCallbacks();
try {
applicationEventHandler.addAndGet(new
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
@@ -1940,20 +1928,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
return kafkaConsumerMetrics;
}
- private void maybeThrowFencedInstanceException() {
- if (asyncCommitFenced.get()) {
- String groupInstanceId = "unknown";
- if (!groupMetadata.get().isPresent()) {
- log.error("No group metadata found although a group ID was
provided. This is a bug!");
- } else if
(!groupMetadata.get().get().groupInstanceId().isPresent()) {
- log.error("No group instance ID found although the consumer is
fenced. This is a bug!");
- } else {
- groupInstanceId =
groupMetadata.get().get().groupInstanceId().get();
- }
- throw new FencedInstanceIdException("Get fenced exception for
group.instance.id " + groupInstanceId);
- }
- }
-
// Visible for testing
SubscriptionState subscriptions() {
return subscriptions;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 5a9e0455d29..dfa2520a02b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -735,11 +735,6 @@ public class CommitRequestManager implements
RequestManager, MemberStateListener
coordinatorRequestManager.markCoordinatorUnknown(error.message(),
currentTimeMs);
future.completeExceptionally(error.exception());
return;
- } else if (error == Errors.FENCED_INSTANCE_ID) {
- String fencedError = "OffsetCommit failed due to group
instance id fenced: " + groupInstanceId;
- log.error(fencedError);
- future.completeExceptionally(new
CommitFailedException(fencedError));
- return;
} else if (error == Errors.OFFSET_METADATA_TOO_LARGE ||
error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
future.completeExceptionally(error.exception());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index f9db6ba05da..380b7082b97 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -57,7 +57,6 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
@@ -334,23 +333,6 @@ public class AsyncKafkaConsumerTest {
new GroupAuthorizationException("Group authorization
exception"));
}
- @Test
- public void testCommitAsyncWithFencedException() {
- consumer = newConsumer();
- completeCommitSyncApplicationEventSuccessfully();
- final Map<TopicPartition, OffsetAndMetadata> offsets =
mockTopicPartitionOffset();
- MockCommitCallback callback = new MockCommitCallback();
-
- assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
-
- final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor =
ArgumentCaptor.forClass(AsyncCommitEvent.class);
- verify(applicationEventHandler).add(commitEventCaptor.capture());
- final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
-
commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-
- assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () ->
consumer.commitAsync());
- }
-
@Test
public void testCommitted() {
time = new MockTime(1);
@@ -610,52 +592,6 @@ public class AsyncKafkaConsumerTest {
}
}
- @Test
- public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() {
- final String groupId = "consumerGroupA";
- final String groupInstanceId = "groupInstanceId1";
- final Properties props = requiredConsumerConfigAndGroupId(groupId);
- props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
- final ConsumerConfig config = new ConsumerConfig(props);
- consumer = newConsumer(config);
-
completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
- final TopicPartition tp = new TopicPartition("foo", 0);
- completeAssignmentChangeEventSuccessfully();
- consumer.assign(Collections.singleton(tp));
- completeSeekUnvalidatedEventSuccessfully();
- consumer.seek(tp, 20);
-
- assertDoesNotThrow(() -> consumer.commitAsync());
-
- Exception e = assertThrows(FencedInstanceIdException.class, () ->
consumer.commitAsync());
- assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
- }
-
- @Test
- public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
- final String groupId = "consumerGroupA";
- final String groupInstanceId = "groupInstanceId1";
- final Properties props = requiredConsumerConfigAndGroupId(groupId);
- props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
- final ConsumerConfig config = new ConsumerConfig(props);
- consumer = newConsumer(config);
-
completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
- final TopicPartition tp = new TopicPartition("foo", 0);
- completeAssignmentChangeEventSuccessfully();
- consumer.assign(Collections.singleton(tp));
- completeSeekUnvalidatedEventSuccessfully();
- consumer.seek(tp, 20);
-
- assertDoesNotThrow(() -> consumer.commitAsync());
-
- Exception e = assertThrows(FencedInstanceIdException.class, () ->
consumer.commitSync());
- assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
- }
-
@Test
public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
final TopicPartition tp = new TopicPartition("foo", 0);
@@ -739,29 +675,6 @@ public class AsyncKafkaConsumerTest {
return allValues.get(allValues.size() - 1);
}
- @Test
- public void testPollTriggersFencedExceptionFromCommitAsync() {
- final String groupId = "consumerGroupA";
- final String groupInstanceId = "groupInstanceId1";
- final Properties props = requiredConsumerConfigAndGroupId(groupId);
- props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
- final ConsumerConfig config = new ConsumerConfig(props);
- consumer = newConsumer(config);
-
completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
-
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
-
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
- final TopicPartition tp = new TopicPartition("foo", 0);
- completeAssignmentChangeEventSuccessfully();
- consumer.assign(Collections.singleton(tp));
- completeSeekUnvalidatedEventSuccessfully();
- consumer.seek(tp, 20);
-
- assertDoesNotThrow(() -> consumer.commitAsync());
-
- Exception e = assertThrows(FencedInstanceIdException.class, () ->
consumer.poll(Duration.ZERO));
- assertEquals("Get fenced exception for group.instance.id
groupInstanceId1", e.getMessage());
- }
-
@Test
public void testEnsurePollExecutedCommitAsyncCallbacks() {
consumer = newConsumer();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 1a805e642fc..9ded26b9e71 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -340,27 +340,8 @@ public class CommitRequestManagerTest {
assertExceptionHandling(commitRequestManager, error, true);
}
- @ParameterizedTest
- @MethodSource("commitSyncExpectedExceptions")
- public void testCommitSyncFailsWithExpectedException(Errors commitError,
- Class<? extends
Exception> expectedException) {
- CommitRequestManager commitRequestManager = create(false, 100);
-
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-
- Map<TopicPartition, OffsetAndMetadata> offsets =
Collections.singletonMap(
- new TopicPartition("topic", 1),
- new OffsetAndMetadata(0));
-
- // Send sync offset commit that fails and verify it propagates the
expected exception.
- long deadlineMs = time.milliseconds() + retryBackoffMs;
- CompletableFuture<Void> commitResult =
commitRequestManager.commitSync(offsets, deadlineMs);
- completeOffsetCommitRequestWithError(commitRequestManager,
commitError);
- assertFutureThrows(commitResult, expectedException);
- }
-
private static Stream<Arguments> commitSyncExpectedExceptions() {
return Stream.of(
- Arguments.of(Errors.FENCED_INSTANCE_ID,
CommitFailedException.class),
Arguments.of(Errors.UNKNOWN_MEMBER_ID,
CommitFailedException.class),
Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE,
Errors.OFFSET_METADATA_TOO_LARGE.exception().getClass()),
Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE,
Errors.INVALID_COMMIT_OFFSET_SIZE.exception().getClass()),
@@ -985,10 +966,6 @@ public class CommitRequestManagerTest {
case INVALID_COMMIT_OFFSET_SIZE:
assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
break;
- case FENCED_INSTANCE_ID:
- // This is a fatal failure, so we should not retry
- assertPollDoesNotReturn(commitRequestManager, Long.MAX_VALUE);
- break;
default:
if (errors.exception() instanceof RetriableException &&
requestShouldBeRetried) {
assertRetryBackOff(commitRequestManager, remainBackoffMs);
@@ -1279,7 +1256,6 @@ public class CommitRequestManagerTest {
Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
Arguments.of(Errors.REQUEST_TIMED_OUT),
- Arguments.of(Errors.FENCED_INSTANCE_ID),
Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
Arguments.of(Errors.STALE_MEMBER_EPOCH),
Arguments.of(Errors.UNKNOWN_MEMBER_ID));
@@ -1299,7 +1275,6 @@ public class CommitRequestManagerTest {
Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
Arguments.of(Errors.REQUEST_TIMED_OUT),
- Arguments.of(Errors.FENCED_INSTANCE_ID),
Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
Arguments.of(Errors.UNKNOWN_MEMBER_ID),
// Adding STALE_MEMBER_EPOCH as non-retriable here because it is
only retried if a new