C0urante commented on code in PR #13373: URL: https://github.com/apache/kafka/pull/13373#discussion_r1149844764
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java: ########## @@ -41,12 +41,12 @@ import org.junit.jupiter.api.BeforeEach; /** - * Tests MM2 replication and failover logic for {@link IdentityReplicationPolicy}. + * Tests MM2 replication and fail over logic for {@link IdentityReplicationPolicy}. * * <p>MM2 is configured with active/passive replication between two Kafka clusters with {@link IdentityReplicationPolicy}. * Tests validate that records sent to the primary cluster arrive at the backup cluster. Then, a consumer group is * migrated from the primary cluster to the backup cluster. Tests validate that consumer offsets - * are translated and replicated from the primary cluster to the backup cluster during this failover. + * are translated and replicated from the primary cluster to the backup cluster during this fail over. Review Comment: This should also be reverted ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -514,6 +543,41 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) { })); } + // visible for testing + void incrementalAlterConfigs(Map<String, Config> topicConfigs) { + Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>(); + for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) { + Collection<AlterConfigOp> ops = new ArrayList<>(); + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey()); + for (ConfigEntry config : topicConfig.getValue().entries()) { + if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE)); + } else { + ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET)); + } + } + configOps.put(configResource, ops); + } + log.trace("Syncing configs for {} topics.", configOps.size()); + targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> { + if (e != null) { + if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG) + && e instanceof UnsupportedVersionException) { + //Fallback logic + log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e); Review Comment: Nit: this line's pretty long, can we split it up? ```suggestion log.warn( "The target cluster {} is not compatible with IncrementalAlterConfigs API. " + "Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e ); ``` ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -365,11 +383,13 @@ void syncTopicAcls() updateTopicAcls(filteredBindings); } - private void syncTopicConfigs() + // visible for testing + void syncTopicConfigs() throws InterruptedException, ExecutionException { + boolean incremental = !useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG); Map<String, Config> sourceConfigs = describeTopicConfigs(topicsBeingReplicated()); Map<String, Config> targetConfigs = sourceConfigs.entrySet().stream() - .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue()))); + .collect(Collectors.toMap(x -> formatRemoteTopic(x.getKey()), x -> targetConfig(x.getValue(), incremental))); updateTopicConfigs(targetConfigs); Review Comment: We should also use `incremental` to determine which kind of topic alter API we use in `updateTopicConfigs`. We can either do this by adding an `incremental` parameter to `updateTopicConfigs`, or by getting rid of that method entirely and just inlining it here. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java: ########## @@ -540,10 +601,13 @@ Map<String, Config> describeTopicConfigs(Set<String> topics) Config targetConfig(Config sourceConfig) { List<ConfigEntry> entries = sourceConfig.entries().stream() - .filter(x -> !x.isDefault() && !x.isReadOnly() && !x.isSensitive()) - .filter(x -> x.source() != ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG) - .filter(x -> shouldReplicateTopicConfigurationProperty(x.name())) - .collect(Collectors.toList()); + .filter(x -> (!useIncrementalAlterConfigs.equals(MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG) Review Comment: This is looking better, but not quite--we still aren't handling the possibility that `useIncrementalAlterConfigs` changes in the middle of an invocation of `syncTopicConfigs`. (Left a more detailed comment in the review.) ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/ConfigPropertyFilter.java: ########## @@ -27,6 +27,13 @@ public interface ConfigPropertyFilter extends Configurable, AutoCloseable { boolean shouldReplicateConfigProperty(String prop); Review Comment: (Depends on conversation on a different thread) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org