This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 44f522d71 [INLONG-6706][Sort] Fix wrong kafka sink client id (#6711)
44f522d71 is described below
commit 44f522d71ca71c521db081310db136d59de4bd60
Author: vernedeng <[email protected]>
AuthorDate: Fri Dec 2 23:32:17 2022 +0800
[INLONG-6706][Sort] Fix wrong kafka sink client id (#6711)
---
.../inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index ba7118fcb..4fd09fddb 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -79,16 +79,15 @@ public class KafkaProducerCluster implements LifecycleAware {
this.state = LifecycleState.START;
try {
Properties props = new Properties();
+ props.putAll(context.getParameters());
props.put(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG,
PartitionerSelector.class.getName()));
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
context.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
- props.put(
- ProducerConfig.CLIENT_ID_CONFIG,
- context.getString(ProducerConfig.CLIENT_ID_CONFIG) + "-" +
workerName);
- props.putAll(context.getParameters());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG,
+ context.getString(ProducerConfig.CLIENT_ID_CONFIG,
cacheClusterName) + "-" + workerName);
LOG.info("init kafka client info: " + props);
producer = new KafkaProducer<>(props, new StringSerializer(), new
ByteArraySerializer());
Preconditions.checkNotNull(producer);