This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e4e24def90 [INLONG-10228][Sort] PulsarSink support unified configuration (#10236)
e4e24def90 is described below

commit e4e24def90281d94d0d708af80ba32267e9fd313
Author: vernedeng <[email protected]>
AuthorDate: Mon May 20 11:35:15 2024 +0800

    [INLONG-10228][Sort] PulsarSink support unified configuration (#10236)
---
 .../config/holder/v2/SortConfigHolder.java         |   3 +-
 .../sink/pulsar/PulsarFederationSinkContext.java   | 105 ++++----------
 .../sink/pulsar/PulsarFederationWorker.java        |  29 ----
 .../standalone/sink/pulsar/PulsarIdConfig.java     | 151 +++------------------
 .../sink/pulsar/PulsarProducerCluster.java         |  36 +++--
 .../sink/pulsar/PulsarProducerFederation.java      |  85 +++---------
 6 files changed, 84 insertions(+), 325 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index 272df1cc19..f1c52d2fc4 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -133,7 +133,8 @@ public class SortConfigHolder {
                                     .filter(flow -> 
StringUtils.isNotEmpty(flow.getAuditTag()))
                                     .collect(Collectors.toMap(flow -> 
InlongId.generateUid(flow.getInlongGroupId(),
                                             flow.getInlongStreamId()),
-                                            DataFlowConfig::getAuditTag))));
+                                            DataFlowConfig::getAuditTag,
+                                            (flow1, flow2) -> flow1))));
             this.config = newConfig;
         } catch (Throwable e) {
             log.error("failed to reload sort config", e);
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 63b2856b81..e5ed8985b2 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -17,14 +17,16 @@
 
 package org.apache.inlong.sort.standalone.sink.pulsar;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -32,42 +34,26 @@ import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
-/**
- *
- * PulsarFederationSinkContext
- */
 public class PulsarFederationSinkContext extends SinkContext {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarFederationSinkContext.class);
     public static final String KEY_EVENT_HANDLER = "eventHandler";
-
-    private Context producerContext;
     private Map<String, PulsarIdConfig> idConfigMap = new 
ConcurrentHashMap<>();
-    private List<CacheClusterConfig> clusterConfigList = new ArrayList<>();
+    private PulsarNodeConfig pulsarNodeConfig;
 
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param context
-     * @param channel
-     */
     public PulsarFederationSinkContext(String sinkName, Context context, 
Channel channel) {
         super(sinkName, context, channel);
     }
 
-    /**
-     * reload
-     */
     public void reload() {
         try {
-            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             if (newSortTaskConfig == null) {
                 LOG.error("newSortTaskConfig is null.");
                 return;
@@ -77,50 +63,26 @@ public class PulsarFederationSinkContext extends SinkContext {
                 return;
             }
             this.sortTaskConfig = newSortTaskConfig;
-            this.producerContext = new 
Context(this.sortTaskConfig.getSinkParams());
 
-            // parse the config of id and topic
-            LOG.info("reload idTopicMap");
-            Map<String, PulsarIdConfig> newIdConfigMap = new 
ConcurrentHashMap<>();
-            List<Map<String, String>> idList = 
this.sortTaskConfig.getIdParams();
-            for (Map<String, String> idParam : idList) {
-                try {
-                    PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
-                    newIdConfigMap.put(idConfig.getUid(), idConfig);
-                } catch (Exception e) {
-                    LOG.error("fail to parse pulsar id config", e);
-                }
+            PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) 
newSortTaskConfig.getNodeConfig();
+            if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > 
pulsarNodeConfig.getVersion()) {
+                this.pulsarNodeConfig = requestNodeConfig;
             }
-            // build cache cluster config
-            CacheClusterConfig clusterConfig = new CacheClusterConfig();
-            clusterConfig.setClusterName(this.taskName);
-            clusterConfig.setParams(this.sortTaskConfig.getSinkParams());
-            List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
-            newClusterConfigList.add(clusterConfig);
-            // change current config
-            LOG.info("old id config map={}\n new id config map={}", 
idConfigMap, newIdConfigMap);
-            this.idConfigMap = newIdConfigMap;
-            this.clusterConfigList = newClusterConfigList;
+
+            this.idConfigMap = this.sortTaskConfig.getClusters()
+                    .stream()
+                    .map(SortClusterConfig::getDataFlowConfigs)
+                    .flatMap(Collection::stream)
+                    .map(PulsarIdConfig::create)
+                    .collect(Collectors.toMap(
+                            config -> 
InlongId.generateUid(config.getInlongGroupId(), config.getInlongStreamId()),
+                            v -> v,
+                            (v1, v2) -> v1));
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
-    /**
-     * get producerContext
-     *
-     * @return the producerContext
-     */
-    public Context getProducerContext() {
-        return producerContext;
-    }
-
-    /**
-     * get Topic by uid
-     *
-     * @param  uid uid
-     * @return     topic
-     */
     public String getTopic(String uid) {
         PulsarIdConfig idConfig = this.idConfigMap.get(uid);
         if (idConfig == null) {
@@ -129,12 +91,6 @@ public class PulsarFederationSinkContext extends SinkContext {
         return idConfig.getTopic();
     }
 
-    /**
-     * get PulsarIdConfig by uid
-     *
-     * @param  uid uid
-     * @return     KafkaIdConfig
-     */
     public PulsarIdConfig getIdConfig(String uid) {
         PulsarIdConfig idConfig = this.idConfigMap.get(uid);
         if (idConfig == null) {
@@ -143,21 +99,10 @@ public class PulsarFederationSinkContext extends SinkContext {
         return idConfig;
     }
 
-    /**
-     * getCacheClusters
-     *
-     * @return
-     */
-    public List<CacheClusterConfig> getCacheClusters() {
-        return this.clusterConfigList;
+    public PulsarNodeConfig getNodeConfig() {
+        return pulsarNodeConfig;
     }
 
-    /**
-     * addSendMetric
-     *
-     * @param currentRecord
-     * @param topic
-     */
     public void addSendMetric(ProfileEvent currentRecord, String topic) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortMetricItem.KEY_CLUSTER_ID, this.getClusterId());
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
index 7fb35e6869..8044f7ad70 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationWorker.java
@@ -28,10 +28,6 @@ import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.slf4j.Logger;
 
-/**
- * 
- * PulsarFederationWorker
- */
 public class PulsarFederationWorker extends Thread {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarFederationWorker.class);
@@ -42,13 +38,6 @@ public class PulsarFederationWorker extends Thread {
     private PulsarProducerFederation producerFederation;
     private LifecycleState status;
 
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param workerIndex
-     * @param context
-     */
     public PulsarFederationWorker(String sinkName, int workerIndex, 
PulsarFederationSinkContext context) {
         super();
         this.workerName = sinkName + "-worker-" + workerIndex;
@@ -57,9 +46,6 @@ public class PulsarFederationWorker extends Thread {
         this.status = LifecycleState.IDLE;
     }
 
-    /**
-     * start
-     */
     @Override
     public void start() {
         this.producerFederation.start();
@@ -67,19 +53,12 @@ public class PulsarFederationWorker extends Thread {
         super.start();
     }
 
-    /**
-     * 
-     * close
-     */
     public void close() {
         // close all producers
         this.producerFederation.close();
         this.status = LifecycleState.STOP;
     }
 
-    /**
-     * run
-     */
     @Override
     public void run() {
         LOG.info(String.format("start PulsarSetWorker:%s", this.workerName));
@@ -123,11 +102,6 @@ public class PulsarFederationWorker extends Thread {
         }
     }
 
-    /**
-     * fillTopic
-     * 
-     * @param currentRecord
-     */
     private String fillTopic(ProfileEvent currentRecord) {
         String topic = this.context.getTopic(currentRecord.getUid());
         if (!StringUtils.isBlank(topic)) {
@@ -137,9 +111,6 @@ public class PulsarFederationWorker extends Thread {
         return "-";
     }
 
-    /**
-     * sleepOneInterval
-     */
     private void sleepOneInterval() {
         try {
             Thread.sleep(context.getProcessInterval());
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index 4ef3aece80..cb6c651982 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -18,15 +18,19 @@
 package org.apache.inlong.sort.standalone.sink.pulsar;
 
 import org.apache.inlong.common.enums.DataTypeEnum;
+import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.dataflow.sink.PulsarSinkConfig;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
-import org.apache.inlong.sort.standalone.utils.Constants;
 
-import java.util.Map;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-/**
- *
- * KafkaIdConfig
- */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class PulsarIdConfig {
 
     public static final String KEY_DATA_TYPE = "dataType";
@@ -42,134 +46,17 @@ public class PulsarIdConfig {
     private String topic;
     private DataTypeEnum dataType = DataTypeEnum.TEXT;
 
-    /**
-     * Constructor
-     */
-    public PulsarIdConfig() {
-
-    }
-
-    /**
-     * Constructor
-     *
-     * @param idParam
-     */
-    public PulsarIdConfig(Map<String, String> idParam) {
-        this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
-        this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID, 
DEFAULT_INLONG_STREAM);
-        this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
-        this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR, 
PulsarIdConfig.DEFAULT_SEPARATOR);
-        this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
-        this.dataType = DataTypeEnum
-                .convert(idParam.getOrDefault(PulsarIdConfig.KEY_DATA_TYPE, 
DataTypeEnum.TEXT.getType()));
-    }
-
-    /**
-     * get inlongGroupId
-     *
-     * @return the inlongGroupId
-     */
-    public String getInlongGroupId() {
-        return inlongGroupId;
-    }
-
-    /**
-     * set inlongGroupId
-     *
-     * @param inlongGroupId the inlongGroupId to set
-     */
-    public void setInlongGroupId(String inlongGroupId) {
-        this.inlongGroupId = inlongGroupId;
-    }
-
-    /**
-     * get inlongStreamId
-     *
-     * @return the inlongStreamId
-     */
-    public String getInlongStreamId() {
-        return inlongStreamId;
-    }
-
-    /**
-     * set inlongStreamId
-     *
-     * @param inlongStreamId the inlongStreamId to set
-     */
-    public void setInlongStreamId(String inlongStreamId) {
-        this.inlongStreamId = inlongStreamId;
-    }
+    public static PulsarIdConfig create(DataFlowConfig dataFlowConfig) {
+        PulsarSinkConfig sinkConfig = (PulsarSinkConfig) 
dataFlowConfig.getSinkConfig();
 
-    /**
-     * get uid
-     *
-     * @return the uid
-     */
-    public String getUid() {
-        return uid;
-    }
-
-    /**
-     * set uid
-     *
-     * @param uid the uid to set
-     */
-    public void setUid(String uid) {
-        this.uid = uid;
-    }
-
-    /**
-     * get separator
-     *
-     * @return the separator
-     */
-    public String getSeparator() {
-        return separator;
-    }
-
-    /**
-     * set separator
-     *
-     * @param separator the separator to set
-     */
-    public void setSeparator(String separator) {
-        this.separator = separator;
-    }
-
-    /**
-     * get topic
-     *
-     * @return the topic
-     */
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * set topic
-     *
-     * @param topic the topic to set
-     */
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
-    /**
-     * get dataType
-     *
-     * @return the dataType
-     */
-    public DataTypeEnum getDataType() {
-        return dataType;
-    }
+        return PulsarIdConfig.builder()
+                .inlongGroupId(dataFlowConfig.getInlongGroupId())
+                .inlongStreamId(dataFlowConfig.getInlongStreamId())
+                .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), 
dataFlowConfig.getInlongStreamId()))
+                .topic(sinkConfig.getTopic())
+                .dataType(DataTypeEnum.TEXT)
+                .build();
 
-    /**
-     * set dataType
-     *
-     * @param dataType the dataType to set
-     */
-    public void setDataType(DataTypeEnum dataType) {
-        this.dataType = dataType;
     }
 
 }
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index b54dd44281..1ae34191ec 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -17,12 +17,12 @@
 
 package org.apache.inlong.sort.standalone.sink.pulsar;
 
+import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
-import org.apache.commons.lang.math.NumberUtils;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Transaction;
@@ -73,8 +73,8 @@ public class PulsarProducerCluster implements LifecycleAware {
             + "BatchingPartitionSwitchFrequency";
 
     private final String workerName;
-    private final CacheClusterConfig config;
     private final PulsarFederationSinkContext sinkContext;
+    private final PulsarNodeConfig nodeConfig;
     private final Context context;
     private final String cacheClusterName;
     private LifecycleState state;
@@ -88,20 +88,13 @@ public class PulsarProducerCluster implements LifecycleAware {
 
     private Map<String, Producer<byte[]>> producerMap = new 
ConcurrentHashMap<>();
 
-    /**
-     * Constructor
-     *
-     * @param workerName
-     * @param config
-     * @param context
-     */
-    public PulsarProducerCluster(String workerName, CacheClusterConfig config, 
PulsarFederationSinkContext context) {
+    public PulsarProducerCluster(String workerName, PulsarNodeConfig 
nodeConfig, PulsarFederationSinkContext context) {
         this.workerName = workerName;
-        this.config = config;
         this.sinkContext = context;
-        this.context = context.getProducerContext();
+        this.nodeConfig = nodeConfig;
+        this.context = new Context(nodeConfig.getProperties() != null ? 
nodeConfig.getProperties() : Maps.newHashMap());
         this.state = LifecycleState.IDLE;
-        this.cacheClusterName = config.getClusterName();
+        this.cacheClusterName = nodeConfig.getNodeName();
         this.handler = sinkContext.createEventHandler();
     }
 
@@ -114,20 +107,19 @@ public class PulsarProducerCluster implements LifecycleAware {
         try {
             // create pulsar client
             ClientBuilder clientBuilder = PulsarClient.builder();
-            String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+            String serviceUrl = nodeConfig.getServiceUrl();
             if (StringUtils.isBlank(serviceUrl)) {
                 throw new IllegalArgumentException("service url should not be 
null");
             }
 
             clientBuilder.serviceUrl(serviceUrl);
-            String authentication = config.getParams().get(KEY_AUTHENTICATION);
+            String authentication = nodeConfig.getToken();
             if (StringUtils.isNoneBlank(authentication)) {
                 
clientBuilder.authentication(AuthenticationFactory.token(authentication));
             }
 
             this.client = clientBuilder
-                    
.statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS),
 -1),
-                            TimeUnit.SECONDS)
+                    .statsInterval(context.getLong(KEY_STATS_INTERVAL_SECONDS, 
-1L), TimeUnit.SECONDS)
                     .build();
 
             // create producer template
@@ -159,8 +151,12 @@ public class PulsarProducerCluster implements LifecycleAware {
      * @return CompressionType
      */
     private CompressionType getPulsarCompressionType() {
-        String type = this.context.getString(KEY_COMPRESSIONTYPE, 
DEFAULT_COMPRESS_TYPE);
-        switch (type) {
+        String type = nodeConfig.getCompressionType();
+        if (type == null) {
+            return CompressionType.ZLIB;
+        }
+
+        switch (type.toUpperCase()) {
             case "LZ4":
                 return CompressionType.LZ4;
             case "NONE":
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
index 709df5ffa6..fd4a0b95b5 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerFederation.java
@@ -17,22 +17,17 @@
 
 package org.apache.inlong.sort.standalone.sink.pulsar;
 
+import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.flume.Transaction;
 import org.slf4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * 
@@ -45,11 +40,9 @@ public class PulsarProducerFederation {
     private final String workerName;
     private final PulsarFederationSinkContext context;
     private Timer reloadTimer;
-
-    private List<PulsarProducerCluster> clusterList = new ArrayList<>();
-    private List<PulsarProducerCluster> deletingClusterList = new 
ArrayList<>();
-
-    private AtomicInteger clusterIndex = new AtomicInteger(0);
+    private PulsarNodeConfig nodeConfig;
+    private PulsarProducerCluster cluster;
+    private PulsarProducerCluster deleteCluster;
 
     /**
      * Constructor
@@ -83,9 +76,7 @@ public class PulsarProducerFederation {
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
-        for (PulsarProducerCluster cluster : this.clusterList) {
-            cluster.stop();
-        }
+        cluster.stop();
     }
 
     /**
@@ -108,62 +99,30 @@ public class PulsarProducerFederation {
      */
     public void reload() {
         try {
-            // stop deleted cluster
-            deletingClusterList.forEach(item -> {
-                item.stop();
-            });
-            deletingClusterList.clear();
-            // update cluster list
-            List<CacheClusterConfig> configList = 
this.context.getCacheClusters();
-            List<PulsarProducerCluster> newClusterList = new 
ArrayList<>(configList.size());
-            // prepare
-            Set<String> newClusterNames = new HashSet<>();
-            configList.forEach(item -> {
-                newClusterNames.add(item.getClusterName());
-            });
-            Set<String> oldClusterNames = new HashSet<>();
-            clusterList.forEach(item -> {
-                oldClusterNames.add(item.getCacheClusterName());
-            });
-            // add
-            for (CacheClusterConfig config : configList) {
-                if (!oldClusterNames.contains(config.getClusterName())) {
-                    PulsarProducerCluster cluster = new 
PulsarProducerCluster(workerName, config, context);
-                    cluster.start();
-                    newClusterList.add(cluster);
-                }
+            if (deleteCluster != null) {
+                deleteCluster.stop();
+                deleteCluster = null;
             }
-            // remove
-            for (PulsarProducerCluster cluster : this.clusterList) {
-                if (newClusterNames.contains(cluster.getCacheClusterName())) {
-                    newClusterList.add(cluster);
-                } else {
-                    deletingClusterList.add(cluster);
-                }
+
+        } catch (Exception e) {
+            LOG.error("failed to close delete cluster, ex={}", e.getMessage(), 
e);
+        }
+
+        try {
+            if (nodeConfig != null && context.getNodeConfig().getVersion() <= 
nodeConfig.getVersion()) {
+                return;
             }
-            this.clusterList = newClusterList;
+            this.nodeConfig = context.getNodeConfig();
+            PulsarProducerCluster updateCluster = new 
PulsarProducerCluster(workerName, nodeConfig, context);
+            updateCluster.start();
+            this.deleteCluster = cluster;
+            this.cluster = updateCluster;
         } catch (Throwable e) {
             LOG.error(e.getMessage(), e);
         }
     }
 
-    /**
-     * send
-     * 
-     * @param  profileEvent
-     * @param  tx
-     * @return              boolean
-     * @throws IOException
-     */
     public boolean send(ProfileEvent profileEvent, Transaction tx) throws 
IOException {
-        int currentIndex = clusterIndex.getAndIncrement();
-        if (currentIndex > Integer.MAX_VALUE / 2) {
-            clusterIndex.set(0);
-        }
-        List<PulsarProducerCluster> currentClusterList = this.clusterList;
-        int currentSize = currentClusterList.size();
-        int realIndex = currentIndex % currentSize;
-        PulsarProducerCluster clusterProducer = 
currentClusterList.get(realIndex);
-        return clusterProducer.send(profileEvent, tx);
+        return cluster.send(profileEvent, tx);
     }
 }

Reply via email to