This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a331f1c86 [INLONG-10230][Sort] KafkaSink support unified configuration (#10235)
5a331f1c86 is described below

commit 5a331f1c86f078f2995c0f95d58509dd70868c01
Author: vernedeng <[email protected]>
AuthorDate: Fri May 17 19:56:28 2024 +0800

    [INLONG-10230][Sort] KafkaSink support unified configuration (#10235)
---
 .../sink/{SinkConfig.java => KafkaSinkConfig.java} |  21 +--
 .../common/pojo/sort/dataflow/sink/SinkConfig.java |   1 +
 .../node/{NodeConfig.java => KafkaNodeConfig.java} |  21 +--
 .../inlong/common/pojo/sort/node/NodeConfig.java   |   1 +
 .../manager/pojo/node/kafka/KafkaDataNodeDTO.java  |   2 +-
 .../manager/pojo/node/kafka/KafkaDataNodeInfo.java |   2 +-
 .../service/node/kafka/KafkaDataNodeOperator.java  |  10 ++
 .../service/sink/kafka/KafkaSinkOperator.java      |  22 ++++
 .../sink/kafka/KafkaFederationSinkContext.java     |  70 ++++------
 .../sort/standalone/sink/kafka/KafkaIdConfig.java  | 141 +++------------------
 .../sink/kafka/KafkaProducerCluster.java           |  27 ++--
 .../sink/kafka/KafkaProducerFederation.java        |  99 ++++-----------
 12 files changed, 125 insertions(+), 292 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
similarity index 52%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
index 06635d219f..f0ec2bd6b1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
@@ -17,25 +17,12 @@
 
 package org.apache.inlong.common.pojo.sort.dataflow.sink;
 
-import org.apache.inlong.common.constant.SinkType;
-import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
-
 import lombok.Data;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import java.io.Serializable;
-import java.util.List;
+import lombok.EqualsAndHashCode;
 
+@EqualsAndHashCode(callSuper = true)
 @Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes({
-        @JsonSubTypes.Type(value = ClsSinkConfig.class, name = SinkType.CLS),
-        @JsonSubTypes.Type(value = EsSinkConfig.class, name = SinkType.ELASTICSEARCH),
-        @JsonSubTypes.Type(value = PulsarSinkConfig.class, name = SinkType.PULSAR),
-})
-public abstract class SinkConfig implements Serializable {
+public class KafkaSinkConfig extends SinkConfig {
 
-    private String encodingType;
-    private List<FieldConfig> fieldConfigs;
+    private String topicName;
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
index 06635d219f..aa0d37ba3f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
@@ -33,6 +33,7 @@ import java.util.List;
         @JsonSubTypes.Type(value = ClsSinkConfig.class, name = SinkType.CLS),
         @JsonSubTypes.Type(value = EsSinkConfig.class, name = SinkType.ELASTICSEARCH),
         @JsonSubTypes.Type(value = PulsarSinkConfig.class, name = SinkType.PULSAR),
+        @JsonSubTypes.Type(value = KafkaSinkConfig.class, name = SinkType.KAFKA),
 })
 public abstract class SinkConfig implements Serializable {
 
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/KafkaNodeConfig.java
similarity index 52%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/KafkaNodeConfig.java
index 1c82d4c9e2..9bafb3d85b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/KafkaNodeConfig.java
@@ -17,25 +17,12 @@
 
 package org.apache.inlong.common.pojo.sort.node;
 
-import org.apache.inlong.common.constant.DataNodeType;
-
 import lombok.Data;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import java.io.Serializable;
-import java.util.Map;
 
 @Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes({
-        @JsonSubTypes.Type(value = ClsNodeConfig.class, name = DataNodeType.CLS),
-        @JsonSubTypes.Type(value = EsNodeConfig.class, name = DataNodeType.ELASTICSEARCH),
-        @JsonSubTypes.Type(value = PulsarNodeConfig.class, name = DataNodeType.PULSAR),
-})
-public abstract class NodeConfig implements Serializable {
+public class KafkaNodeConfig extends NodeConfig {
 
-    private Integer version;
-    private String nodeName;
-    private Map<String, String> properties;
+    private String bootstrapServers;
+    private String clientId;
+    private String acks;
 }
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
index 1c82d4c9e2..49b7c8e0f3 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
@@ -32,6 +32,7 @@ import java.util.Map;
         @JsonSubTypes.Type(value = ClsNodeConfig.class, name = DataNodeType.CLS),
         @JsonSubTypes.Type(value = EsNodeConfig.class, name = DataNodeType.ELASTICSEARCH),
         @JsonSubTypes.Type(value = PulsarNodeConfig.class, name = DataNodeType.PULSAR),
+        @JsonSubTypes.Type(value = KafkaNodeConfig.class, name = DataNodeType.KAFKA),
 })
 public abstract class NodeConfig implements Serializable {
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java
index 3317c3d714..8ccfed0760 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java
@@ -49,7 +49,7 @@ public class KafkaDataNodeDTO {
     private String clientId;
 
     @ApiModelProperty(value = "kafka produce confirmation mechanism")
-    private String ack;
+    private String acks;
 
     @ApiModelProperty("audit set name")
     private String auditSetName;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java
index 3b0b433132..02b67b6ac4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java
@@ -45,7 +45,7 @@ public class KafkaDataNodeInfo extends DataNodeInfo {
     private String clientId;
 
     @ApiModelProperty(value = "kafka produce confirmation mechanism")
-    private String ack;
+    private String acks;
 
     @ApiModelProperty("audit set name")
     private String auditSetName;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
index 4fbc740d36..968a670628 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.node.kafka;
 
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
+import org.apache.inlong.common.pojo.sort.node.NodeConfig;
 import org.apache.inlong.manager.common.consts.DataNodeType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -119,6 +121,14 @@ public class KafkaDataNodeOperator extends AbstractDataNodeOperator {
         return true;
     }
 
+    @Override
+    public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
+        DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
+        KafkaNodeConfig kafkaNodeConfig = CommonBeanUtils.copyProperties(dataNodeInfo, KafkaNodeConfig::new);
+        kafkaNodeConfig.setNodeName(dataNodeInfo.getName());
+        return kafkaNodeConfig;
+    }
+
     private boolean getKafkaConnection(String bootstrapServers) {
         KafkaClusterInfo kafkaClusterInfo = KafkaClusterInfo.builder().bootstrapServers(bootstrapServers).build();
         try {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index d7fa197c9f..169e7d1461 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -17,6 +17,10 @@
 
 package org.apache.inlong.manager.service.sink.kafka;
 
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -29,6 +33,7 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
 import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -40,6 +45,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * Kafka sink operator
@@ -112,4 +118,20 @@ public class KafkaSinkOperator extends AbstractSinkOperator {
         return sink;
     }
 
+    @Override
+    public SinkConfig getSinkConfig(StreamSink sink) {
+        KafkaSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink, KafkaSinkConfig::new);
+        List<FieldConfig> fields = sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
+                v -> {
+                    FieldConfig fieldConfig = new FieldConfig();
+                    FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+                            v.getFieldType().toLowerCase());
+                    fieldConfig.setName(v.getFieldName());
+                    fieldConfig.setFormatInfo(formatInfo);
+                    return fieldConfig;
+                }).collect(Collectors.toList());
+        sinkConfig.setFieldConfigs(fields);
+        return sinkConfig;
+    }
+
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index c79922b0d8..4e94c67248 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -17,14 +17,16 @@
 
 package org.apache.inlong.sort.standalone.sink.kafka;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -32,12 +34,12 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 /** Context of kafka sink. */
 public class KafkaFederationSinkContext extends SinkContext {
@@ -45,9 +47,8 @@ public class KafkaFederationSinkContext extends SinkContext {
     public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaFederationSinkContext.class);
     public static final String KEY_EVENT_HANDLER = "eventHandler";
 
-    private Context producerContext;
+    private KafkaNodeConfig kafkaNodeConfig;
     private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();
-    private List<CacheClusterConfig> clusterConfigList = new ArrayList<>();
 
     public KafkaFederationSinkContext(String sinkName, Context context, Channel channel) {
         super(sinkName, context, channel);
@@ -58,7 +59,7 @@ public class KafkaFederationSinkContext extends SinkContext {
     public void reload() {
         LOG.info("reload KafkaFederationSinkContext.");
         try {
-            SortTaskConfig newSortTaskConfig = SortClusterConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = SortConfigHolder.getTaskConfig(taskName);
             if (newSortTaskConfig == null) {
                 LOG.error("newSortTaskConfig is null.");
                 return;
@@ -68,49 +69,28 @@ public class KafkaFederationSinkContext extends SinkContext {
                 return;
             }
             this.sortTaskConfig = newSortTaskConfig;
-            this.producerContext = new Context(this.sortTaskConfig.getSinkParams());
-
-            LOG.info("reload idTopicMap");
-            Map<String, KafkaIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
-            List<Map<String, String>> idList = this.sortTaskConfig.getIdParams();
-            for (Map<String, String> idParam : idList) {
-                try {
-                    KafkaIdConfig idConfig = new KafkaIdConfig(idParam);
-                    newIdConfigMap.put(idConfig.getUid(), idConfig);
-                } catch (Exception e) {
-                    LOG.error("fail to parse kafka id config", e);
-                }
+            KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig) newSortTaskConfig.getNodeConfig();
+            if (kafkaNodeConfig == null || requestNodeConfig.getVersion() > kafkaNodeConfig.getVersion()) {
+                this.kafkaNodeConfig = requestNodeConfig;
             }
 
-            LOG.info("reload clusterConfig");
-            CacheClusterConfig clusterConfig = new CacheClusterConfig();
-            clusterConfig.setClusterName(this.taskName);
-            clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
-            List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
-            newClusterConfigList.add(clusterConfig);
-            this.idConfigMap = newIdConfigMap;
-            this.clusterConfigList = newClusterConfigList;
+            this.idConfigMap = this.sortTaskConfig.getClusters()
+                    .stream()
+                    .map(SortClusterConfig::getDataFlowConfigs)
+                    .flatMap(Collection::stream)
+                    .map(KafkaIdConfig::create)
+                    .collect(Collectors.toMap(
+                            config -> InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+                            v -> v,
+                            (flow1, flow2) -> flow1));
+
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
-    /**
-     * get ProducerContext
-     *
-     * @return ProducerContext
-     */
-    public Context getProducerContext() {
-        return producerContext;
-    }
-
-    /**
-     * get ClusterConfigList
-     *
-     * @return ClusterConfigList
-     */
-    public List<CacheClusterConfig> getClusterConfigList() {
-        return clusterConfigList;
+    public KafkaNodeConfig getNodeConfig() {
+        return kafkaNodeConfig;
     }
 
     /**
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index 73d58afde0..e7b7b17ea4 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -18,15 +18,22 @@
 package org.apache.inlong.sort.standalone.sink.kafka;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.utils.Constants;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 import java.util.Map;
 
-/**
- * 
- * KafkaIdConfig
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class KafkaIdConfig {
 
     public static final String KEY_DATA_TYPE = "dataType";
@@ -40,18 +47,6 @@ public class KafkaIdConfig {
     private String topic;
     private DataTypeEnum dataType = DataTypeEnum.TEXT;
 
-    /**
-     * Constructor
-     */
-    public KafkaIdConfig() {
-
-    }
-
-    /**
-     * Constructor
-     * 
-     * @param idParam
-     */
     public KafkaIdConfig(Map<String, String> idParam) {
         this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
         this.inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
@@ -62,112 +57,16 @@ public class KafkaIdConfig {
                 .convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE, DataTypeEnum.TEXT.getType()));
     }
 
-    /**
-     * get inlongGroupId
-     * 
-     * @return the inlongGroupId
-     */
-    public String getInlongGroupId() {
-        return inlongGroupId;
-    }
-
-    /**
-     * set inlongGroupId
-     * 
-     * @param inlongGroupId the inlongGroupId to set
-     */
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
-    }
-
-    /**
-     * get inlongStreamId
-     * 
-     * @return the inlongStreamId
-     */
-    public String getInlongStreamId() {
-        return inlongStreamId;
-    }
-
-    /**
-     * set inlongStreamId
-     * 
-     * @param inlongStreamId the inlongStreamId to set
-     */
-    public void setInlongStreamId(String inlongStreamId) {
-        this.inlongStreamId = inlongStreamId;
-    }
-
-    /**
-     * get uid
-     * 
-     * @return the uid
-     */
-    public String getUid() {
-        return uid;
-    }
-
-    /**
-     * set uid
-     * 
-     * @param uid the uid to set
-     */
-    public void setUid(String uid) {
-        this.uid = uid;
-    }
-
-    /**
-     * get separator
-     * 
-     * @return the separator
-     */
-    public String getSeparator() {
-        return separator;
-    }
-
-    /**
-     * set separator
-     * 
-     * @param separator the separator to set
-     */
-    public void setSeparator(String separator) {
-        this.separator = separator;
-    }
-
-    /**
-     * get topic
-     * 
-     * @return the topic
-     */
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * set topic
-     * 
-     * @param topic the topic to set
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    /**
-     * get dataType
-     * 
-     * @return the dataType
-     */
-    public DataTypeEnum getDataType() {
-        return dataType;
-    }
+    public static KafkaIdConfig create(DataFlowConfig dataFlowConfig) {
+        KafkaSinkConfig sinkConfig = (KafkaSinkConfig) dataFlowConfig.getSinkConfig();
 
-    /**
-     * set dataType
-     * 
-     * @param dataType the dataType to set
-     */
-    public void setDataType(DataTypeEnum dataType) {
-        this.dataType = dataType;
+        return KafkaIdConfig.builder()
+                .inlongGroupId(dataFlowConfig.getInlongGroupId())
+                .inlongStreamId(dataFlowConfig.getInlongStreamId())
+                .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), dataFlowConfig.getInlongStreamId()))
+                .topic(sinkConfig.getTopicName())
+                .dataType(DataTypeEnum.TEXT)
+                .build();
     }
 
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 944aac8fa3..d0fda7c5aa 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -17,12 +17,13 @@
 
 package org.apache.inlong.sort.standalone.sink.kafka;
 
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.flume.Context;
 import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleAware;
@@ -43,7 +44,7 @@ public class KafkaProducerCluster implements LifecycleAware {
     public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaProducerCluster.class);
 
     private final String workerName;
-    protected final CacheClusterConfig config;
+    protected final KafkaNodeConfig nodeConfig;
     private final KafkaFederationSinkContext sinkContext;
     private final Context context;
 
@@ -53,23 +54,16 @@ public class KafkaProducerCluster implements LifecycleAware {
 
     private KafkaProducer<String, byte[]> producer;
 
-    /**
-     * constructor of KafkaProducerCluster
-     *
-     * @param workerName                 workerName
-     * @param config                     config of cluster
-     * @param kafkaFederationSinkContext producer context
-     */
     public KafkaProducerCluster(
             String workerName,
-            CacheClusterConfig config,
+            KafkaNodeConfig nodeConfig,
             KafkaFederationSinkContext kafkaFederationSinkContext) {
         this.workerName = Preconditions.checkNotNull(workerName);
-        this.config = Preconditions.checkNotNull(config);
+        this.nodeConfig = nodeConfig;
         this.sinkContext = Preconditions.checkNotNull(kafkaFederationSinkContext);
-        this.context = Preconditions.checkNotNull(kafkaFederationSinkContext.getProducerContext());
+        this.context = new Context(nodeConfig.getProperties() != null ? nodeConfig.getProperties() : Maps.newHashMap());
         this.state = LifecycleState.IDLE;
-        this.cacheClusterName = Preconditions.checkNotNull(config.getClusterName());
+        this.cacheClusterName = nodeConfig.getNodeName();
         this.handler = sinkContext.createEventHandler();
     }
 
@@ -83,11 +77,14 @@ public class KafkaProducerCluster implements LifecycleAware {
             props.put(
                     ProducerConfig.PARTITIONER_CLASS_CONFIG,
                     context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG, PartitionerSelector.class.getName()));
+            props.put(
+                    ProducerConfig.ACKS_CONFIG,
+                    context.getString(ProducerConfig.ACKS_CONFIG, "all"));
             props.put(
                     ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                    context.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+                    nodeConfig.getBootstrapServers());
             props.put(ProducerConfig.CLIENT_ID_CONFIG,
-                    context.getString(ProducerConfig.CLIENT_ID_CONFIG, cacheClusterName) + "-" + workerName);
+                    nodeConfig.getClientId() + "-" + workerName);
             LOG.info("init kafka client info: " + props);
             producer = new KafkaProducer<>(props, new StringSerializer(), new ByteArraySerializer());
             Preconditions.checkNotNull(producer);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
index 4cd261d3e9..23e817dd8d 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.standalone.sink.kafka;
 
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.google.common.base.Preconditions;
@@ -26,14 +26,9 @@ import org.apache.flume.Transaction;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * KafkaProducerFederation.
@@ -47,34 +42,23 @@ public class KafkaProducerFederation implements Runnable {
     private final KafkaFederationSinkContext context;
     private ScheduledExecutorService pool;
     private long reloadInterval;
+    private KafkaNodeConfig nodeConfig;
+    private KafkaProducerCluster cluster;
+    private KafkaProducerCluster deleteCluster;
 
-    private List<KafkaProducerCluster> clusterList = new ArrayList<>();
-    private List<KafkaProducerCluster> deletingClusterList = new ArrayList<>();
-
-    private AtomicInteger clusterIndex = new AtomicInteger(0);
-
-    /**
-     * constructor of KafkaProducerFederation
-     *
-     * @param workerName workerName
-     * @param context    context
-     */
     public KafkaProducerFederation(String workerName, KafkaFederationSinkContext context) {
         this.workerName = Preconditions.checkNotNull(workerName);
         this.context = Preconditions.checkNotNull(context);
         this.reloadInterval = context.getReloadInterval();
     }
 
-    /** close */
     public void close() {
         try {
             this.pool.shutdownNow();
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
-        for (KafkaProducerCluster cluster : this.clusterList) {
-            cluster.stop();
-        }
+        cluster.stop();
     }
 
     /** start */
@@ -87,73 +71,38 @@ public class KafkaProducerFederation implements Runnable {
         }
     }
 
-    /** Implements {@link Runnable} method. */
     @Override
     public void run() {
         this.reload();
     }
 
-    /** reload module */
     private void reload() {
         try {
-            LOG.info("stop deleting clusters, size is {}", deletingClusterList.size());
-            deletingClusterList.forEach(KafkaProducerCluster::stop);
-            deletingClusterList.clear();
-
-            LOG.info("update cluster list");
-            List<CacheClusterConfig> newClusterConfigList = this.context.getClusterConfigList();
-            // prepare
-            Set<String> newClusterNames = new HashSet<>();
-            Set<String> oldClusterNames = new HashSet<>();
-            newClusterConfigList.forEach(
-                    clusterConfig -> newClusterNames.add(clusterConfig.getClusterName()));
-            clusterList.forEach(cluster -> oldClusterNames.add(cluster.getCacheClusterName()));
-            List<KafkaProducerCluster> newClusterList = new ArrayList<>(newClusterConfigList.size());
-
-            // add new cluster
-            newClusterConfigList.forEach(
-                    config -> {
-                        if (!oldClusterNames.contains(config.getClusterName())) {
-                            KafkaProducerCluster cluster = new KafkaProducerCluster(workerName, config, context);
-                            cluster.start();
-                            newClusterList.add(cluster);
-                        }
-                    });
-
-            // remove expire cluster
-            clusterList.forEach(
-                    cluster -> {
-                        if (!newClusterNames.contains(cluster.getCacheClusterName())) {
-                            deletingClusterList.add(cluster);
-                        } else {
-                            newClusterList.add(cluster);
-                        }
-                    });
-            LOG.info("the modified cluster list size is {}", newClusterList.size());
-            this.clusterList = newClusterList;
+            if (deleteCluster != null) {
+                deleteCluster.stop();
+                deleteCluster = null;
+            }
+        } catch (Exception e) {
+            LOG.error("failed to close delete cluster, ex={}", e.getMessage(), e);
+        }
+
+        try {
+
+            if (nodeConfig != null && context.getNodeConfig().getVersion() <= nodeConfig.getVersion()) {
+                return;
+            }
+            this.nodeConfig = context.getNodeConfig();
+            KafkaProducerCluster updateCluster = new KafkaProducerCluster(workerName, nodeConfig, context);
+            updateCluster.start();
+            this.deleteCluster = cluster;
+            this.cluster = updateCluster;
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
-    /**
-     * send event
-     *
-     * @param  profileEvent event to send
-     * @param  tx           transaction
-     * @return              send result
-     * @throws IOException
-     */
     public boolean send(ProfileEvent profileEvent, Transaction tx) throws IOException {
-        int currentIndex = clusterIndex.getAndIncrement();
-        if (currentIndex > Integer.MAX_VALUE / 2) {
-            clusterIndex.set(0);
-        }
-        List<KafkaProducerCluster> currentClusterList = this.clusterList;
-        int currentSize = currentClusterList.size();
-        int realIndex = currentIndex % currentSize;
-        KafkaProducerCluster clusterProducer = currentClusterList.get(realIndex);
-        return clusterProducer.send(profileEvent, tx);
+        return cluster.send(profileEvent, tx);
     }
 
     /** Init ScheduledExecutorService with fix reload rate {@link #reloadInterval}. */

Reply via email to