This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new aacf0abc0 [Improve] [Connector-V2] Kafka client user configured
clientid is preferred (#3783)
aacf0abc0 is described below
commit aacf0abc04abe9675ca4a30910f1befb63e852f1
Author: lightzhao <[email protected]>
AuthorDate: Sat Dec 24 10:49:48 2022 +0800
[Improve] [Connector-V2] Kafka client user configured clientid is preferred
(#3783)
* If the clientid is configured on the user side, the clientid on the user
side is preferred; otherwise, the default clientid is set.
* format code style
Co-authored-by: zhaoliang01 <[email protected]>
---
.../seatunnel/kafka/source/KafkaConsumerThread.java | 13 +++++++------
.../seatunnel/kafka/source/KafkaSourceSplitEnumerator.java | 7 ++++++-
2 files changed, 13 insertions(+), 7 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
index 5d2c40979..8ffc56b6d 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -68,12 +68,13 @@ public class KafkaConsumerThread implements Runnable {
properties.forEach((key, value) ->
props.setProperty(String.valueOf(key), String.valueOf(value)));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-consumer-" + this.hashCode());
-
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
+ if (this.metadata.getProperties().get("client.id") == null) {
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
CLIENT_ID_PREFIX + "-consumer-" + this.hashCode());
+ } else {
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
this.metadata.getProperties().get("client.id").toString());
+ }
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
String.valueOf(autoCommit));
// Disable auto create topics feature
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 975d6b678..9ed383f67 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -210,7 +210,12 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
Properties props = new Properties();
props.putAll(properties);
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.metadata.getBootstrapServers());
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-admin-client-" + this.hashCode());
+ if (this.metadata.getProperties().get("client.id") == null) {
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
CLIENT_ID_PREFIX + "-enumerator-admin-client-" + this.hashCode());
+ } else {
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
this.metadata.getProperties().get("client.id").toString());
+ }
+
return AdminClient.create(props);
}