This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5a331f1c86 [INLONG-10230][Sort] KafkaSink support unified configuration (#10235)
5a331f1c86 is described below
commit 5a331f1c86f078f2995c0f95d58509dd70868c01
Author: vernedeng <[email protected]>
AuthorDate: Fri May 17 19:56:28 2024 +0800
[INLONG-10230][Sort] KafkaSink support unified configuration (#10235)
---
.../sink/{SinkConfig.java => KafkaSinkConfig.java} | 21 +--
.../common/pojo/sort/dataflow/sink/SinkConfig.java | 1 +
.../node/{NodeConfig.java => KafkaNodeConfig.java} | 21 +--
.../inlong/common/pojo/sort/node/NodeConfig.java | 1 +
.../manager/pojo/node/kafka/KafkaDataNodeDTO.java | 2 +-
.../manager/pojo/node/kafka/KafkaDataNodeInfo.java | 2 +-
.../service/node/kafka/KafkaDataNodeOperator.java | 10 ++
.../service/sink/kafka/KafkaSinkOperator.java | 22 ++++
.../sink/kafka/KafkaFederationSinkContext.java | 70 ++++------
.../sort/standalone/sink/kafka/KafkaIdConfig.java | 141 +++------------------
.../sink/kafka/KafkaProducerCluster.java | 27 ++--
.../sink/kafka/KafkaProducerFederation.java | 99 ++++-----------
12 files changed, 125 insertions(+), 292 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
similarity index 52%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
index 06635d219f..f0ec2bd6b1 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/KafkaSinkConfig.java
@@ -17,25 +17,12 @@
package org.apache.inlong.common.pojo.sort.dataflow.sink;
-import org.apache.inlong.common.constant.SinkType;
-import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
-
import lombok.Data;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import java.io.Serializable;
-import java.util.List;
+import lombok.EqualsAndHashCode;
+@EqualsAndHashCode(callSuper = true)
@Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes({
- @JsonSubTypes.Type(value = ClsSinkConfig.class, name = SinkType.CLS),
- @JsonSubTypes.Type(value = EsSinkConfig.class, name =
SinkType.ELASTICSEARCH),
- @JsonSubTypes.Type(value = PulsarSinkConfig.class, name =
SinkType.PULSAR),
-})
-public abstract class SinkConfig implements Serializable {
+public class KafkaSinkConfig extends SinkConfig {
- private String encodingType;
- private List<FieldConfig> fieldConfigs;
+ private String topicName;
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
index 06635d219f..aa0d37ba3f 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/dataflow/sink/SinkConfig.java
@@ -33,6 +33,7 @@ import java.util.List;
@JsonSubTypes.Type(value = ClsSinkConfig.class, name = SinkType.CLS),
@JsonSubTypes.Type(value = EsSinkConfig.class, name =
SinkType.ELASTICSEARCH),
@JsonSubTypes.Type(value = PulsarSinkConfig.class, name =
SinkType.PULSAR),
+ @JsonSubTypes.Type(value = KafkaSinkConfig.class, name =
SinkType.KAFKA),
})
public abstract class SinkConfig implements Serializable {
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/KafkaNodeConfig.java
similarity index 52%
copy from inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/KafkaNodeConfig.java
index 1c82d4c9e2..9bafb3d85b 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/KafkaNodeConfig.java
@@ -17,25 +17,12 @@
package org.apache.inlong.common.pojo.sort.node;
-import org.apache.inlong.common.constant.DataNodeType;
-
import lombok.Data;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
-
-import java.io.Serializable;
-import java.util.Map;
@Data
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-@JsonSubTypes({
- @JsonSubTypes.Type(value = ClsNodeConfig.class, name =
DataNodeType.CLS),
- @JsonSubTypes.Type(value = EsNodeConfig.class, name =
DataNodeType.ELASTICSEARCH),
- @JsonSubTypes.Type(value = PulsarNodeConfig.class, name =
DataNodeType.PULSAR),
-})
-public abstract class NodeConfig implements Serializable {
+public class KafkaNodeConfig extends NodeConfig {
- private Integer version;
- private String nodeName;
- private Map<String, String> properties;
+ private String bootstrapServers;
+ private String clientId;
+ private String acks;
}
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
index 1c82d4c9e2..49b7c8e0f3 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/node/NodeConfig.java
@@ -32,6 +32,7 @@ import java.util.Map;
@JsonSubTypes.Type(value = ClsNodeConfig.class, name =
DataNodeType.CLS),
@JsonSubTypes.Type(value = EsNodeConfig.class, name =
DataNodeType.ELASTICSEARCH),
@JsonSubTypes.Type(value = PulsarNodeConfig.class, name =
DataNodeType.PULSAR),
+ @JsonSubTypes.Type(value = KafkaNodeConfig.class, name =
DataNodeType.KAFKA),
})
public abstract class NodeConfig implements Serializable {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java
index 3317c3d714..8ccfed0760 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeDTO.java
@@ -49,7 +49,7 @@ public class KafkaDataNodeDTO {
private String clientId;
@ApiModelProperty(value = "kafka produce confirmation mechanism")
- private String ack;
+ private String acks;
@ApiModelProperty("audit set name")
private String auditSetName;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java
index 3b0b433132..02b67b6ac4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/kafka/KafkaDataNodeInfo.java
@@ -45,7 +45,7 @@ public class KafkaDataNodeInfo extends DataNodeInfo {
private String clientId;
@ApiModelProperty(value = "kafka produce confirmation mechanism")
- private String ack;
+ private String acks;
@ApiModelProperty("audit set name")
private String auditSetName;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
index 4fbc740d36..968a670628 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/kafka/KafkaDataNodeOperator.java
@@ -17,6 +17,8 @@
package org.apache.inlong.manager.service.node.kafka;
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
+import org.apache.inlong.common.pojo.sort.node.NodeConfig;
import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -119,6 +121,14 @@ public class KafkaDataNodeOperator extends
AbstractDataNodeOperator {
return true;
}
+ @Override
+ public NodeConfig getNodeConfig(DataNodeEntity dataNodeEntity) {
+ DataNodeInfo dataNodeInfo = this.getFromEntity(dataNodeEntity);
+ KafkaNodeConfig kafkaNodeConfig =
CommonBeanUtils.copyProperties(dataNodeInfo, KafkaNodeConfig::new);
+ kafkaNodeConfig.setNodeName(dataNodeInfo.getName());
+ return kafkaNodeConfig;
+ }
+
private boolean getKafkaConnection(String bootstrapServers) {
KafkaClusterInfo kafkaClusterInfo =
KafkaClusterInfo.builder().bootstrapServers(bootstrapServers).build();
try {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
index d7fa197c9f..169e7d1461 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/kafka/KafkaSinkOperator.java
@@ -17,6 +17,10 @@
package org.apache.inlong.manager.service.sink.kafka;
+import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.SinkConfig;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -29,6 +33,7 @@ import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSink;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkDTO;
import org.apache.inlong.manager.pojo.sink.kafka.KafkaSinkRequest;
+import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -40,6 +45,7 @@ import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Kafka sink operator
@@ -112,4 +118,20 @@ public class KafkaSinkOperator extends
AbstractSinkOperator {
return sink;
}
+ @Override
+ public SinkConfig getSinkConfig(StreamSink sink) {
+ KafkaSinkConfig sinkConfig = CommonBeanUtils.copyProperties(sink,
KafkaSinkConfig::new);
+ List<FieldConfig> fields =
sinkFieldMapper.selectBySinkId(sink.getId()).stream().map(
+ v -> {
+ FieldConfig fieldConfig = new FieldConfig();
+ FormatInfo formatInfo = FieldInfoUtils.convertFieldFormat(
+ v.getFieldType().toLowerCase());
+ fieldConfig.setName(v.getFieldName());
+ fieldConfig.setFormatInfo(formatInfo);
+ return fieldConfig;
+ }).collect(Collectors.toList());
+ sinkConfig.setFieldConfigs(fields);
+ return sinkConfig;
+ }
+
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index c79922b0d8..4e94c67248 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -17,14 +17,16 @@
package org.apache.inlong.sort.standalone.sink.kafka;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.commons.lang3.ClassUtils;
@@ -32,12 +34,12 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.slf4j.Logger;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
/** Context of kafka sink. */
public class KafkaFederationSinkContext extends SinkContext {
@@ -45,9 +47,8 @@ public class KafkaFederationSinkContext extends SinkContext {
public static final Logger LOG =
InlongLoggerFactory.getLogger(KafkaFederationSinkContext.class);
public static final String KEY_EVENT_HANDLER = "eventHandler";
- private Context producerContext;
+ private KafkaNodeConfig kafkaNodeConfig;
private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();
- private List<CacheClusterConfig> clusterConfigList = new ArrayList<>();
public KafkaFederationSinkContext(String sinkName, Context context,
Channel channel) {
super(sinkName, context, channel);
@@ -58,7 +59,7 @@ public class KafkaFederationSinkContext extends SinkContext {
public void reload() {
LOG.info("reload KafkaFederationSinkContext.");
try {
- SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
if (newSortTaskConfig == null) {
LOG.error("newSortTaskConfig is null.");
return;
@@ -68,49 +69,28 @@ public class KafkaFederationSinkContext extends SinkContext
{
return;
}
this.sortTaskConfig = newSortTaskConfig;
- this.producerContext = new
Context(this.sortTaskConfig.getSinkParams());
-
- LOG.info("reload idTopicMap");
- Map<String, KafkaIdConfig> newIdConfigMap = new
ConcurrentHashMap<>();
- List<Map<String, String>> idList =
this.sortTaskConfig.getIdParams();
- for (Map<String, String> idParam : idList) {
- try {
- KafkaIdConfig idConfig = new KafkaIdConfig(idParam);
- newIdConfigMap.put(idConfig.getUid(), idConfig);
- } catch (Exception e) {
- LOG.error("fail to parse kafka id config", e);
- }
+ KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig)
newSortTaskConfig.getNodeConfig();
+ if (kafkaNodeConfig == null || requestNodeConfig.getVersion() >
kafkaNodeConfig.getVersion()) {
+ this.kafkaNodeConfig = requestNodeConfig;
}
- LOG.info("reload clusterConfig");
- CacheClusterConfig clusterConfig = new CacheClusterConfig();
- clusterConfig.setClusterName(this.taskName);
- clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
- List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
- newClusterConfigList.add(clusterConfig);
- this.idConfigMap = newIdConfigMap;
- this.clusterConfigList = newClusterConfigList;
+ this.idConfigMap = this.sortTaskConfig.getClusters()
+ .stream()
+ .map(SortClusterConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .map(KafkaIdConfig::create)
+ .collect(Collectors.toMap(
+ config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+ v -> v,
+ (flow1, flow2) -> flow1));
+
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
- /**
- * get ProducerContext
- *
- * @return ProducerContext
- */
- public Context getProducerContext() {
- return producerContext;
- }
-
- /**
- * get ClusterConfigList
- *
- * @return ClusterConfigList
- */
- public List<CacheClusterConfig> getClusterConfigList() {
- return clusterConfigList;
+ public KafkaNodeConfig getNodeConfig() {
+ return kafkaNodeConfig;
}
/**
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
index 73d58afde0..e7b7b17ea4 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaIdConfig.java
@@ -18,15 +18,22 @@
package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.KafkaSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.utils.Constants;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
import java.util.Map;
-/**
- *
- * KafkaIdConfig
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class KafkaIdConfig {
public static final String KEY_DATA_TYPE = "dataType";
@@ -40,18 +47,6 @@ public class KafkaIdConfig {
private String topic;
private DataTypeEnum dataType = DataTypeEnum.TEXT;
- /**
- * Constructor
- */
- public KafkaIdConfig() {
-
- }
-
- /**
- * Constructor
- *
- * @param idParam
- */
public KafkaIdConfig(Map<String, String> idParam) {
this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
this.inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
@@ -62,112 +57,16 @@ public class KafkaIdConfig {
.convert(idParam.getOrDefault(KafkaIdConfig.KEY_DATA_TYPE,
DataTypeEnum.TEXT.getType()));
}
- /**
- * get inlongGroupId
- *
- * @return the inlongGroupId
- */
- public String getInlongGroupId() {
- return inlongGroupId;
- }
-
- /**
- * set inlongGroupId
- *
- * @param inlongGroupId the inlongGroupId to set
- */
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
- }
-
- /**
- * get inlongStreamId
- *
- * @return the inlongStreamId
- */
- public String getInlongStreamId() {
- return inlongStreamId;
- }
-
- /**
- * set inlongStreamId
- *
- * @param inlongStreamId the inlongStreamId to set
- */
- public void setInlongStreamId(String inlongStreamId) {
- this.inlongStreamId = inlongStreamId;
- }
-
- /**
- * get uid
- *
- * @return the uid
- */
- public String getUid() {
- return uid;
- }
-
- /**
- * set uid
- *
- * @param uid the uid to set
- */
- public void setUid(String uid) {
- this.uid = uid;
- }
-
- /**
- * get separator
- *
- * @return the separator
- */
- public String getSeparator() {
- return separator;
- }
-
- /**
- * set separator
- *
- * @param separator the separator to set
- */
- public void setSeparator(String separator) {
- this.separator = separator;
- }
-
- /**
- * get topic
- *
- * @return the topic
- */
- public String getTopic() {
- return topic;
- }
-
- /**
- * set topic
- *
- * @param topic the topic to set
- */
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- /**
- * get dataType
- *
- * @return the dataType
- */
- public DataTypeEnum getDataType() {
- return dataType;
- }
+ public static KafkaIdConfig create(DataFlowConfig dataFlowConfig) {
+ KafkaSinkConfig sinkConfig = (KafkaSinkConfig)
dataFlowConfig.getSinkConfig();
- /**
- * set dataType
- *
- * @param dataType the dataType to set
- */
- public void setDataType(DataTypeEnum dataType) {
- this.dataType = dataType;
+ return KafkaIdConfig.builder()
+ .inlongGroupId(dataFlowConfig.getInlongGroupId())
+ .inlongStreamId(dataFlowConfig.getInlongStreamId())
+ .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(),
dataFlowConfig.getInlongStreamId()))
+ .topic(sinkConfig.getTopicName())
+ .dataType(DataTypeEnum.TEXT)
+ .build();
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index 944aac8fa3..d0fda7c5aa 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -17,12 +17,13 @@
package org.apache.inlong.sort.standalone.sink.kafka;
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
@@ -43,7 +44,7 @@ public class KafkaProducerCluster implements LifecycleAware {
public static final Logger LOG =
InlongLoggerFactory.getLogger(KafkaProducerCluster.class);
private final String workerName;
- protected final CacheClusterConfig config;
+ protected final KafkaNodeConfig nodeConfig;
private final KafkaFederationSinkContext sinkContext;
private final Context context;
@@ -53,23 +54,16 @@ public class KafkaProducerCluster implements LifecycleAware
{
private KafkaProducer<String, byte[]> producer;
- /**
- * constructor of KafkaProducerCluster
- *
- * @param workerName workerName
- * @param config config of cluster
- * @param kafkaFederationSinkContext producer context
- */
public KafkaProducerCluster(
String workerName,
- CacheClusterConfig config,
+ KafkaNodeConfig nodeConfig,
KafkaFederationSinkContext kafkaFederationSinkContext) {
this.workerName = Preconditions.checkNotNull(workerName);
- this.config = Preconditions.checkNotNull(config);
+ this.nodeConfig = nodeConfig;
this.sinkContext =
Preconditions.checkNotNull(kafkaFederationSinkContext);
- this.context =
Preconditions.checkNotNull(kafkaFederationSinkContext.getProducerContext());
+ this.context = new Context(nodeConfig.getProperties() != null ?
nodeConfig.getProperties() : Maps.newHashMap());
this.state = LifecycleState.IDLE;
- this.cacheClusterName =
Preconditions.checkNotNull(config.getClusterName());
+ this.cacheClusterName = nodeConfig.getNodeName();
this.handler = sinkContext.createEventHandler();
}
@@ -83,11 +77,14 @@ public class KafkaProducerCluster implements LifecycleAware
{
props.put(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG,
PartitionerSelector.class.getName()));
+ props.put(
+ ProducerConfig.ACKS_CONFIG,
+ context.getString(ProducerConfig.ACKS_CONFIG, "all"));
props.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-
context.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ nodeConfig.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG,
- context.getString(ProducerConfig.CLIENT_ID_CONFIG,
cacheClusterName) + "-" + workerName);
+ nodeConfig.getClientId() + "-" + workerName);
LOG.info("init kafka client info: " + props);
producer = new KafkaProducer<>(props, new StringSerializer(), new
ByteArraySerializer());
Preconditions.checkNotNull(producer);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
index 4cd261d3e9..23e817dd8d 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
@@ -17,8 +17,8 @@
package org.apache.inlong.sort.standalone.sink.kafka;
+import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.google.common.base.Preconditions;
@@ -26,14 +26,9 @@ import org.apache.flume.Transaction;
import org.slf4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* KafkaProducerFederation.
@@ -47,34 +42,23 @@ public class KafkaProducerFederation implements Runnable {
private final KafkaFederationSinkContext context;
private ScheduledExecutorService pool;
private long reloadInterval;
+ private KafkaNodeConfig nodeConfig;
+ private KafkaProducerCluster cluster;
+ private KafkaProducerCluster deleteCluster;
- private List<KafkaProducerCluster> clusterList = new ArrayList<>();
- private List<KafkaProducerCluster> deletingClusterList = new ArrayList<>();
-
- private AtomicInteger clusterIndex = new AtomicInteger(0);
-
- /**
- * constructor of KafkaProducerFederation
- *
- * @param workerName workerName
- * @param context context
- */
public KafkaProducerFederation(String workerName,
KafkaFederationSinkContext context) {
this.workerName = Preconditions.checkNotNull(workerName);
this.context = Preconditions.checkNotNull(context);
this.reloadInterval = context.getReloadInterval();
}
- /** close */
public void close() {
try {
this.pool.shutdownNow();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
- for (KafkaProducerCluster cluster : this.clusterList) {
- cluster.stop();
- }
+ cluster.stop();
}
/** start */
@@ -87,73 +71,38 @@ public class KafkaProducerFederation implements Runnable {
}
}
- /** Implements {@link Runnable} method. */
@Override
public void run() {
this.reload();
}
- /** reload module */
private void reload() {
try {
- LOG.info("stop deleting clusters, size is {}",
deletingClusterList.size());
- deletingClusterList.forEach(KafkaProducerCluster::stop);
- deletingClusterList.clear();
-
- LOG.info("update cluster list");
- List<CacheClusterConfig> newClusterConfigList =
this.context.getClusterConfigList();
- // prepare
- Set<String> newClusterNames = new HashSet<>();
- Set<String> oldClusterNames = new HashSet<>();
- newClusterConfigList.forEach(
- clusterConfig ->
newClusterNames.add(clusterConfig.getClusterName()));
- clusterList.forEach(cluster ->
oldClusterNames.add(cluster.getCacheClusterName()));
- List<KafkaProducerCluster> newClusterList = new
ArrayList<>(newClusterConfigList.size());
-
- // add new cluster
- newClusterConfigList.forEach(
- config -> {
- if
(!oldClusterNames.contains(config.getClusterName())) {
- KafkaProducerCluster cluster = new
KafkaProducerCluster(workerName, config, context);
- cluster.start();
- newClusterList.add(cluster);
- }
- });
-
- // remove expire cluster
- clusterList.forEach(
- cluster -> {
- if
(!newClusterNames.contains(cluster.getCacheClusterName())) {
- deletingClusterList.add(cluster);
- } else {
- newClusterList.add(cluster);
- }
- });
- LOG.info("the modified cluster list size is {}",
newClusterList.size());
- this.clusterList = newClusterList;
+ if (deleteCluster != null) {
+ deleteCluster.stop();
+ deleteCluster = null;
+ }
+ } catch (Exception e) {
+ LOG.error("failed to close delete cluster, ex={}", e.getMessage(),
e);
+ }
+
+ try {
+
+ if (nodeConfig != null && context.getNodeConfig().getVersion() <=
nodeConfig.getVersion()) {
+ return;
+ }
+ this.nodeConfig = context.getNodeConfig();
+ KafkaProducerCluster updateCluster = new
KafkaProducerCluster(workerName, nodeConfig, context);
+ updateCluster.start();
+ this.deleteCluster = cluster;
+ this.cluster = updateCluster;
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}
- /**
- * send event
- *
- * @param profileEvent event to send
- * @param tx transaction
- * @return send result
- * @throws IOException
- */
public boolean send(ProfileEvent profileEvent, Transaction tx) throws
IOException {
- int currentIndex = clusterIndex.getAndIncrement();
- if (currentIndex > Integer.MAX_VALUE / 2) {
- clusterIndex.set(0);
- }
- List<KafkaProducerCluster> currentClusterList = this.clusterList;
- int currentSize = currentClusterList.size();
- int realIndex = currentIndex % currentSize;
- KafkaProducerCluster clusterProducer =
currentClusterList.get(realIndex);
- return clusterProducer.send(profileEvent, tx);
+ return cluster.send(profileEvent, tx);
}
/** Init ScheduledExecutorService with fix reload rate {@link
#reloadInterval}. */