This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d303033ad0 [INLONG-10597][Sort] Provide default pulsar producer configuration (#10600)
d303033ad0 is described below
commit d303033ad00f4b480c42be8bfb9bc96128bcfbac
Author: vernedeng <[email protected]>
AuthorDate: Wed Jul 10 19:22:07 2024 +0800
[INLONG-10597][Sort] Provide default pulsar producer configuration (#10600)
* [INLONG-10597][Sort] Provide default pulsar producer configuration
---------
Co-authored-by: vernedeng <[email protected]>
---
.../sink/pulsar/PulsarFederationSinkContext.java | 10 ++-
.../sink/pulsar/PulsarProducerCluster.java | 80 ++++++++++++++--------
.../sink/pulsar/PulsarProducerFederation.java | 31 ++++++++-
3 files changed, 89 insertions(+), 32 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index cce771675f..a686d50214 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -76,14 +76,14 @@ public class PulsarFederationSinkContext extends SinkContext {
this.pulsarNodeConfig = requestNodeConfig;
}
+ this.taskConfig = newTaskConfig;
+ this.sortTaskConfig = newSortTaskConfig;
+
CacheClusterConfig clusterConfig = new CacheClusterConfig();
clusterConfig.setClusterName(this.taskName);
clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
this.cacheClusterConfig = clusterConfig;
- this.taskConfig = newTaskConfig;
- this.sortTaskConfig = newSortTaskConfig;
-
Map<String, PulsarIdConfig> fromTaskConfig =
fromTaskConfig(taskConfig);
Map<String, PulsarIdConfig> fromSortTaskConfig =
fromSortTaskConfig(sortTaskConfig);
SortConfigMetricReporter.reportClusterDiff(clusterId, taskName,
fromTaskConfig, fromSortTaskConfig);
@@ -139,6 +139,10 @@ public class PulsarFederationSinkContext extends SinkContext {
return pulsarNodeConfig;
}
+ public CacheClusterConfig getCacheClusterConfig() {
+ return cacheClusterConfig;
+ }
+
public void addSendMetric(ProfileEvent currentRecord, String topic) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 1ae34191ec..4f2f80e0ab 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -19,10 +19,11 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
@@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
@@ -75,26 +77,31 @@ public class PulsarProducerCluster implements LifecycleAware {
private final String workerName;
private final PulsarFederationSinkContext sinkContext;
private final PulsarNodeConfig nodeConfig;
- private final Context context;
- private final String cacheClusterName;
+ private final CacheClusterConfig cacheClusterConfig;
+ private String cacheClusterName;
+ private Context context;
private LifecycleState state;
private IEvent2PulsarRecordHandler handler;
/**
* pulsar client
*/
+ private ClientBuilder clientBuilder;
private PulsarClient client;
private ProducerBuilder<byte[]> baseBuilder;
private Map<String, Producer<byte[]>> producerMap = new
ConcurrentHashMap<>();
- public PulsarProducerCluster(String workerName, PulsarNodeConfig
nodeConfig, PulsarFederationSinkContext context) {
+ public PulsarProducerCluster(
+ String workerName,
+ CacheClusterConfig cacheClusterConfig,
+ PulsarNodeConfig nodeConfig,
+ PulsarFederationSinkContext context) {
this.workerName = workerName;
this.sinkContext = context;
this.nodeConfig = nodeConfig;
- this.context = new Context(nodeConfig.getProperties() != null ?
nodeConfig.getProperties() : Maps.newHashMap());
+ this.cacheClusterConfig = cacheClusterConfig;
this.state = LifecycleState.IDLE;
- this.cacheClusterName = nodeConfig.getNodeName();
this.handler = sinkContext.createEventHandler();
}
@@ -106,16 +113,10 @@ public class PulsarProducerCluster implements LifecycleAware {
this.state = LifecycleState.START;
try {
// create pulsar client
- ClientBuilder clientBuilder = PulsarClient.builder();
- String serviceUrl = nodeConfig.getServiceUrl();
- if (StringUtils.isBlank(serviceUrl)) {
- throw new IllegalArgumentException("service url should not be null");
- }
-
- clientBuilder.serviceUrl(serviceUrl);
- String authentication = nodeConfig.getToken();
- if (StringUtils.isNoneBlank(authentication)) {
- clientBuilder.authentication(AuthenticationFactory.token(authentication));
+ if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+ initBuilderByNodeConfig(nodeConfig);
+ } else {
+ initBuilderByCacheCluster(cacheClusterConfig);
}
this.client = clientBuilder
@@ -135,7 +136,7 @@ public class PulsarProducerCluster implements LifecycleAware {
.maxPendingMessagesAcrossPartitions(
context.getInteger(KEY_MAXPENDINGMESSAGESACROSSPARTITIONS, 50000))
.sendTimeout(context.getInteger(KEY_SENDTIMEOUT, 0),
TimeUnit.MILLISECONDS)
- .compressionType(this.getPulsarCompressionType())
+ .compressionType(this.getPulsarCompressionType(context.getString(KEY_COMPRESSIONTYPE, "ZLIB")))
.blockIfQueueFull(context.getBoolean(KEY_BLOCKIFQUEUEFULL,
true))
.roundRobinRouterBatchingPartitionSwitchFrequency(
context.getInteger(KEY_ROUNDROBINROUTERBATCHINGPARTITIONSWITCHFREQUENCY, 10))
@@ -145,13 +146,45 @@ public class PulsarProducerCluster implements LifecycleAware {
}
}
+ private void initBuilderByCacheCluster(CacheClusterConfig
cacheClusterConfig) {
+ this.cacheClusterName = cacheClusterConfig.getClusterName();
+ this.context = new Context(cacheClusterConfig.getParams());
+ clientBuilder = PulsarClient.builder();
+ String serviceUrl =
cacheClusterConfig.getParams().get(KEY_SERVICE_URL);
+ if (StringUtils.isBlank(serviceUrl)) {
+ throw new IllegalArgumentException("service url should not be null");
+ }
+
+ clientBuilder.serviceUrl(serviceUrl);
+ String authentication =
cacheClusterConfig.getParams().get(KEY_AUTHENTICATION);
+ if (StringUtils.isNoneBlank(authentication)) {
+ clientBuilder.authentication(AuthenticationFactory.token(authentication));
+ }
+ }
+
+ private void initBuilderByNodeConfig(PulsarNodeConfig nodeConfig) {
+ this.cacheClusterName = nodeConfig.getNodeName();
+ this.context = new Context(nodeConfig.getProperties() == null ? new
HashMap<>() : nodeConfig.getProperties());
+
+ clientBuilder = PulsarClient.builder();
+ String serviceUrl = nodeConfig.getServiceUrl();
+ if (StringUtils.isBlank(serviceUrl)) {
+ throw new IllegalArgumentException("service url should not be null");
+ }
+
+ clientBuilder.serviceUrl(serviceUrl);
+ String authentication = nodeConfig.getToken();
+ if (StringUtils.isNoneBlank(authentication)) {
+ clientBuilder.authentication(AuthenticationFactory.token(authentication));
+ }
+ }
+
/**
* getPulsarCompressionType
*
* @return CompressionType
*/
- private CompressionType getPulsarCompressionType() {
- String type = nodeConfig.getCompressionType();
+ private CompressionType getPulsarCompressionType(String type) {
if (type == null) {
return CompressionType.ZLIB;
}
@@ -275,13 +308,4 @@ public class PulsarProducerCluster implements LifecycleAware {
return true;
}
- /**
- * get cacheClusterName
- *
- * @return the cacheClusterName
- */
- public String getCacheClusterName() {
- return cacheClusterName;
- }
-
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
index fd4a0b95b5..d12fddb123 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
@@ -19,6 +19,8 @@ package org.apache.inlong.sort.standalone.sink.pulsar;
import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.flume.Transaction;
@@ -41,6 +43,7 @@ public class PulsarProducerFederation {
private final PulsarFederationSinkContext context;
private Timer reloadTimer;
private PulsarNodeConfig nodeConfig;
+ private CacheClusterConfig cacheClusterConfig;
private PulsarProducerCluster cluster;
private PulsarProducerCluster deleteCluster;
@@ -108,12 +111,38 @@ public class PulsarProducerFederation {
LOG.error("failed to close delete cluster, ex={}", e.getMessage(),
e);
}
+ if (CommonPropertiesHolder.useUnifiedConfiguration()) {
+ reloadByNodeConfig();
+ } else {
+ reloadByCacheClusterConfig();
+ }
+
+ }
+
+ private void reloadByNodeConfig() {
try {
if (nodeConfig != null && context.getNodeConfig().getVersion() <=
nodeConfig.getVersion()) {
return;
}
this.nodeConfig = context.getNodeConfig();
- PulsarProducerCluster updateCluster = new
PulsarProducerCluster(workerName, nodeConfig, context);
+ PulsarProducerCluster updateCluster =
+ new PulsarProducerCluster(workerName, cacheClusterConfig,
nodeConfig, context);
+ updateCluster.start();
+ this.deleteCluster = cluster;
+ this.cluster = updateCluster;
+ } catch (Throwable e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ private void reloadByCacheClusterConfig() {
+ try {
+ if (cacheClusterConfig != null &&
!cacheClusterConfig.equals(context.getCacheClusterConfig())) {
+ return;
+ }
+ this.cacheClusterConfig = context.getCacheClusterConfig();
+ PulsarProducerCluster updateCluster =
+ new PulsarProducerCluster(workerName, cacheClusterConfig,
nodeConfig, context);
updateCluster.start();
this.deleteCluster = cluster;
this.cluster = updateCluster;