This is an automated email from the ASF dual-hosted git repository.

jlprat pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 41037bf78da KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts (#13575)
41037bf78da is described below

commit 41037bf78da3113eaf7f67b33ad7b00f4def930d
Author: Greg Harris <[email protected]>
AuthorDate: Fri Apr 21 12:55:41 2023 -0700

    KAFKA-14905: Reduce flakiness in MM2 ForwardingAdmin test due to admin timeouts (#13575)
    
    Reduce flakiness of `MirrorConnectorsWithCustomForwardingAdminIntegrationTest`
    
    Reviewers: Josep Prat <[email protected]>
---
 .../FakeForwardingAdminWithLocalMetadata.java      | 62 +++++++++-------------
 1 file changed, 24 insertions(+), 38 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
index 535dcaca9ee..3ac8a8b17f0 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/clients/admin/FakeForwardingAdminWithLocalMetadata.java
@@ -37,9 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /** Customised ForwardingAdmin for testing only.
  * The class create/alter topics, partitions and ACLs in Kafka then store 
metadata in {@link FakeLocalMetadataStore}.
@@ -47,7 +44,6 @@ import java.util.concurrent.TimeoutException;
 
 public class FakeForwardingAdminWithLocalMetadata extends ForwardingAdmin {
     private static final Logger log = 
LoggerFactory.getLogger(FakeForwardingAdminWithLocalMetadata.class);
-    private final long timeout = 1000L;
 
     public FakeForwardingAdminWithLocalMetadata(Map<String, Object> configs) {
         super(configs);
@@ -56,35 +52,29 @@ public class FakeForwardingAdminWithLocalMetadata extends 
ForwardingAdmin {
     @Override
     public CreateTopicsResult createTopics(Collection<NewTopic> newTopics, 
CreateTopicsOptions options) {
         CreateTopicsResult createTopicsResult = super.createTopics(newTopics, 
options);
-        newTopics.forEach(newTopic -> {
-            try {
-                log.info("Add topic '{}' to cluster and metadata store", 
newTopic);
-                // Wait for topic to be created before edit the fake local 
store
-                createTopicsResult.values().get(newTopic.name()).get(timeout, 
TimeUnit.MILLISECONDS);
+        newTopics.forEach(newTopic -> 
createTopicsResult.values().get(newTopic.name()).whenComplete((ignored, error) 
-> {
+            if (error == null) {
                 FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
-            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                if (e.getCause() instanceof TopicExistsException) {
-                    log.warn("Topic '{}' already exists. Update the local 
metadata store if absent", newTopic.name());
-                    
FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
-                } else
-                    log.error(e.getMessage());
+            } else if (error.getCause() instanceof TopicExistsException) {
+                log.warn("Topic '{}' already exists. Update the local metadata 
store if absent", newTopic.name());
+                FakeLocalMetadataStore.addTopicToLocalMetadataStore(newTopic);
+            } else {
+                log.error("Unable to intercept admin client operation", error);
             }
-        });
+        }));
         return createTopicsResult;
     }
 
     @Override
     public CreatePartitionsResult createPartitions(Map<String, NewPartitions> 
newPartitions, CreatePartitionsOptions options) {
         CreatePartitionsResult createPartitionsResult = 
super.createPartitions(newPartitions, options);
-        newPartitions.forEach((topic, newPartition) -> {
-            try {
-                // Wait for topic partition to be created before edit the fake 
local store
-                createPartitionsResult.values().get(topic).get(timeout, 
TimeUnit.MILLISECONDS);
+        newPartitions.forEach((topic, newPartition) -> 
createPartitionsResult.values().get(topic).whenComplete((ignored, error) -> {
+            if (error == null) {
                 FakeLocalMetadataStore.updatePartitionCount(topic, 
newPartition.totalCount());
-            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                log.error(e.getMessage());
+            } else {
+                log.error("Unable to intercept admin client operation", error);
             }
-        });
+        }));
         return createPartitionsResult;
     }
 
@@ -92,17 +82,15 @@ public class FakeForwardingAdminWithLocalMetadata extends 
ForwardingAdmin {
     @Override
     public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> 
configs, AlterConfigsOptions options) {
         AlterConfigsResult alterConfigsResult = super.alterConfigs(configs, 
options);
-        configs.forEach((configResource, newConfigs) -> {
-            try {
+        configs.forEach((configResource, newConfigs) -> 
alterConfigsResult.values().get(configResource).whenComplete((ignored, error) 
-> {
+            if (error == null) {
                 if (configResource.type() == ConfigResource.Type.TOPIC) {
-                    // Wait for config to be altered before edit the fake 
local store
-                    
alterConfigsResult.values().get(configResource).get(timeout, 
TimeUnit.MILLISECONDS);
                     
FakeLocalMetadataStore.updateTopicConfig(configResource.name(), newConfigs);
                 }
-            } catch (InterruptedException | ExecutionException | 
TimeoutException e) {
-                log.error(e.getMessage());
+            } else {
+                log.error("Unable to intercept admin client operation", error);
             }
-        });
+        }));
         return alterConfigsResult;
     }
 
@@ -110,15 +98,13 @@ public class FakeForwardingAdminWithLocalMetadata extends 
ForwardingAdmin {
     @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, 
CreateAclsOptions options) {
         CreateAclsResult aclsResult = super.createAcls(acls, options);
-        try {
-            // Wait for acls to be created before edit the fake local store
-            aclsResult.all().get(timeout, TimeUnit.MILLISECONDS);
-            acls.forEach(aclBinding -> {
+        aclsResult.values().forEach((aclBinding, future) -> 
future.whenComplete((ignored, error) -> {
+            if (error == null) {
                 FakeLocalMetadataStore.addACLs(aclBinding.entry().principal(), 
aclBinding);
-            });
-        } catch (InterruptedException | ExecutionException | TimeoutException 
e) {
-            log.error(e.getMessage());
-        }
+            } else {
+                log.error("Unable to intercept admin client operation", error);
+            }
+        }));
         return aclsResult;
     }
 }

Reply via email to