C0urante commented on code in PR #13913:
URL: https://github.com/apache/kafka/pull/13913#discussion_r1258611895
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -581,13 +582,23 @@ void incrementalAlterConfigs(Map<String, Config>
topicConfigs) {
}));
}
- private void updateTopicAcls(List<AclBinding> bindings) {
- log.trace("Syncing {} topic ACL bindings.", bindings.size());
- targetAdminClient.createAcls(bindings).values().forEach((k, v) ->
v.whenComplete((x, e) -> {
- if (e != null) {
- log.warn("Could not sync ACL of topic {}.",
k.pattern().name(), e);
- }
- }));
+ // Visible for testing
+ int updateTopicAcls(List<AclBinding> bindings) {
+ Set<AclBinding> addBindings = new HashSet<>(bindings);
+ addBindings.removeAll(knownTopicAclBindings);
+ int newBindCount = addBindings.size();
+ if (!addBindings.isEmpty()) {
+ log.info("Syncing new found {} topic ACL bindings.", newBindCount);
+ targetAdminClient.createAcls(addBindings).values().forEach((k, v)
-> v.whenComplete((x, e) -> {
+ if (e != null) {
+ log.warn("Could not sync ACL of topic {}.",
k.pattern().name(), e);
+ }
+ }));
+ knownTopicAclBindings = new HashSet<>(bindings);
Review Comment:
Hmmm... won't this cause issues if the call above to
`targetAdminClient::createAcls` fails?
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -683,4 +686,35 @@ private Optional<ConfigValue> validateProperty(String
name, Map<String, String>
assertNotNull(result, "Connector should not have record null config
value for '" + name + "' property");
return Optional.of(result);
}
+
+ @Test
+ public void testUpdateIncrementTopicAcls() {
+ Admin sourceAdmin = mock(Admin.class);
+ Admin targetAdmin = mock(Admin.class);
+ MirrorSourceConnector connector = new
MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+ List<AclBinding> filteredBindings = new ArrayList<>();
+ AclBinding binding1 = mock(AclBinding.class);
+ AclBinding binding2 = mock(AclBinding.class);
+ filteredBindings.add(binding1);
+ filteredBindings.add(binding2);
+
doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet());
+
+ // First topic acl info update when starting `syncTopicAcls` thread
+ int newAddCount = connector.updateTopicAcls(filteredBindings);
+ assertEquals(connector.knownTopicAclBindings(), new
HashSet<>(filteredBindings));
+ assertTrue(newAddCount == filteredBindings.size());
+
+ List<AclBinding> newAddBindings = new ArrayList<>();
+ AclBinding binding3 = mock(AclBinding.class);
+ AclBinding binding4 = mock(AclBinding.class);
+ newAddBindings.add(binding3);
+ newAddBindings.add(binding4);
+ filteredBindings.addAll(newAddBindings);
+
+ // The next increment topic acl info update
+ newAddCount = connector.updateTopicAcls(filteredBindings);
+ assertEquals(connector.knownTopicAclBindings(), new
HashSet<>(filteredBindings));
+ assertTrue(newAddCount == newAddBindings.size());
Review Comment:
```suggestion
assertEquals(newAddBindings.size(), newAddCount);
```
##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceConnectorTest.java:
##########
@@ -683,4 +686,35 @@ private Optional<ConfigValue> validateProperty(String
name, Map<String, String>
assertNotNull(result, "Connector should not have record null config
value for '" + name + "' property");
return Optional.of(result);
}
+
+ @Test
+ public void testUpdateIncrementTopicAcls() {
+ Admin sourceAdmin = mock(Admin.class);
+ Admin targetAdmin = mock(Admin.class);
+ MirrorSourceConnector connector = new
MirrorSourceConnector(sourceAdmin, targetAdmin);
+
+ List<AclBinding> filteredBindings = new ArrayList<>();
+ AclBinding binding1 = mock(AclBinding.class);
+ AclBinding binding2 = mock(AclBinding.class);
+ filteredBindings.add(binding1);
+ filteredBindings.add(binding2);
+
doReturn(mock(CreateAclsResult.class)).when(targetAdmin).createAcls(anySet());
+
+ // First topic acl info update when starting `syncTopicAcls` thread
+ int newAddCount = connector.updateTopicAcls(filteredBindings);
+ assertEquals(connector.knownTopicAclBindings(), new
HashSet<>(filteredBindings));
+ assertTrue(newAddCount == filteredBindings.size());
Review Comment:
```suggestion
assertEquals(filteredBindings.size(), newAddCount);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]