This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new f7381e134e5 KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is configured on source broker (#13351)
f7381e134e5 is described below

commit f7381e134e5381d137956cafb10b167020e2f690
Author: Chris Egerton <[email protected]>
AuthorDate: Wed Mar 8 10:25:55 2023 -0500

    KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is configured on source broker (#13351)
    
    Reviewers: Mickael Maison <[email protected]>
---
 build.gradle                                       |  1 +
 .../connect/mirror/MirrorSourceConnector.java      | 44 ++++++++++++++++--
 .../connect/mirror/MirrorSourceConnectorTest.java  | 54 ++++++++++++++++++++++
 3 files changed, 94 insertions(+), 5 deletions(-)

diff --git a/build.gradle b/build.gradle
index cacda19f74f..d5d9ffcbf38 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2835,6 +2835,7 @@ project(':connect:mirror') {
     implementation libs.slf4jApi
 
     testImplementation libs.junitJupiter
+    testImplementation libs.log4j
     testImplementation libs.mockitoCore
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':connect:runtime').sourceSets.test.output
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index b49daf0ff76..be79b51c8e1 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
 import java.util.Map.Entry;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.common.config.ConfigDef;
@@ -47,10 +48,12 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions;
 import java.util.Map;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Optional;
 import java.util.Set;
 import java.util.HashSet;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -59,6 +62,8 @@ import java.util.concurrent.ExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED;
+
 /** Replicate data, configuration, and ACLs between clusters.
  *
  *  @see MirrorSourceConfig for supported config properties.
@@ -83,6 +88,7 @@ public class MirrorSourceConnector extends SourceConnector {
     private Admin sourceAdminClient;
     private Admin targetAdminClient;
     private Admin offsetSyncsAdminClient;
+    private AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);
 
     public MirrorSourceConnector() {
         // nop
@@ -103,6 +109,12 @@ public class MirrorSourceConnector extends SourceConnector 
{
         this.configPropertyFilter = configPropertyFilter;
     }
 
+    // visible for testing
+    MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient) {
+        this.sourceAdminClient = sourceAdminClient;
+        this.targetAdminClient = targetAdminClient;
+    }
+
     @Override
     public void start(Map<String, String> props) {
         long start = System.currentTimeMillis();
@@ -288,16 +300,20 @@ public class MirrorSourceConnector extends 
SourceConnector {
                 .collect(Collectors.toSet());
     }
 
-    private void syncTopicAcls()
+    // Visible for testing
+    void syncTopicAcls()
             throws InterruptedException, ExecutionException {
-        List<AclBinding> bindings = listTopicAclBindings().stream()
+        Optional<Collection<AclBinding>> rawBindings = listTopicAclBindings();
+        if (!rawBindings.isPresent())
+            return;
+        List<AclBinding> filteredBindings = rawBindings.get().stream()
             .filter(x -> x.pattern().resourceType() == ResourceType.TOPIC)
             .filter(x -> x.pattern().patternType() == PatternType.LITERAL)
             .filter(this::shouldReplicateAcl)
             .filter(x -> shouldReplicateTopic(x.pattern().name()))
             .map(this::targetAclBinding)
             .collect(Collectors.toList());
-        updateTopicAcls(bindings);
+        updateTopicAcls(filteredBindings);
     }
 
     private void syncTopicConfigs()
@@ -402,9 +418,27 @@ public class MirrorSourceConnector extends SourceConnector 
{
         return adminClient.listTopics().names().get();
     }
 
-    private Collection<AclBinding> listTopicAclBindings()
+    private Optional<Collection<AclBinding>> listTopicAclBindings()
             throws InterruptedException, ExecutionException {
-        return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
+        Collection<AclBinding> bindings;
+        try {
+            bindings = 
sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof SecurityDisabledException) {
+                if (noAclAuthorizer.compareAndSet(false, true)) {
+                    log.info(
+                            "No ACL authorizer is configured on the source 
Kafka cluster, so no topic ACL syncing will take place. "
+                                    + "Consider disabling topic ACL syncing by 
setting " + SYNC_TOPIC_ACLS_ENABLED + " to 'false'."
+                    );
+                } else {
+                    log.debug("Source-side ACL authorizer still not found; 
skipping topic ACL sync");
+                }
+                return Optional.empty();
+            } else {
+                throw e;
+            }
+        }
+        return Optional.of(bindings);
     }
 
     private static Collection<TopicDescription> describeTopics(Admin 
adminClient, Collection<String> topics)
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 304b42d71c5..c037ebd90ed 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -16,15 +16,20 @@
  */
 package org.apache.kafka.connect.mirror;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeAclsResult;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
 import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.NewTopic;
@@ -48,13 +53,17 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 public class MirrorSourceConnectorTest {
 
@@ -141,6 +150,51 @@ public class MirrorSourceConnectorTest {
         assertEquals(processedDenyAllAclBinding.entry().permissionType(), 
AclPermissionType.DENY, "should not change DENY");
     }
 
+    @Test
+    public void testNoBrokerAclAuthorizer() throws Exception {
+        Admin sourceAdmin = mock(Admin.class);
+        Admin targetAdmin = mock(Admin.class);
+        MirrorSourceConnector connector = new 
MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+        ExecutionException describeAclsFailure = new ExecutionException(
+                "Failed to describe ACLs",
+                new SecurityDisabledException("No ACL authorizer configured on 
this broker")
+        );
+        @SuppressWarnings("unchecked")
+        KafkaFuture<Collection<AclBinding>> describeAclsFuture = 
mock(KafkaFuture.class);
+        when(describeAclsFuture.get()).thenThrow(describeAclsFailure);
+        DescribeAclsResult describeAclsResult = mock(DescribeAclsResult.class);
+        when(describeAclsResult.values()).thenReturn(describeAclsFuture);
+        when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult);
+
+        try (LogCaptureAppender connectorLogs = 
LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) {
+            
LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class);
+            connector.syncTopicAcls();
+            long aclSyncDisableMessages = connectorLogs.getMessages().stream()
+                    .filter(m -> m.contains("Consider disabling topic ACL 
syncing"))
+                    .count();
+            assertEquals(1, aclSyncDisableMessages, "Should have recommended 
that user disable ACL syncing");
+            long aclSyncSkippingMessages = connectorLogs.getMessages().stream()
+                    .filter(m -> m.contains("skipping topic ACL sync"))
+                    .count();
+            assertEquals(0, aclSyncSkippingMessages, "Should not have logged 
ACL sync skip at same time as suggesting ACL sync be disabled");
+
+            connector.syncTopicAcls();
+            connector.syncTopicAcls();
+            aclSyncDisableMessages = connectorLogs.getMessages().stream()
+                    .filter(m -> m.contains("Consider disabling topic ACL 
syncing"))
+                    .count();
+            assertEquals(1, aclSyncDisableMessages, "Should not have 
recommended that user disable ACL syncing more than once");
+            aclSyncSkippingMessages = connectorLogs.getMessages().stream()
+                    .filter(m -> m.contains("skipping topic ACL sync"))
+                    .count();
+            assertEquals(2, aclSyncSkippingMessages, "Should have logged ACL 
sync skip instead of suggesting disabling ACL syncing");
+        }
+
+        // We should never have tried to perform an ACL sync on the target 
cluster
+        verifyNoInteractions(targetAdmin);
+    }
+
     @Test
     public void testConfigPropertyFiltering() {
         MirrorSourceConnector connector = new MirrorSourceConnector(new 
SourceAndTarget("source", "target"),

Reply via email to