mimaison commented on code in PR #12366:
URL: https://github.com/apache/kafka/pull/12366#discussion_r1037373964


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java:
##########
@@ -185,11 +193,52 @@ public ConfigDef config() {
         return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
     }
 
+    @Override
+    public org.apache.kafka.common.config.Config validate(Map<String, String> 
props) {
+        List<ConfigValue> configValues = 
MirrorConnectorConfig.CONNECTOR_CONFIG_DEF.validate(props);
+        if ("required".equals(props.get(EXACTLY_ONCE_SUPPORT_CONFIG))) {
+            if (!consumerUsesReadCommitted(props)) {
+                ConfigValue exactlyOnceSupport = configValues.stream()
+                        .filter(cv -> 
EXACTLY_ONCE_SUPPORT_CONFIG.equals(cv.name()))
+                        .findAny()
+                        .orElseGet(() -> {
+                            ConfigValue result = new 
ConfigValue(EXACTLY_ONCE_SUPPORT_CONFIG);
+                            configValues.add(result);
+                            return result;
+                        });
+                // The Connect framework will already generate an error for 
this property if we return ExactlyOnceSupport.UNSUPPORTED
+                // from our exactlyOnceSupport method, but it will be fairly 
generic
+                // We add a second error message here to give users more 
insight into why this specific connector can't support exactly-once
+                // guarantees with the given configuration
+                exactlyOnceSupport.addErrorMessage(
+                        "Mirror Maker 2 can only provide exactly-once 
guarantees when its source consumer is configured with "
+                                + ConsumerConfig.ISOLATION_LEVEL_CONFIG + " 
set to '" + READ_COMMITTED + "'; "
+                                + "otherwise, records from aborted and 
uncommitted transactions will be replicated from the "
+                                + "source cluster to the target cluster."
+                );
+            }
+        }
+        return new org.apache.kafka.common.config.Config(configValues);
+    }
+
     @Override
     public String version() {
         return AppInfoParser.getVersion();
     }
 
+    @Override
+    public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> props) {
+        return consumerUsesReadCommitted(props)
+                ? ExactlyOnceSupport.SUPPORTED
+                : ExactlyOnceSupport.UNSUPPORTED;
+    }
+
+    private boolean consumerUsesReadCommitted(Map<String, String> props) {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(props);
+        Object consumerIsolationLevel = 
config.sourceConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG);
+        return Objects.equals(READ_COMMITTED, consumerIsolationLevel);

Review Comment:
   Ah yes you're right! Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to