This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 44f522d71 [INLONG-6706][Sort] Fix wrong kafka sink client id (#6711)
44f522d71 is described below

commit 44f522d71ca71c521db081310db136d59de4bd60
Author: vernedeng <[email protected]>
AuthorDate: Fri Dec 2 23:32:17 2022 +0800

    [INLONG-6706][Sort] Fix wrong kafka sink client id (#6711)
---
 .../inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java    | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index ba7118fcb..4fd09fddb 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -79,16 +79,15 @@ public class KafkaProducerCluster implements LifecycleAware 
{
         this.state = LifecycleState.START;
         try {
             Properties props = new Properties();
+            props.putAll(context.getParameters());
             props.put(
                     ProducerConfig.PARTITIONER_CLASS_CONFIG,
                     context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
PartitionerSelector.class.getName()));
             props.put(
                     ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                     
context.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            props.put(
-                    ProducerConfig.CLIENT_ID_CONFIG,
-                    context.getString(ProducerConfig.CLIENT_ID_CONFIG) + "-" + 
workerName);
-            props.putAll(context.getParameters());
+            props.put(ProducerConfig.CLIENT_ID_CONFIG,
+                    context.getString(ProducerConfig.CLIENT_ID_CONFIG, 
cacheClusterName) + "-" + workerName);
             LOG.info("init kafka client info: " + props);
             producer = new KafkaProducer<>(props, new StringSerializer(), new 
ByteArraySerializer());
             Preconditions.checkNotNull(producer);

Reply via email to