This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new f7381e134e5 KAFKA-14781: Downgrade MM2 log message severity when no
ACL authorizer is configured on source broker (#13351)
f7381e134e5 is described below
commit f7381e134e5381d137956cafb10b167020e2f690
Author: Chris Egerton <[email protected]>
AuthorDate: Wed Mar 8 10:25:55 2023 -0500
KAFKA-14781: Downgrade MM2 log message severity when no ACL authorizer is
configured on source broker (#13351)
Reviewers: Mickael Maison <[email protected]>
---
build.gradle | 1 +
.../connect/mirror/MirrorSourceConnector.java | 44 ++++++++++++++++--
.../connect/mirror/MirrorSourceConnectorTest.java | 54 ++++++++++++++++++++++
3 files changed, 94 insertions(+), 5 deletions(-)
diff --git a/build.gradle b/build.gradle
index cacda19f74f..d5d9ffcbf38 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2835,6 +2835,7 @@ project(':connect:mirror') {
implementation libs.slf4jApi
testImplementation libs.junitJupiter
+ testImplementation libs.log4j
testImplementation libs.mockitoCore
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':connect:runtime').sourceSets.test.output
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
index b49daf0ff76..be79b51c8e1 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
import java.util.Map.Entry;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.common.config.ConfigDef;
@@ -47,10 +48,12 @@ import org.apache.kafka.clients.admin.CreateTopicsOptions;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
+import java.util.Optional;
import java.util.Set;
import java.util.HashSet;
import java.util.Collection;
import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -59,6 +62,8 @@ import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.kafka.connect.mirror.MirrorSourceConfig.SYNC_TOPIC_ACLS_ENABLED;
+
/** Replicate data, configuration, and ACLs between clusters.
*
* @see MirrorSourceConfig for supported config properties.
@@ -83,6 +88,7 @@ public class MirrorSourceConnector extends SourceConnector {
private Admin sourceAdminClient;
private Admin targetAdminClient;
private Admin offsetSyncsAdminClient;
+ private AtomicBoolean noAclAuthorizer = new AtomicBoolean(false);
public MirrorSourceConnector() {
// nop
@@ -103,6 +109,12 @@ public class MirrorSourceConnector extends SourceConnector {
this.configPropertyFilter = configPropertyFilter;
}
+ // visible for testing
+ MirrorSourceConnector(Admin sourceAdminClient, Admin targetAdminClient) {
+ this.sourceAdminClient = sourceAdminClient;
+ this.targetAdminClient = targetAdminClient;
+ }
+
@Override
public void start(Map<String, String> props) {
long start = System.currentTimeMillis();
@@ -288,16 +300,20 @@ public class MirrorSourceConnector extends SourceConnector {
.collect(Collectors.toSet());
}
- private void syncTopicAcls()
+ // Visible for testing
+ void syncTopicAcls()
throws InterruptedException, ExecutionException {
- List<AclBinding> bindings = listTopicAclBindings().stream()
+ Optional<Collection<AclBinding>> rawBindings = listTopicAclBindings();
+ if (!rawBindings.isPresent())
+ return;
+ List<AclBinding> filteredBindings = rawBindings.get().stream()
.filter(x -> x.pattern().resourceType() == ResourceType.TOPIC)
.filter(x -> x.pattern().patternType() == PatternType.LITERAL)
.filter(this::shouldReplicateAcl)
.filter(x -> shouldReplicateTopic(x.pattern().name()))
.map(this::targetAclBinding)
.collect(Collectors.toList());
- updateTopicAcls(bindings);
+ updateTopicAcls(filteredBindings);
}
private void syncTopicConfigs()
@@ -402,9 +418,27 @@ public class MirrorSourceConnector extends SourceConnector {
return adminClient.listTopics().names().get();
}
- private Collection<AclBinding> listTopicAclBindings()
+ private Optional<Collection<AclBinding>> listTopicAclBindings()
throws InterruptedException, ExecutionException {
- return sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
+ Collection<AclBinding> bindings;
+ try {
+ bindings = sourceAdminClient.describeAcls(ANY_TOPIC_ACL).values().get();
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof SecurityDisabledException) {
+ if (noAclAuthorizer.compareAndSet(false, true)) {
+ log.info(
+ "No ACL authorizer is configured on the source Kafka cluster, so no topic ACL syncing will take place. "
+ + "Consider disabling topic ACL syncing by setting " + SYNC_TOPIC_ACLS_ENABLED + " to 'false'."
+ );
+ } else {
+ log.debug("Source-side ACL authorizer still not found; skipping topic ACL sync");
+ }
+ return Optional.empty();
+ } else {
+ throw e;
+ }
+ }
+ return Optional.of(bindings);
}
private static Collection<TopicDescription> describeTopics(Admin adminClient, Collection<String> topics)
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
index 304b42d71c5..c037ebd90ed 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java
@@ -16,15 +16,20 @@
*/
package org.apache.kafka.connect.mirror;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeAclsResult;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
@@ -48,13 +53,17 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
public class MirrorSourceConnectorTest {
@@ -141,6 +150,51 @@ public class MirrorSourceConnectorTest {
assertEquals(processedDenyAllAclBinding.entry().permissionType(), AclPermissionType.DENY, "should not change DENY");
}
+ @Test
+ public void testNoBrokerAclAuthorizer() throws Exception {
+ Admin sourceAdmin = mock(Admin.class);
+ Admin targetAdmin = mock(Admin.class);
+ MirrorSourceConnector connector = new MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+ ExecutionException describeAclsFailure = new ExecutionException(
+ "Failed to describe ACLs",
+ new SecurityDisabledException("No ACL authorizer configured on this broker")
+ );
+ @SuppressWarnings("unchecked")
+ KafkaFuture<Collection<AclBinding>> describeAclsFuture = mock(KafkaFuture.class);
+ when(describeAclsFuture.get()).thenThrow(describeAclsFailure);
+ DescribeAclsResult describeAclsResult = mock(DescribeAclsResult.class);
+ when(describeAclsResult.values()).thenReturn(describeAclsFuture);
+ when(sourceAdmin.describeAcls(any())).thenReturn(describeAclsResult);
+
+ try (LogCaptureAppender connectorLogs = LogCaptureAppender.createAndRegister(MirrorSourceConnector.class)) {
+ LogCaptureAppender.setClassLoggerToTrace(MirrorSourceConnector.class);
+ connector.syncTopicAcls();
+ long aclSyncDisableMessages = connectorLogs.getMessages().stream()
+ .filter(m -> m.contains("Consider disabling topic ACL syncing"))
+ .count();
+ assertEquals(1, aclSyncDisableMessages, "Should have recommended that user disable ACL syncing");
+ long aclSyncSkippingMessages = connectorLogs.getMessages().stream()
+ .filter(m -> m.contains("skipping topic ACL sync"))
+ .count();
+ assertEquals(0, aclSyncSkippingMessages, "Should not have logged ACL sync skip at same time as suggesting ACL sync be disabled");
+
+ connector.syncTopicAcls();
+ connector.syncTopicAcls();
+ aclSyncDisableMessages = connectorLogs.getMessages().stream()
+ .filter(m -> m.contains("Consider disabling topic ACL syncing"))
+ .count();
+ assertEquals(1, aclSyncDisableMessages, "Should not have recommended that user disable ACL syncing more than once");
+ aclSyncSkippingMessages = connectorLogs.getMessages().stream()
+ .filter(m -> m.contains("skipping topic ACL sync"))
+ .count();
+ assertEquals(2, aclSyncSkippingMessages, "Should have logged ACL sync skip instead of suggesting disabling ACL syncing");
+ }
+
+ // We should never have tried to perform an ACL sync on the target cluster
+ verifyNoInteractions(targetAdmin);
+ }
+
@Test
public void testConfigPropertyFiltering() {
MirrorSourceConnector connector = new MirrorSourceConnector(new
SourceAndTarget("source", "target"),