This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71fa008b456 KAFKA-14745: Cache the ReplicationPolicy instance in
MirrorConnectorConfig (#13328)
71fa008b456 is described below
commit 71fa008b4561628662cf84d895c5eb3eb5dba3d5
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Mar 3 12:14:17 2023 +0100
KAFKA-14745: Cache the ReplicationPolicy instance in MirrorConnectorConfig
(#13328)
Reviewers: Chris Egerton <[email protected]>
---
.../org/apache/kafka/connect/mirror/MirrorConnectorConfig.java | 5 ++++-
.../java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java | 4 ----
.../org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java | 7 +++++++
3 files changed, 11 insertions(+), 5 deletions(-)
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
index 6896cf74157..0f158d3252d 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java
@@ -108,8 +108,11 @@ public abstract class MirrorConnectorConfig extends
AbstractConfig {
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DEFAULT =
SOURCE_CLUSTER_ALIAS_DEFAULT;
public static final String OFFSET_SYNCS_TOPIC_LOCATION_DOC = "The location
(source/target) of the offset-syncs topic.";
+ private final ReplicationPolicy replicationPolicy;
+
protected MirrorConnectorConfig(ConfigDef configDef, Map<String, String>
props) {
super(configDef, props, true);
+ replicationPolicy = getConfiguredInstance(REPLICATION_POLICY_CLASS,
ReplicationPolicy.class);
}
String connectorName() {
@@ -133,7 +136,7 @@ public abstract class MirrorConnectorConfig extends
AbstractConfig {
}
ReplicationPolicy replicationPolicy() {
- return getConfiguredInstance(REPLICATION_POLICY_CLASS,
ReplicationPolicy.class);
+ return replicationPolicy;
}
Map<String, Object> sourceProducerConfig() {
diff --git
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
index 1bc2c8830cd..ff15357c248 100644
---
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
+++
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConfig.java
@@ -171,10 +171,6 @@ public class MirrorSourceConfig extends
MirrorConnectorConfig {
}
}
- ReplicationPolicy replicationPolicy() {
- return getConfiguredInstance(REPLICATION_POLICY_CLASS,
ReplicationPolicy.class);
- }
-
int replicationFactor() {
return getInt(REPLICATION_FACTOR);
}
diff --git
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
index dbf255d8bb4..7b5e9ffe292 100644
---
a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
+++
b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorConfigTest.java
@@ -29,6 +29,7 @@ import static
org.apache.kafka.connect.mirror.TestUtils.makeProps;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
public class MirrorConnectorConfigTest {
@@ -183,4 +184,10 @@ public class MirrorConnectorConfigTest {
assertEquals(2, config.metricsReporters().size());
}
+ @Test
+ public void testReplicationPolicy() {
+ MirrorConnectorConfig config = new
TestMirrorConnectorConfig(makeProps());
+ assertSame(config.replicationPolicy(), config.replicationPolicy());
+ }
+
}