C0urante commented on code in PR #13373:
URL: https://github.com/apache/kafka/pull/13373#discussion_r1149857584
##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -514,6 +543,41 @@ private void updateTopicConfigs(Map<String, Config> topicConfigs) {
}));
}
+ // visible for testing
+ void incrementalAlterConfigs(Map<String, Config> topicConfigs) {
+ Map<ConfigResource, Collection<AlterConfigOp>> configOps = new HashMap<>();
+ for (Map.Entry<String, Config> topicConfig : topicConfigs.entrySet()) {
+ Collection<AlterConfigOp> ops = new ArrayList<>();
+ ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topicConfig.getKey());
+ for (ConfigEntry config : topicConfig.getValue().entries()) {
+ if (config.isDefault() && !shouldReplicateSourceDefault(config.name())) {
+ ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.DELETE));
+ } else {
+ ops.add(new AlterConfigOp(config, AlterConfigOp.OpType.SET));
+ }
+ }
+ configOps.put(configResource, ops);
+ }
+ log.trace("Syncing configs for {} topics.", configOps.size());
+ targetAdminClient.incrementalAlterConfigs(configOps).values().forEach((k, v) -> v.whenComplete((x, e) -> {
+ if (e != null) {
+ if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUEST_INCREMENTAL_ALTER_CONFIG)
+ && e instanceof UnsupportedVersionException) {
+ //Fallback logic
+ log.warn("The target cluster {} is not compatible with IncrementalAlterConfigs API. Therefore using deprecated AlterConfigs API for syncing topic configurations", sourceAndTarget.target(), e);
+ useIncrementalAlterConfigs = MirrorSourceConfig.NEVER_USE_INCREMENTAL_ALTER_CONFIG;
+ } else if (useIncrementalAlterConfigs.equals(MirrorSourceConfig.REQUIRE_INCREMENTAL_ALTER_CONFIG)
+ && e instanceof UnsupportedVersionException) {
+ log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
+ context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
+ + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
Review Comment:
Oh, one more thing: the indentation is off here, which is causing Checkstyle
to fail the build:
```suggestion
log.error("Failed to sync configs for topic {} on cluster {} with IncrementalAlterConfigs API", k.name(), sourceAndTarget.target(), e);
context.raiseError(new ConnectException("use.incremental.alter.configs was set to \"required\", but the target cluster '"
        + sourceAndTarget.target() + "' is not compatible with IncrementalAlterConfigs API", e));
```
Can you try to make sure that the project builds before pushing?
`./gradlew :connect:mirror:build` should do the trick, or if you want to skip
integration tests (since those can take a while) you can do
`./gradlew :connect:mirror:build :connect:mirror:unitTest -x test`.