This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 00035ee2e8 [INLONG-10594][Sort] Provide default kafka producer
configuration (#10595)
00035ee2e8 is described below
commit 00035ee2e822a905aaa4768b1ef49b09b8fc192a
Author: vernedeng <[email protected]>
AuthorDate: Wed Jul 10 16:36:20 2024 +0800
[INLONG-10594][Sort] Provide default kafka producer configuration (#10595)
* [INLONG-10594][Sort] Provide default kafka producer configuration
---------
Co-authored-by: vernedeng <[email protected]>
---
.../standalone/config/pojo/CacheClusterConfig.java | 39 +-------
.../sink/kafka/KafkaFederationSinkContext.java | 11 +++
.../sink/kafka/KafkaProducerCluster.java | 106 +++++++++++++++------
.../sink/kafka/KafkaProducerFederation.java | 31 +++++-
.../sink/pulsar/PulsarFederationSinkContext.java | 8 ++
5 files changed, 131 insertions(+), 64 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
index caee067831..83ae055e0d 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/pojo/CacheClusterConfig.java
@@ -17,6 +17,8 @@
package org.apache.inlong.sort.standalone.config.pojo;
+import lombok.Data;
+
import java.util.HashMap;
import java.util.Map;
@@ -24,45 +26,10 @@ import java.util.Map;
*
* CacheClusterConfig
*/
+// Lombok @Data generates getters/setters, toString and equals/hashCode over clusterName and params,
+// so two configs built from equal parameter maps compare equal; KafkaProducerFederation uses
+// equals() on this type to decide whether the producer must be rebuilt.
+@Data
public class CacheClusterConfig {
private String clusterName;
private Map<String, String> params = new HashMap<>();
- /**
- * get clusterName
- *
- * @return the clusterName
- */
- public String getClusterName() {
- return clusterName;
- }
-
- /**
- * set clusterName
- *
- * @param clusterName the clusterName to set
- */
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- /**
- * get params
- *
- * @return the params
- */
- public Map<String, String> getParams() {
- return params;
- }
-
- /**
- * set params
- *
- * @param params the params to set
- */
- public void setParams(Map<String, String> params) {
- this.params = params;
- }
-
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 739b214b1e..f1dbdeb528 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -52,6 +53,7 @@ public class KafkaFederationSinkContext extends SinkContext {
public static final String KEY_EVENT_HANDLER = "eventHandler";
private KafkaNodeConfig kafkaNodeConfig;
+ private CacheClusterConfig cacheClusterConfig;
private Map<String, KafkaIdConfig> idConfigMap = new ConcurrentHashMap<>();
public KafkaFederationSinkContext(String sinkName, Context context,
Channel channel) {
@@ -82,6 +84,11 @@ public class KafkaFederationSinkContext extends SinkContext {
this.taskConfig = newTaskConfig;
this.sortTaskConfig = newSortTaskConfig;
+ CacheClusterConfig clusterConfig = new CacheClusterConfig();
+ clusterConfig.setClusterName(this.taskName);
+ clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
+ this.cacheClusterConfig = clusterConfig;
+
Map<String, KafkaIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);
Map<String, KafkaIdConfig> fromSortTaskConfig =
fromSortTaskConfig(sortTaskConfig);
SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
@@ -121,6 +128,10 @@ public class KafkaFederationSinkContext extends
SinkContext {
return kafkaNodeConfig;
}
+ /**
+ * Returns the cache cluster config assembled from the task name and the sort task's sink
+ * params whenever the task configuration is refreshed.
+ *
+ * @return the current cache cluster config; {@code null} if the task configuration has not
+ *         been loaded yet
+ */
+ public CacheClusterConfig getCacheClusterConfig() {
+ return cacheClusterConfig;
+ }
+
/**
* get Topic by uid
*
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
index d0fda7c5aa..95a8d102ac 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerCluster.java
@@ -19,12 +19,12 @@ package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Properties;
/** wrapper of kafka producer */
@@ -45,10 +46,9 @@ public class KafkaProducerCluster implements LifecycleAware {
private final String workerName;
protected final KafkaNodeConfig nodeConfig;
+ protected final CacheClusterConfig cacheClusterConfig;
private final KafkaFederationSinkContext sinkContext;
- private final Context context;
- private final String cacheClusterName;
private LifecycleState state;
private IEvent2KafkaRecordHandler handler;
@@ -56,36 +56,67 @@ public class KafkaProducerCluster implements LifecycleAware
{
+ /**
+ * Creates an idle producer wrapper; the Kafka producer itself is only created by {@link #start()}.
+ *
+ * @param workerName worker name appended to the Kafka client id; must not be null
+ * @param cacheClusterConfig legacy cache-cluster settings, used by {@link #start()} when unified
+ *        configuration is disabled; a null value is logged there and no producer is created
+ * @param nodeConfig unified node settings, used by {@link #start()} when unified configuration
+ *        is enabled; a null value is logged there and no producer is created
+ * @param kafkaFederationSinkContext sink context that supplies the event handler; must not be null
+ */
public KafkaProducerCluster(
String workerName,
+ CacheClusterConfig cacheClusterConfig,
KafkaNodeConfig nodeConfig,
KafkaFederationSinkContext kafkaFederationSinkContext) {
this.workerName = Preconditions.checkNotNull(workerName);
this.nodeConfig = nodeConfig;
+ this.cacheClusterConfig = cacheClusterConfig;
this.sinkContext =
Preconditions.checkNotNull(kafkaFederationSinkContext);
- this.context = new Context(nodeConfig.getProperties() != null ?
nodeConfig.getProperties() : Maps.newHashMap());
this.state = LifecycleState.IDLE;
- this.cacheClusterName = nodeConfig.getNodeName();
this.handler = sinkContext.createEventHandler();
}
/** start and init kafka producer */
@Override
public void start() {
+ // Choose the configuration source: the unified node config, or the legacy cache-cluster
+ // params (built by the sink context from the sort task's sink params).
+ if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+ startByNodeConfig();
+ } else {
+ startByCacheCluster();
+ }
+ }
+
+ /**
+ * Starts the producer from the legacy cache-cluster params (used when unified configuration
+ * is disabled). Properties are layered: {@link #defaultKafkaProperties()}, then the cache
+ * cluster params, then the partitioner, acks, bootstrap servers and client id set below.
+ * Any failure is logged and leaves {@code producer} unset.
+ */
+ private void startByCacheCluster() {
this.state = LifecycleState.START;
+ if (cacheClusterConfig == null) {
+ LOG.error("start kafka producer cluster failed, cacheClusterConfig
config is null");
+ return;
+ }
try {
- Properties props = new Properties();
- props.putAll(context.getParameters());
- props.put(
- ProducerConfig.PARTITIONER_CLASS_CONFIG,
- context.getString(ProducerConfig.PARTITIONER_CLASS_CONFIG,
PartitionerSelector.class.getName()));
- props.put(
- ProducerConfig.ACKS_CONFIG,
- context.getString(ProducerConfig.ACKS_CONFIG, "all"));
- props.put(
- ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
- nodeConfig.getBootstrapServers());
+ Properties props = defaultKafkaProperties();
+ props.putAll(cacheClusterConfig.getParams());
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
PartitionerSelector.class.getName());
+ props.put(ProducerConfig.ACKS_CONFIG,
+
cacheClusterConfig.getParams().getOrDefault(ProducerConfig.ACKS_CONFIG, "all"));
+
+ // Properties rejects null values; fail with a descriptive message instead of a bare NPE.
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Preconditions.checkNotNull(
+ cacheClusterConfig.getParams().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+ "bootstrap.servers is not configured for cache cluster %s", cacheClusterConfig.getClusterName()));
+
+ // Fall back to the cluster name so an absent client.id does not yield "null-<workerName>".
props.put(ProducerConfig.CLIENT_ID_CONFIG,
- nodeConfig.getClientId() + "-" + workerName);
- LOG.info("init kafka client info: " + props);
+ cacheClusterConfig.getParams().getOrDefault(ProducerConfig.CLIENT_ID_CONFIG,
+ cacheClusterConfig.getClusterName()) + "-" + workerName);
+ LOG.info("init kafka client by cache cluster info: " + props);
+ producer = new KafkaProducer<>(props, new StringSerializer(), new
ByteArraySerializer());
+ Preconditions.checkNotNull(producer);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private void startByNodeConfig() {
+ this.state = LifecycleState.START;
+ if (nodeConfig == null) {
+ LOG.error("start kafka producer cluster failed, node config is
null");
+ return;
+ }
+ try {
+ Properties props = defaultKafkaProperties();
+ props.putAll(nodeConfig.getProperties() == null ? new HashMap<>()
: nodeConfig.getProperties());
+ props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
PartitionerSelector.class.getName());
+ props.put(ProducerConfig.ACKS_CONFIG, nodeConfig.getAcks());
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
nodeConfig.getBootstrapServers());
+ props.put(ProducerConfig.CLIENT_ID_CONFIG,
nodeConfig.getClientId() + "-" + workerName);
+ LOG.info("init kafka client by node config info: " + props);
producer = new KafkaProducer<>(props, new StringSerializer(), new
ByteArraySerializer());
Preconditions.checkNotNull(producer);
} catch (Exception e) {
@@ -93,6 +124,35 @@ public class KafkaProducerCluster implements LifecycleAware
{
}
}
+ /**
+ * Builds the baseline producer properties shared by both start paths. Each caller overlays its
+ * own configuration on top of the returned object, so every value here is only a default.
+ *
+ * @return a new, mutable {@link Properties} populated with default producer settings
+ */
+ public Properties defaultKafkaProperties() {
+ Properties props = new Properties();
+ // NOTE(review): idempotence off + retries + 5 in-flight requests allows records to be
+ // reordered on retry (per Kafka producer docs) — confirm ordering is not required.
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
+ props.put(ProducerConfig.BATCH_SIZE_CONFIG, "122880");
+ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "44740000");
+ props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
+ props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "86400000"); // 24 h
+ props.put(ProducerConfig.LINGER_MS_CONFIG, "500");
+ props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
+ props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "8388608"); // 8 MiB
+ props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "300000"); // 5 min
+ props.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, "32768");
+ props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
+ props.put(ProducerConfig.RETRIES_CONFIG, "100000");
+ props.put(ProducerConfig.SEND_BUFFER_CONFIG, "524288");
+ // NOTE(review): the keys below are plain strings rather than ProducerConfig constants;
+ // presumably they target a customized Kafka client — confirm, since a stock Apache Kafka
+ // producer may not recognize them.
+ props.put("mute.partition.error.max.times", "20");
+ props.put("mute.partition.max.percentage", "20");
+ props.put("rpc.timeout.ms", "30000");
+ props.put("topic.expiry.ms", "86400000");
+ props.put("unmute.partition.interval.ms", "600000");
+ props.put("metadata.retry.backoff.ms", "500");
+ props.put("metadata.fetch.timeout.ms", "1000");
+ props.put("maxThreads", "2");
+ props.put("enable.replace.partition.for.can.retry", "true");
+ props.put("enable.replace.partition.for.not.leader", "true");
+ props.put("enable.topic.partition.circuit.breaker", "true");
+ return props;
+ }
+
/** stop and close kafka producer */
@Override
public void stop() {
@@ -159,12 +219,4 @@ public class KafkaProducerCluster implements
LifecycleAware {
}
}
- /**
- * get cache cluster name
- *
- * @return cacheClusterName
- */
- public String getCacheClusterName() {
- return cacheClusterName;
- }
}
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
index 23e817dd8d..219519b907 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaProducerFederation.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.kafka;
import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import com.google.common.base.Preconditions;
@@ -45,6 +47,7 @@ public class KafkaProducerFederation implements Runnable {
private KafkaNodeConfig nodeConfig;
private KafkaProducerCluster cluster;
private KafkaProducerCluster deleteCluster;
+ private CacheClusterConfig cacheClusterConfig;
public KafkaProducerFederation(String workerName,
KafkaFederationSinkContext context) {
this.workerName = Preconditions.checkNotNull(workerName);
@@ -86,13 +89,39 @@ public class KafkaProducerFederation implements Runnable {
LOG.error("failed to close delete cluster, ex={}", e.getMessage(),
e);
}
+ if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+ reloadByNodeConfig();
+ } else {
+ reloadByCacheClusterConfig();
+ }
+
+ }
+
+ /**
+ * Rebuilds the producer from the sink context's cache cluster config when that config has
+ * changed (legacy, non-unified configuration mode). The first call always builds a cluster;
+ * the replaced cluster is kept in {@code deleteCluster} so it can be closed later.
+ */
+ private void reloadByCacheClusterConfig() {
try {
+ CacheClusterConfig newConfig = context.getCacheClusterConfig();
+ // Keep the running producer when the config is unchanged (CacheClusterConfig has
+ // value-based equals) or when the context no longer provides a config.
+ if (cacheClusterConfig != null
+ && (newConfig == null || cacheClusterConfig.equals(newConfig))) {
+ return;
+ }
+ this.cacheClusterConfig = newConfig;
+ KafkaProducerCluster updateCluster =
+ new KafkaProducerCluster(workerName, cacheClusterConfig, nodeConfig, context);
+ updateCluster.start();
+ this.deleteCluster = cluster;
+ this.cluster = updateCluster;
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private void reloadByNodeConfig() {
+ try {
if (nodeConfig != null && context.getNodeConfig().getVersion() <=
nodeConfig.getVersion()) {
return;
}
this.nodeConfig = context.getNodeConfig();
- KafkaProducerCluster updateCluster = new
KafkaProducerCluster(workerName, nodeConfig, context);
+ KafkaProducerCluster updateCluster =
+ new KafkaProducerCluster(workerName, cacheClusterConfig,
nodeConfig, context);
updateCluster.start();
this.deleteCluster = cluster;
this.cluster = updateCluster;
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index f5fe9c5b96..cce771675f 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -25,6 +25,7 @@ import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
@@ -50,6 +51,7 @@ public class PulsarFederationSinkContext extends SinkContext {
public static final String KEY_EVENT_HANDLER = "eventHandler";
private Map<String, PulsarIdConfig> idConfigMap = new
ConcurrentHashMap<>();
private PulsarNodeConfig pulsarNodeConfig;
+ private CacheClusterConfig cacheClusterConfig;
public PulsarFederationSinkContext(String sinkName, Context context,
Channel channel) {
super(sinkName, context, channel);
@@ -73,6 +75,12 @@ public class PulsarFederationSinkContext extends SinkContext
{
if (pulsarNodeConfig == null || requestNodeConfig.getVersion() >
pulsarNodeConfig.getVersion()) {
this.pulsarNodeConfig = requestNodeConfig;
}
+
+ // Read from newSortTaskConfig: this.sortTaskConfig is only reassigned below, so using it
+ // here would build the config from the previous task's sink params.
+ CacheClusterConfig clusterConfig = new CacheClusterConfig();
+ clusterConfig.setClusterName(this.taskName);
+ clusterConfig.setParams(newSortTaskConfig.getSinkParams());
+ this.cacheClusterConfig = clusterConfig;
+
this.taskConfig = newTaskConfig;
this.sortTaskConfig = newSortTaskConfig;