This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 2568432  KAFKA-9046: Use top-level worker configs for connector admin clients
2568432 is described below

commit 2568432233374402260186727a65f7cfb4432c38
Author: Chris Egerton <[email protected]>
AuthorDate: Thu Nov 14 14:09:04 2019 +0530

    KAFKA-9046: Use top-level worker configs for connector admin clients
    
    [Jira](https://issues.apache.org/jira/browse/KAFKA-9046)
    
    The changes here are meant to find a healthy compromise between the pre- and
    post-KIP-458 functionality of Connect workers when configuring admin clients
    for use with DLQs. Before KIP-458, admin clients were configured using the
    top-level worker configs; after KIP-458, they are configured using worker
    configs with a prefix of `admin.` and then optionally overridden by connector
    configs with a prefix of `admin.override.`. The behavior proposed here is to
    use, in ascending order of precedence: the top-level worker configs (ignoring
    those that begin with `admin.`, `producer.`, or `consumer.`), then worker
    configs with a prefix of `admin.` (with the prefix stripped), and finally
    connector configs with a prefix of `admin.override.`.
    
    Author: Chris Egerton <[email protected]>
    
    Reviewers: Konstantine Karantasis <[email protected]>, Randall Hauch 
<[email protected]>, Nigel Liang <[email protected]>
    
    Closes #7525 from C0urante/kafka-9046
    
    (cherry picked from commit 38d243b022336ecaf5cb400ae015c485f56ff978)
    Signed-off-by: Manikumar Reddy <[email protected]>
---
 .../java/org/apache/kafka/connect/runtime/Worker.java  | 18 ++++++++++++++++--
 .../org/apache/kafka/connect/runtime/WorkerTest.java   |  7 ++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0d5448e..3d4479c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -601,8 +602,21 @@ public class Worker {
                                             Class<? extends Connector> 
connectorClass,
                                             
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
         Map<String, Object> adminProps = new HashMap<>();
-        adminProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
-        // User-specified overrides
+        // Use the top-level worker configs to retain backwards compatibility 
with older releases which
+        // did not require a prefix for connector admin client configs in the 
worker configuration file
+        // Ignore configs that begin with "admin." since those will be added 
next (with the prefix stripped)
+        // and those that begin with "producer." and "consumer.", since we 
know they aren't intended for
+        // the admin client
+        Map<String, Object> nonPrefixedWorkerConfigs = 
config.originals().entrySet().stream()
+            .filter(e -> !e.getKey().startsWith("admin.")
+                && !e.getKey().startsWith("producer.")
+                && !e.getKey().startsWith("consumer."))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+            Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), 
","));
+        adminProps.putAll(nonPrefixedWorkerConfigs);
+
+        // Admin client-specific overrides in the worker config
         adminProps.putAll(config.originalsWithPrefix("admin."));
 
         // Connector-specified overrides
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 9cb83eb..40b4df2 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -968,12 +968,15 @@ public class WorkerTest extends ThreadedTest {
         Map<String, String> props = new HashMap<>(workerProps);
         props.put("admin.client.id", "testid");
         props.put("admin.metadata.max.age.ms", "5000");
+        props.put("producer.bootstrap.servers", "cbeauho.com");
+        props.put("consumer.bootstrap.servers", "localhost:4761");
         WorkerConfig configWithOverrides = new StandaloneConfig(props);
 
         Map<String, Object> connConfig = new HashMap<String, Object>();
         connConfig.put("metadata.max.age.ms", "10000");
 
-        Map<String, String> expectedConfigs = new HashMap<>();
+        Map<String, String> expectedConfigs = new HashMap<>(workerProps);
+
         expectedConfigs.put("bootstrap.servers", "localhost:9092");
         expectedConfigs.put("client.id", "testid");
         expectedConfigs.put("metadata.max.age.ms", "10000");
@@ -983,7 +986,6 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.replayAll();
         assertEquals(expectedConfigs, Worker.adminConfigs(new 
ConnectorTaskId("test", 1), configWithOverrides, connectorConfig,
                                                              null, 
allConnectorClientConfigOverridePolicy));
-
     }
 
     @Test(expected = ConnectException.class)
@@ -1001,7 +1003,6 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.replayAll();
         Worker.adminConfigs(new ConnectorTaskId("test", 1), 
configWithOverrides, connectorConfig,
                                                           null, 
noneConnectorClientConfigOverridePolicy);
-
     }
 
 

Reply via email to