C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1149844764


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java:
##########
@@ -41,12 +41,12 @@
 import org.junit.jupiter.api.BeforeEach;
 
 /**
- * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}.
+ * Tests MM2 replication and fail over logic for {@link IdentityReplicationPolicy}.
  *
  * <p>MM2 is configured with active/passive replication between two Kafka clusters with {@link IdentityReplicationPolicy}.
  * Tests validate that records sent to the primary cluster arrive at the backup cluster. Then, a consumer group is
  * migrated from the primary cluster to the backup cluster. Tests validate that consumer offsets
- * are translated and replicated from the primary cluster to the backup cluster during this failover.
+ * are translated and replicated from the primary cluster to the backup cluster during this fail over.

Review Comment:
   This change should also be reverted.



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -514,6 +543,41 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
         }));
     }
 
+    // visible for testing
+    void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+        Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+        for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+            Collection<AlterConfigOp> ops = new ArrayList<>();
+            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+            for (ConfigEntry config : topicConfig.getValue().entries()) {
+                if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+                } else {
+                    ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+                }
+            }
+            configOps.put(configResource, ops);
+        }
+        log.trace("Syncing configs for {} topics.", configOps.size());
+        targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+            if (e != null) {
+                if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG)
+                        && e instanceof UnsupportedVersionException) {
+                    //Fallback logic
+                    log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e);

Review Comment:
   Nit: this line's pretty long, can we split it up?
   
   ```suggestion
                       log.warn(
                               "The target cluster {} is not compatible with IncrementalAlterConfigs API. "
                                       + "Therefore using deprecated AlterConfigs API for syncing topic configurations",
                               sourceAndTarget.target(),
                               e
                       );
   ```



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -365,11 +383,13 @@ void syncTopicAcls()
         updateTopicAcls(filteredBindings);
     }
 
-    private void syncTopicConfigs()
+    // visible for testing
+    void syncTopicConfigs()
             throws InterruptedException, ExecutionException {
+        boolean incremental = !useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG);
         Map<String, Config> sourceConfigs = describeTopicConfigs(topicsBeingReplicated());
         Map<String, Config> targetConfigs = sourceConfigs.entrySet().stream()
-            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue())));
+            .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), incremental)));
         updateTopicConfigs(targetConfigs);

Review Comment:
   We should also use `incremental` to determine which kind of topic alter API 
we use in `updateTopicConfigs`.
   
   We can either do this by adding an `incremental` parameter to 
`updateTopicConfigs`, or by getting rid of that method entirely and just 
inlining it here.
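   
   A minimal sketch of the first option, assuming hypothetical stand-ins (`TopicConfigSyncSketch`, a `Map<String, String>` in place of `Map<String, Config>`, and a `calls` list that only records which path ran) rather than the real connector and admin client:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the connector; not the real MirrorSourceConnector.
class TopicConfigSyncSketch {
    // Records which API each call would have used, so the choice is observable.
    final List<String> calls = new ArrayList<>();

    // The caller decides once and passes the decision down; this method
    // never re-reads the connector's mutable setting.
    void updateTopicConfigs(Map<String, String> topicConfigs, boolean incremental) {
        if (incremental) {
            incrementalAlterConfigs(topicConfigs);
        } else {
            deprecatedAlterConfigs(topicConfigs);
        }
    }

    // Placeholder for the IncrementalAlterConfigs path.
    void incrementalAlterConfigs(Map<String, String> topicConfigs) {
        calls.add("incremental:" + topicConfigs.size());
    }

    // Placeholder for the deprecated AlterConfigs path.
    void deprecatedAlterConfigs(Map<String, String> topicConfigs) {
        calls.add("deprecated:" + topicConfigs.size());
    }
}
```

   With this shape, `syncTopicConfigs` would pass the same `incremental` value to both `targetConfig` and `updateTopicConfigs`, so the filtering and the API choice can't disagree.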



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics)
 
     Config targetConfig(Config sourceConfig) {
         List<ConfigEntry> entries = sourceConfig.entries().stream()
-            .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive())
-            .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)
-            .filter(x -> shouldReplicateTopicConfigurationProperty(x.name()))
-            .collect(Collectors.toList());
+                    .filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG)

Review Comment:
   This is looking better, but it's not quite there: we still aren't handling 
the possibility that `useIncrementalAlterConfigs` changes in the middle of an 
invocation of `syncTopicConfigs`. (I left a more detailed comment in the review.)
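   
   To illustrate the concern, here is a rough sketch in which the field is read exactly once per invocation and the snapshot is handed to `targetConfig`. Everything in it is a hypothetical stand-in: `SnapshotSketch`, the `Entry` type, the `"never"`/`"requested"` placeholder values, and the filtering rule (defaults kept only in incremental mode) are assumptions for the example, not the actual connector logic.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in; not the real MirrorSourceConnector.
class SnapshotSketch {
    // Placeholder for MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG.
    static final String NEVER = "never";

    // May be reassigned by another thread, e.g. from a fallback callback.
    volatile String useIncrementalAlterConfigs = "requested";

    // Minimal stand-in for ConfigEntry: a name plus an isDefault flag.
    static final class Entry {
        final String name;
        final boolean isDefault;

        Entry(String name, boolean isDefault) {
            this.name = name;
            this.isDefault = isDefault;
        }
    }

    // Takes the snapshot as a parameter and never reads the field itself.
    // Assumed rule for this sketch: defaults are kept only in incremental mode.
    List<String> targetConfig(List<Entry> source, boolean incremental) {
        return source.stream()
                .filter(e -> incremental || !e.isDefault)
                .map(e -> e.name)
                .collect(Collectors.toList());
    }

    List<String> syncTopicConfigs(List<Entry> source) {
        // Read the field exactly once; later changes don't affect this call.
        boolean incremental = !NEVER.equals(useIncrementalAlterConfigs);
        return targetConfig(source, incremental);
    }
}
```

   Since `targetConfig` only sees the boolean it was given, a concurrent write to the field can affect the next invocation but not the one already in flight.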



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java:
##########
@@ -27,6 +27,13 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable {
 
     boolean shouldReplicateConfigProperty(String prop);

Review Comment:
   (Depends on conversation on a different thread)


