This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aacf0abc0 [Improve] [Connector-V2] Kafka client user configured 
clientid is preferred (#3783)
aacf0abc0 is described below

commit aacf0abc04abe9675ca4a30910f1befb63e852f1
Author: lightzhao <[email protected]>
AuthorDate: Sat Dec 24 10:49:48 2022 +0800

    [Improve] [Connector-V2] Kafka client user configured clientid is preferred 
(#3783)
    
    * If the clientid is configured on the user side, the clientid on the user 
side is preferred; otherwise, the default clientid is set.
    
    * format code style
    
    Co-authored-by: zhaoliang01 <[email protected]>
---
 .../seatunnel/kafka/source/KafkaConsumerThread.java         | 13 +++++++------
 .../seatunnel/kafka/source/KafkaSourceSplitEnumerator.java  |  7 ++++++-
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
index 5d2c40979..8ffc56b6d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -68,12 +68,13 @@ public class KafkaConsumerThread implements Runnable {
         properties.forEach((key, value) -> 
props.setProperty(String.valueOf(key), String.valueOf(value)));
         props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-consumer-" + this.hashCode());
-
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            ByteArrayDeserializer.class.getName());
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            ByteArrayDeserializer.class.getName());
+        if (this.metadata.getProperties().get("client.id") == null) {
+            props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
CLIENT_ID_PREFIX + "-consumer-" + this.hashCode());
+        } else {
+            props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
this.metadata.getProperties().get("client.id").toString());
+        }
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
         props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
String.valueOf(autoCommit));
 
         // Disable auto create topics feature
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 975d6b678..9ed383f67 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -210,7 +210,12 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
         Properties props = new Properties();
         props.putAll(properties);
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.metadata.getBootstrapServers());
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-admin-client-" + this.hashCode());
+        if (this.metadata.getProperties().get("client.id") == null) {
+            props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
CLIENT_ID_PREFIX + "-enumerator-admin-client-" + this.hashCode());
+        } else {
+            props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
this.metadata.getProperties().get("client.id").toString());
+        }
+
         return AdminClient.create(props);
     }
 

Reply via email to