This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 130af384810 KAFKA-17223 Retrying the call after encountering
UnsupportedVersionException will cause ConcurrentModificationException (#16753)
130af384810 is described below
commit 130af384810aa1520b9d17710badf0a120d4d99c
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Aug 9 01:07:54 2024 +0800
KAFKA-17223 Retrying the call after encountering UnsupportedVersionException
will cause ConcurrentModificationException (#16753)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 19 +++++++++---
.../kafka/clients/admin/KafkaAdminClientTest.java | 36 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d6dea2c57a0..5df6565baab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1199,16 +1199,27 @@ public class KafkaAdminClient extends AdminClient {
long pollTimeout = Long.MAX_VALUE;
log.trace("Trying to choose nodes for {} at {}", pendingCalls,
now);
- Iterator<Call> pendingIter = pendingCalls.iterator();
- while (pendingIter.hasNext()) {
- Call call = pendingIter.next();
+ List<Call> toRemove = new ArrayList<>();
+ // Using pendingCalls.size() to get the list size before the
for-loop to avoid infinite loop.
+ // If call.fail keeps adding the call to pendingCalls,
+ // the loop like for (int i = 0; i < pendingCalls.size(); i++)
can't stop.
+ int pendingSize = pendingCalls.size();
+ // pendingCalls could be modified in this loop,
+ // hence using for-loop instead of iterator to avoid
ConcurrentModificationException.
+ for (int i = 0; i < pendingSize; i++) {
+ Call call = pendingCalls.get(i);
// If the call is being retried, await the proper backoff
before finding the node
if (now < call.nextAllowedTryMs) {
pollTimeout = Math.min(pollTimeout, call.nextAllowedTryMs
- now);
} else if (maybeDrainPendingCall(call, now)) {
- pendingIter.remove();
+ toRemove.add(call);
}
}
+
+ // Use remove instead of removeAll to avoid deleting all matching
elements
+ for (Call call : toRemove) {
+ pendingCalls.remove(call);
+ }
return pollTimeout;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index bc4ccb52f8b..468fa12dd7f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -287,6 +288,10 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
/**
* A unit test for KafkaAdminClient.
@@ -8174,6 +8179,37 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void
testCallFailWithUnsupportedVersionExceptionDoesNotHaveConcurrentModificationException()
throws InterruptedException {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ AdminMetadataManager metadataManager =
mock(AdminMetadataManager.class);
+
+ // the first false result makes sure
LeastLoadedBrokerOrActiveKController#provide can go to requestUpdate
+ // the second true result makes sure
LeastLoadedBrokerOrActiveKController#provide can get a node
+ doReturn(false).doReturn(true).when(metadataManager).isReady();
+
+ // make maybeDrainPendingCall throw UnsupportedVersionException
and go to Call#fail
+ doThrow(new UnsupportedVersionException("Unsupported
version")).doNothing().when(metadataManager).requestUpdate();
+
+ // make sure describeCluster handleUnsupportedVersionException
doesn't always return false
+ doReturn(false).when(metadataManager).usingBootstrapControllers();
+ // avoid sending fetchMetadata request
+ doReturn(1L).when(metadataManager).metadataFetchDelayMs(anyLong());
+
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ try (KafkaAdminClient admin = KafkaAdminClient.createInternal(
+ new AdminClientConfig(Collections.emptyMap()),
metadataManager, env.kafkaClient(), env.time())) {
+ admin.describeCluster(new
DescribeClusterOptions().timeoutMs(1000));
+
+ // make sure maybeDrainPendingCalls doesn't remove duplicate
pending calls
+ // the listNodes call will be added again in call.fail and
only one copy is removed in maybeDrainPendingCalls
+ TestUtils.waitForCondition(() ->
env.kafkaClient().inFlightRequestCount() != 0,
+ "Timed out waiting for listNodes request");
+ }
+ }
+ }
+
private static ListClientMetricsResourcesResponse
prepareListClientMetricsResourcesResponse(Errors error) {
return new ListClientMetricsResourcesResponse(new
ListClientMetricsResourcesResponseData()
.setErrorCode(error.code()));