This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4bad90835b1 KAFKA-15465: Don't throw if MirrorMaker not authorized to
create internal topics. (#14388)
4bad90835b1 is described below
commit 4bad90835b1bb7a4fe63410e3ddbb11d0eec4a4d
Author: Omnia G.H Ibrahim <[email protected]>
AuthorDate: Fri Oct 13 11:53:09 2023 +0100
KAFKA-15465: Don't throw if MirrorMaker not authorized to create internal
topics. (#14388)
Reviewers: Mickael Maison <[email protected]>, Ahmed Hibot
---
.../apache/kafka/connect/mirror/MirrorUtils.java | 9 +++
.../kafka/connect/mirror/MirrorUtilsTest.java | 64 ++++++++++++++++++++--
2 files changed, 69 insertions(+), 4 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
index 9993a4331cc..20bd0c05228 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -287,11 +288,19 @@ public final class MirrorUtils {
log.debug("Unable to create topic '{}' since the brokers do
not support the CreateTopics API." +
" Falling back to assume topic exists or will
be auto-created by the broker.",
topicName);
+ return;
+ }
+ if (cause instanceof TopicAuthorizationException) {
+ log.debug("Not authorized to create topic(s) '{}' upon the
brokers." +
+ " Falling back to assume topic(s) exist or
will be auto-created by the broker.",
+ topicName);
+ return;
}
if (cause instanceof ClusterAuthorizationException) {
log.debug("Not authorized to create topic '{}'." +
" Falling back to assume topic exists or will
be auto-created by the broker.",
topicName);
+ return;
}
if (cause instanceof InvalidConfigurationException) {
throw new ConnectException("Unable to create topic '" +
topicName + "': " + cause.getMessage(),
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
index bf44afebf91..a73997ad57c 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorUtilsTest.java
@@ -20,7 +20,11 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.connect.errors.ConnectException;
import org.junit.jupiter.api.Test;
@@ -71,15 +75,67 @@ public class MirrorUtilsTest {
}
@Test
- public void testCreateCompactedTopicFails() throws Exception {
+ public void
testCreateCompactedTopicAssumeTopicAlreadyExistsWithUnsupportedVersionException()
throws Exception {
Map<String, KafkaFuture<Void>> values =
Collections.singletonMap(TOPIC, future);
- when(future.get()).thenThrow(new ExecutionException(new
ClusterAuthorizationException("not authorized")));
+ when(future.get()).thenThrow(new ExecutionException(new
UnsupportedVersionException("unsupported")));
+ when(ctr.values()).thenReturn(values);
+ when(admin.createTopics(any(), any())).thenReturn(ctr);
+ MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+ verify(future).get();
+ verify(ctr).values();
+ verify(admin).createTopics(any(), any());
+ }
+
+ @Test
+ public void
testCreateCompactedTopicAssumeTopicAlreadyExistsWithClusterAuthorizationException()
throws Exception {
+ Map<String, KafkaFuture<Void>> values =
Collections.singletonMap(TOPIC, future);
+ when(future.get()).thenThrow(new ExecutionException(new
ClusterAuthorizationException("not authorised")));
+ when(ctr.values()).thenReturn(values);
+ when(admin.createTopics(any(), any())).thenReturn(ctr);
+ MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+ verify(future).get();
+ verify(ctr).values();
+ verify(admin).createTopics(any(), any());
+ }
+
+ @Test
+ public void
testCreateCompactedTopicAssumeTopicAlreadyExistsWithTopicAuthorizationException()
throws Exception {
+ Map<String, KafkaFuture<Void>> values =
Collections.singletonMap(TOPIC, future);
+ when(future.get()).thenThrow(new ExecutionException(new
TopicAuthorizationException("not authorised")));
+ when(ctr.values()).thenReturn(values);
+ when(admin.createTopics(any(), any())).thenReturn(ctr);
+ MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin);
+
+ verify(future).get();
+ verify(ctr).values();
+ verify(admin).createTopics(any(), any());
+ }
+
+ @Test
+ public void
testCreateCompactedTopicFailsWithInvalidConfigurationException() throws
Exception {
+ Map<String, KafkaFuture<Void>> values =
Collections.singletonMap(TOPIC, future);
+ when(future.get()).thenThrow(new ExecutionException(new
InvalidConfigurationException("wrong config")));
+ when(ctr.values()).thenReturn(values);
+ when(admin.createTopics(any(), any())).thenReturn(ctr);
+ Throwable ce = assertThrows(ConnectException.class, () ->
MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin), "Should
have exception thrown");
+
+ assertTrue(ce.getCause() instanceof InvalidConfigurationException);
+ verify(future).get();
+ verify(ctr).values();
+ verify(admin).createTopics(any(), any());
+ }
+
+ @Test
+ public void testCreateCompactedTopicFailsWithTimeoutException() throws
Exception {
+ Map<String, KafkaFuture<Void>> values =
Collections.singletonMap(TOPIC, future);
+ when(future.get()).thenThrow(new ExecutionException(new
TimeoutException("Timeout")));
when(ctr.values()).thenReturn(values);
when(admin.createTopics(any(), any())).thenReturn(ctr);
Throwable ce = assertThrows(ConnectException.class, () ->
MirrorUtils.createCompactedTopic(TOPIC, (short) 1, (short) 1, admin), "Should
have exception thrown");
- assertTrue(ce.getCause() instanceof ExecutionException);
- assertTrue(ce.getCause().getCause() instanceof
ClusterAuthorizationException);
+ assertTrue(ce.getCause() instanceof TimeoutException);
verify(future).get();
verify(ctr).values();
verify(admin).createTopics(any(), any());