This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4bad90835b1 KAFKA-15465: Don't throw if MirrorMaker not authorized to create internal topics. (#14388)
4bad90835b1 is described below

commit 4bad90835b1bb7a4fe63410e3ddbb11d0eec4a4d
Author: Omnia G.H Ibrahim <[email protected]>
AuthorDate: Fri Oct 13 11:53:09 2023 +0100

    KAFKA-15465: Don't throw if MirrorMaker not authorized to create internal topics. (#14388)
    
    
    Reviewers: Mickael Maison <[email protected]>, Ahmed Hibot
---
 .../apache/kafka/connect/mirror/MirrorUtils.java   |  9 +++
 .../kafka/connect/mirror/MirrorUtilsTest.java      | 64 ++++++++++++++++++++--
 2 files changed, 69 insertions(+), 4 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
index 9993a4331cc..20bd0c05228 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -287,11 +288,19 @@ public final class MirrorUtils {
                 log.debug("Unable to create topic '{}' since the brokers do not support the CreateTopics API." +
                                 " Falling back to assume topic exists or will be auto-created by the broker.",
                         topicName);
+                return;
+            }
+            if (cause instanceof TopicAuthorizationException) {
+                log.debug("Not authorized to create topic(s) '{}' upon the brokers." +
+                                " Falling back to assume topic(s) exist or will be auto-created by the broker.",
+                        topicName);
+                return;
             }
             if (cause instanceof ClusterAuthorizationException) {
                 log.debug("Not authorized to create topic '{}'." +
                                 " Falling back to assume topic exists or will be auto-created by the broker.",
                         topicName);
+                return;
             }
             if (cause instanceof InvalidConfigurationException) {
                 throw new ConnectException("Unable to create topic '" + topicName + "': " + cause.getMessage(),
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
index bf44afebf91..a73997ad57c 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
@@ -20,7 +20,11 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.junit.jupiter.api.Test;
 
@@ -71,15 +75,67 @@ public class MirrorUtilsTest {
     }
 
     @Test
-    public void testCreateCompactedTopicFails() throws Exception {
+    public void 
testCreateCompactedTopicAssumeTopicAlreadyExistsWithUnsupportedVersionException()
 throws Exception {
         Map<String, KafkaFuture<Void>> values = 
Collections.singletonMap(TOPIC, future);
-        when(future.get()).thenThrow(new ExecutionException(new 
ClusterAuthorizationException("not authorized")));
+        when(future.get()).thenThrow(new ExecutionException(new 
UnsupportedVersionException("unsupported")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void 
testCreateCompactedTopicAssumeTopicAlreadyExistsWithClusterAuthorizationException()
 throws Exception {
+        Map<String, KafkaFuture<Void>> values = 
Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new 
ClusterAuthorizationException("not authorised")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void 
testCreateCompactedTopicAssumeTopicAlreadyExistsWithTopicAuthorizationException()
 throws Exception {
+        Map<String, KafkaFuture<Void>> values = 
Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new 
TopicAuthorizationException("not authorised")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void 
testCreateCompactedTopicFailsWithInvalidConfigurationException() throws 
Exception {
+        Map<String, KafkaFuture<Void>> values = 
Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new 
InvalidConfigurationException("wrong config")));
+        when(ctr.values()).thenReturn(values);
+        when(admin.createTopics(any(), any())).thenReturn(ctr);
+        Throwable ce = assertThrows(ConnectException.class, () -> 
MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin), "Should 
have exception thrown");
+
+        assertTrue(ce.getCause() instanceof InvalidConfigurationException);
+        verify(future).get();
+        verify(ctr).values();
+        verify(admin).createTopics(any(), any());
+    }
+
+    @Test
+    public void testCreateCompactedTopicFailsWithTimeoutException() throws 
Exception {
+        Map<String, KafkaFuture<Void>> values = 
Collections.singletonMap(TOPIC, future);
+        when(future.get()).thenThrow(new ExecutionException(new 
TimeoutException("Timeout")));
         when(ctr.values()).thenReturn(values);
         when(admin.createTopics(any(), any())).thenReturn(ctr);
         Throwable ce = assertThrows(ConnectException.class, () -> 
MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin), "Should 
have exception thrown");
 
-        assertTrue(ce.getCause() instanceof ExecutionException);
-        assertTrue(ce.getCause().getCause() instanceof 
ClusterAuthorizationException);
+        assertTrue(ce.getCause() instanceof TimeoutException);
         verify(future).get();
         verify(ctr).values();
         verify(admin).createTopics(any(), any());

Reply via email to