This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e4e24def90 [INLONG-10228][Sort] PulsarSink support unified configuration (#10236)
e4e24def90 is described below
commit e4e24def90281d94d0d708af80ba32267e9fd313
Author: vernedeng <[email protected]>
AuthorDate: Mon May 20 11:35:15 2024 +0800
[INLONG-10228][Sort] PulsarSink support unified configuration (#10236)
---
.../config/holder/v2/SortConfigHolder.java | 3 +-
.../sink/pulsar/PulsarFederationSinkContext.java | 105 ++++----------
.../sink/pulsar/PulsarFederationWorker.java | 29 ----
.../standalone/sink/pulsar/PulsarIdConfig.java | 151 +++------------------
.../sink/pulsar/PulsarProducerCluster.java | 36 +++--
.../sink/pulsar/PulsarProducerFederation.java | 85 +++---------
6 files changed, 84 insertions(+), 325 deletions(-)
diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index 272df1cc19..f1c52d2fc4 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -133,7 +133,8 @@ public class SortConfigHolder {
.filter(flow ->
StringUtils.isNotEmpty(flow.getAuditTag()))
.collect(Collectors.toMap(flow ->
InlongId.generateUid(flow.getInlongGroupId(),
flow.getInlongStreamId()),
- DataFlowConfig::getAuditTag))));
+ DataFlowConfig::getAuditTag,
+ (flow1, flow2) -> flow1))));
this.config = newConfig;
} catch (Throwable e) {
log.error("failed to reload sort config", e);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 63b2856b81..e5ed8985b2 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -17,14 +17,16 @@
package org.apache.inlong.sort.standalone.sink.pulsar;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.commons.lang3.ClassUtils;
@@ -32,42 +34,26 @@ import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.slf4j.Logger;
-import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
-/**
- *
- * PulsarFederationSinkContext
- */
public class PulsarFederationSinkContext extends SinkContext {
public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarFederationSinkContext.class);
public static final String KEY_EVENT_HANDLER = "eventHandler";
-
- private Context producerContext;
private Map<String, PulsarIdConfig> idConfigMap = new
ConcurrentHashMap<>();
- private List<CacheClusterConfig> clusterConfigList = new ArrayList<>();
+ private PulsarNodeConfig pulsarNodeConfig;
- /**
- * Constructor
- *
- * @param sinkName
- * @param context
- * @param channel
- */
public PulsarFederationSinkContext(String sinkName, Context context,
Channel channel) {
super(sinkName, context, channel);
}
- /**
- * reload
- */
public void reload() {
try {
- SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ SortTaskConfig newSortTaskConfig =
SortConfigHolder.getTaskConfig(taskName);
if (newSortTaskConfig == null) {
LOG.error("newSortTaskConfig is null.");
return;
@@ -77,50 +63,26 @@ public class PulsarFederationSinkContext extends SinkContext {
return;
}
this.sortTaskConfig = newSortTaskConfig;
- this.producerContext = new
Context(this.sortTaskConfig.getSinkParams());
- // parse the config of id and topic
- LOG.info("reload idTopicMap");
- Map<String, PulsarIdConfig> newIdConfigMap = new
ConcurrentHashMap<>();
- List<Map<String, String>> idList =
this.sortTaskConfig.getIdParams();
- for (Map<String, String> idParam : idList) {
- try {
- PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
- newIdConfigMap.put(idConfig.getUid(), idConfig);
- } catch (Exception e) {
- LOG.error("fail to parse pulsar id config", e);
- }
+ PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig)
newSortTaskConfig.getNodeConfig();
+ if (pulsarNodeConfig == null || requestNodeConfig.getVersion() >
pulsarNodeConfig.getVersion()) {
+ this.pulsarNodeConfig = requestNodeConfig;
}
- // build cache cluster config
- CacheClusterConfig clusterConfig = new CacheClusterConfig();
- clusterConfig.setClusterName(this.taskName);
- clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
- List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
- newClusterConfigList.add(clusterConfig);
- // change current config
- LOG.info("old id config map={}\n new id config map={}",
idConfigMap, newIdConfigMap);
- this.idConfigMap = newIdConfigMap;
- this.clusterConfigList = newClusterConfigList;
+
+ this.idConfigMap = this.sortTaskConfig.getClusters()
+ .stream()
+ .map(SortClusterConfig::getDataFlowConfigs)
+ .flatMap(Collection::stream)
+ .map(PulsarIdConfig::create)
+ .collect(Collectors.toMap(
+ config ->
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+ v -> v,
+ (v1, v2) -> v1));
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
- /**
- * get producerContext
- *
- * @return the producerContext
- */
- public Context getProducerContext() {
- return producerContext;
- }
-
- /**
- * get Topic by uid
- *
- * @param uid uid
- * @return topic
- */
public String getTopic(String uid) {
PulsarIdConfig idConfig = this.idConfigMap.get(uid);
if (idConfig == null) {
@@ -129,12 +91,6 @@ public class PulsarFederationSinkContext extends SinkContext {
return idConfig.getTopic();
}
- /**
- * get PulsarIdConfig by uid
- *
- * @param uid uid
- * @return KafkaIdConfig
- */
public PulsarIdConfig getIdConfig(String uid) {
PulsarIdConfig idConfig = this.idConfigMap.get(uid);
if (idConfig == null) {
@@ -143,21 +99,10 @@ public class PulsarFederationSinkContext extends SinkContext {
return idConfig;
}
- /**
- * getCacheClusters
- *
- * @return
- */
- public List<CacheClusterConfig> getCacheClusters() {
- return this.clusterConfigList;
+ public PulsarNodeConfig getNodeConfig() {
+ return pulsarNodeConfig;
}
- /**
- * addSendMetric
- *
- * @param currentRecord
- * @param topic
- */
public void addSendMetric(ProfileEvent currentRecord, String topic) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
index 7fb35e6869..8044f7ad70 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
@@ -28,10 +28,6 @@ import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
-/**
- *
- * PulsarFederationWorker
- */
public class PulsarFederationWorker extends Thread {
public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarFederationWorker.class);
@@ -42,13 +38,6 @@ public class PulsarFederationWorker extends Thread {
private PulsarProducerFederation producerFederation;
private LifecycleState status;
- /**
- * Constructor
- *
- * @param sinkName
- * @param workerIndex
- * @param context
- */
public PulsarFederationWorker(String sinkName, int workerIndex,
PulsarFederationSinkContext context) {
super();
this.workerName = sinkName + "-worker-" + workerIndex;
@@ -57,9 +46,6 @@ public class PulsarFederationWorker extends Thread {
this.status = LifecycleState.IDLE;
}
- /**
- * start
- */
@Override
public void start() {
this.producerFederation.start();
@@ -67,19 +53,12 @@ public class PulsarFederationWorker extends Thread {
super.start();
}
- /**
- *
- * close
- */
public void close() {
// close all producers
this.producerFederation.close();
this.status = LifecycleState.STOP;
}
- /**
- * run
- */
@Override
public void run() {
LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
@@ -123,11 +102,6 @@ public class PulsarFederationWorker extends Thread {
}
}
- /**
- * fillTopic
- *
- * @param currentRecord
- */
private String fillTopic(ProfileEvent currentRecord) {
String topic = this.context.getTopic(currentRecord.getUid());
if (!StringUtils.isBlank(topic)) {
@@ -137,9 +111,6 @@ public class PulsarFederationWorker extends Thread {
return "-";
}
- /**
- * sleepOneInterval
- */
private void sleepOneInterval() {
try {
Thread.sleep(context.getProcessInterval());
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 4ef3aece80..cb6c651982 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -18,15 +18,19 @@
package org.apache.inlong.sort.standalone.sink.pulsar;
import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.utils.Constants;
-import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-/**
- *
- * KafkaIdConfig
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class PulsarIdConfig {
public static final String KEY_DATA_TYPE = "dataType";
@@ -42,134 +46,17 @@ public class PulsarIdConfig {
private String topic;
private DataTypeEnum dataType = DataTypeEnum.TEXT;
- /**
- * Constructor
- */
- public PulsarIdConfig() {
-
- }
-
- /**
- * Constructor
- *
- * @param idParam
- */
- public PulsarIdConfig(Map<String, String> idParam) {
- this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
- this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID,
DEFAULT_INLONG_STREAM);
- this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
- this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR,
PulsarIdConfig.DEFAULT_SEPARATOR);
- this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
- this.dataType = DataTypeEnum
- .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE,
DataTypeEnum.TEXT.getType()));
- }
-
- /**
- * get inlongGroupId
- *
- * @return the inlongGroupId
- */
- public String getInlongGroupId() {
- return inlongGroupId;
- }
-
- /**
- * set inlongGroupId
- *
- * @param inlongGroupId the inlongGroupId to set
- */
- public void setInlongGroupId(String inlongGroupId) {
- this.inlongGroupId = inlongGroupId;
- }
-
- /**
- * get inlongStreamId
- *
- * @return the inlongStreamId
- */
- public String getInlongStreamId() {
- return inlongStreamId;
- }
-
- /**
- * set inlongStreamId
- *
- * @param inlongStreamId the inlongStreamId to set
- */
- public void setInlongStreamId(String inlongStreamId) {
- this.inlongStreamId = inlongStreamId;
- }
+ public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) {
+ PulsarSinkConfig sinkConfig = (PulsarSinkConfig)
dataFlowConfig.getSinkConfig();
- /**
- * get uid
- *
- * @return the uid
- */
- public String getUid() {
- return uid;
- }
-
- /**
- * set uid
- *
- * @param uid the uid to set
- */
- public void setUid(String uid) {
- this.uid = uid;
- }
-
- /**
- * get separator
- *
- * @return the separator
- */
- public String getSeparator() {
- return separator;
- }
-
- /**
- * set separator
- *
- * @param separator the separator to set
- */
- public void setSeparator(String separator) {
- this.separator = separator;
- }
-
- /**
- * get topic
- *
- * @return the topic
- */
- public String getTopic() {
- return topic;
- }
-
- /**
- * set topic
- *
- * @param topic the topic to set
- */
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- /**
- * get dataType
- *
- * @return the dataType
- */
- public DataTypeEnum getDataType() {
- return dataType;
- }
+ return PulsarIdConfig.builder()
+ .inlongGroupId(dataFlowConfig.getInlongGroupId())
+ .inlongStreamId(dataFlowConfig.getInlongStreamId())
+ .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(),
dataFlowConfig.getInlongStreamId()))
+ .topic(sinkConfig.getTopic())
+ .dataType(DataTypeEnum.TEXT)
+ .build();
- /**
- * set dataType
- *
- * @param dataType the dataType to set
- */
- public void setDataType(DataTypeEnum dataType) {
- this.dataType = dataType;
}
}
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index b54dd44281..1ae34191ec 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -17,12 +17,12 @@
package org.apache.inlong.sort.standalone.sink.pulsar;
+import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-import org.apache.commons.lang.math.NumberUtils;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
@@ -73,8 +73,8 @@ public class PulsarProducerCluster implements LifecycleAware {
+ "BatchingPartitionSwitchFrequency";
private final String workerName;
- private final CacheClusterConfig config;
private final PulsarFederationSinkContext sinkContext;
+ private final PulsarNodeConfig nodeConfig;
private final Context context;
private final String cacheClusterName;
private LifecycleState state;
@@ -88,20 +88,13 @@ public class PulsarProducerCluster implements LifecycleAware {
private Map<String, Producer<byte[]>> producerMap = new
ConcurrentHashMap<>();
- /**
- * Constructor
- *
- * @param workerName
- * @param config
- * @param context
- */
- public PulsarProducerCluster(String workerName, CacheClusterConfig config,
PulsarFederationSinkContext context) {
+ public PulsarProducerCluster(String workerName, PulsarNodeConfig
nodeConfig, PulsarFederationSinkContext context) {
this.workerName = workerName;
- this.config = config;
this.sinkContext = context;
- this.context = context.getProducerContext();
+ this.nodeConfig = nodeConfig;
+ this.context = new Context(nodeConfig.getProperties() != null ?
nodeConfig.getProperties() : Maps.newHashMap());
this.state = LifecycleState.IDLE;
- this.cacheClusterName = config.getClusterName();
+ this.cacheClusterName = nodeConfig.getNodeName();
this.handler = sinkContext.createEventHandler();
}
@@ -114,20 +107,19 @@ public class PulsarProducerCluster implements LifecycleAware {
try {
// create pulsar client
ClientBuilder clientBuilder = PulsarClient.builder();
- String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+ String serviceUrl = nodeConfig.getServiceUrl();
if (StringUtils.isBlank(serviceUrl)) {
throw new IllegalArgumentException("service url should not be
null");
}
clientBuilder.serviceUrl(serviceUrl);
- String authentication = config.getParams().get(KEY_AUTHENTICATION);
+ String authentication = nodeConfig.getToken();
if (StringUtils.isNoneBlank(authentication)) {
clientBuilder.authentication(AuthenticationFactory.token(authentication));
}
this.client = clientBuilder
- .statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS), -1),
- TimeUnit.SECONDS)
+ .statsInterval(context.getLong(KEY_STATS_INTERVAL_SECONDS, -1L), TimeUnit.SECONDS)
.build();
// create producer template
@@ -159,8 +151,12 @@ public class PulsarProducerCluster implements LifecycleAware {
* @return CompressionType
*/
private CompressionType getPulsarCompressionType() {
- String type = this.context.getString(KEY_COMPRESSIONTYPE,
DEFAULT_COMPRESS_TYPE);
- switch (type) {
+ String type = nodeConfig.getCompressionType();
+ if (type == null) {
+ return CompressionType.ZLIB;
+ }
+
+ switch (type.toUpperCase()) {
case "LZ4":
return CompressionType.LZ4;
case "NONE":
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
index 709df5ffa6..fd4a0b95b5 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
@@ -17,22 +17,17 @@
package org.apache.inlong.sort.standalone.sink.pulsar;
+import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.flume.Transaction;
import org.slf4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
/**
*
@@ -45,11 +40,9 @@ public class PulsarProducerFederation {
private final String workerName;
private final PulsarFederationSinkContext context;
private Timer reloadTimer;
-
- private List<PulsarProducerCluster> clusterList = new ArrayList<>();
- private List<PulsarProducerCluster> deletingClusterList = new
ArrayList<>();
-
- private AtomicInteger clusterIndex = new AtomicInteger(0);
+ private PulsarNodeConfig nodeConfig;
+ private PulsarProducerCluster cluster;
+ private PulsarProducerCluster deleteCluster;
/**
* Constructor
@@ -83,9 +76,7 @@ public class PulsarProducerFederation {
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
- for (PulsarProducerCluster cluster : this.clusterList) {
- cluster.stop();
- }
+ cluster.stop();
}
/**
@@ -108,62 +99,30 @@ public class PulsarProducerFederation {
*/
public void reload() {
try {
- // stop deleted cluster
- deletingClusterList.forEach(item -> {
- item.stop();
- });
- deletingClusterList.clear();
- // update cluster list
- List<CacheClusterConfig> configList =
this.context.getCacheClusters();
- List<PulsarProducerCluster> newClusterList = new
ArrayList<>(configList.size());
- // prepare
- Set<String> newClusterNames = new HashSet<>();
- configList.forEach(item -> {
- newClusterNames.add(item.getClusterName());
- });
- Set<String> oldClusterNames = new HashSet<>();
- clusterList.forEach(item -> {
- oldClusterNames.add(item.getCacheClusterName());
- });
- // add
- for (CacheClusterConfig config : configList) {
- if (!oldClusterNames.contains(config.getClusterName())) {
- PulsarProducerCluster cluster = new
PulsarProducerCluster(workerName, config, context);
- cluster.start();
- newClusterList.add(cluster);
- }
+ if (deleteCluster != null) {
+ deleteCluster.stop();
+ deleteCluster = null;
}
- // remove
- for (PulsarProducerCluster cluster : this.clusterList) {
- if (newClusterNames.contains(cluster.getCacheClusterName())) {
- newClusterList.add(cluster);
- } else {
- deletingClusterList.add(cluster);
- }
+
+ } catch (Exception e) {
+ LOG.error("failed to close delete cluster, ex={}", e.getMessage(),
e);
+ }
+
+ try {
+ if (nodeConfig != null && context.getNodeConfig().getVersion() <=
nodeConfig.getVersion()) {
+ return;
}
- this.clusterList = newClusterList;
+ this.nodeConfig = context.getNodeConfig();
+ PulsarProducerCluster updateCluster = new
PulsarProducerCluster(workerName, nodeConfig, context);
+ updateCluster.start();
+ this.deleteCluster = cluster;
+ this.cluster = updateCluster;
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
}
}
- /**
- * send
- *
- * @param profileEvent
- * @param tx
- * @return boolean
- * @throws IOException
- */
public boolean send(ProfileEvent profileEvent, Transaction tx) throws
IOException {
- int currentIndex = clusterIndex.getAndIncrement();
- if (currentIndex > Integer.MAX_VALUE / 2) {
- clusterIndex.set(0);
- }
- List<PulsarProducerCluster> currentClusterList = this.clusterList;
- int currentSize = currentClusterList.size();
- int realIndex = currentIndex % currentSize;
- PulsarProducerCluster clusterProducer =
currentClusterList.get(realIndex);
- return clusterProducer.send(profileEvent, tx);
+ return cluster.send(profileEvent, tx);
}
}