This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 130af384810 KAFKA-17223 Retrying the call after encountering UnsupportedVersionException will cause ConcurrentModificationException (#16753)
130af384810 is described below

commit 130af384810aa1520b9d17710badf0a120d4d99c
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Aug 9 01:07:54 2024 +0800

    KAFKA-17223 Retrying the call after encountering UnsupportedVersionException will cause ConcurrentModificationException (#16753)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 19 +++++++++---
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 36 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d6dea2c57a0..5df6565baab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1199,16 +1199,27 @@ public class KafkaAdminClient extends AdminClient {
             long pollTimeout = Long.MAX_VALUE;
             log.trace("Trying to choose nodes for {} at {}", pendingCalls, 
now);
 
-            Iterator<Call> pendingIter = pendingCalls.iterator();
-            while (pendingIter.hasNext()) {
-                Call call = pendingIter.next();
+            List<Call> toRemove = new ArrayList<>();
+            // Using pendingCalls.size() to get the list size before the 
for-loop to avoid infinite loop.
+            // If call.fail keeps adding the call to pendingCalls,
+            // the loop like for (int i = 0; i < pendingCalls.size(); i++) 
can't stop.
+            int pendingSize = pendingCalls.size();
+            // pendingCalls could be modified in this loop,
+            // hence using for-loop instead of iterator to avoid 
ConcurrentModificationException.
+            for (int i = 0; i < pendingSize; i++) {
+                Call call = pendingCalls.get(i);
                 // If the call is being retried, await the proper backoff 
before finding the node
                 if (now < call.nextAllowedTryMs) {
                     pollTimeout = Math.min(pollTimeout, call.nextAllowedTryMs 
- now);
                 } else if (maybeDrainPendingCall(call, now)) {
-                    pendingIter.remove();
+                    toRemove.add(call);
                 }
             }
+
+            // Use remove instead of removeAll to avoid delete all matched 
elements
+            for (Call call : toRemove) {
+                pendingCalls.remove(call);
+            }
             return pollTimeout;
         }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index bc4ccb52f8b..468fa12dd7f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
@@ -287,6 +288,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 
 /**
  * A unit test for KafkaAdminClient.
@@ -8174,6 +8179,37 @@ public class KafkaAdminClientTest {
         }
     }
 
+    @Test
+    public void testCallFailWithUnsupportedVersionExceptionDoesNotHaveConcurrentModificationException() throws InterruptedException {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(mockCluster(1, 0))) {
+            AdminMetadataManager metadataManager = mock(AdminMetadataManager.class);
+
+            // isReady() first returns false, which makes sure LeastLoadedBrokerOrActiveKController#provide reaches requestUpdate;
+            // it then returns true, which makes sure LeastLoadedBrokerOrActiveKController#provide can pick a node.
+            doReturn(false).doReturn(true).when(metadataManager).isReady();
+
+            // The first requestUpdate() throws UnsupportedVersionException so maybeDrainPendingCall falls through to Call#fail.
+            doThrow(new UnsupportedVersionException("Unsupported version")).doNothing().when(metadataManager).requestUpdate();
+
+            // Makes sure describeCluster's handleUnsupportedVersionException does not always return false.
+            doReturn(false).when(metadataManager).usingBootstrapControllers();
+            // Avoids sending a fetchMetadata request.
+            doReturn(1L).when(metadataManager).metadataFetchDelayMs(anyLong());
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            try (KafkaAdminClient admin = KafkaAdminClient.createInternal(
+                    new AdminClientConfig(Collections.emptyMap()), metadataManager, env.kafkaClient(), env.time())) {
+                admin.describeCluster(new DescribeClusterOptions().timeoutMs(1000));
+
+                // Makes sure maybeDrainPendingCalls does not remove duplicate pending calls: call.fail re-adds
+                // the listNodes call, and maybeDrainPendingCalls removes only one copy, so a request is still sent.
+                TestUtils.waitForCondition(() -> env.kafkaClient().inFlightRequestCount() != 0,
+                        "Timed out waiting for listNodes request");
+            }
+        }
+    }
+
     private static ListClientMetricsResourcesResponse 
prepareListClientMetricsResourcesResponse(Errors error) {
         return new ListClientMetricsResourcesResponse(new 
ListClientMetricsResourcesResponseData()
                 .setErrorCode(error.code()));

Reply via email to