This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d0d16fad76 [INLONG-8598][Sort] Optimize sortstandalone pulsar sink (#8616)
d0d16fad76 is described below

commit d0d16fad7619d15626c691740e3a0ba7275f3b07
Author: vernedeng <[email protected]>
AuthorDate: Tue Aug 1 15:32:36 2023 +0800

    [INLONG-8598][Sort] Optimize sortstandalone pulsar sink (#8616)
---
 .../sink/pulsar/PulsarFederationSink.java          | 25 +++++-----
 .../sink/pulsar/PulsarFederationSinkContext.java   | 31 ++++++++----
 .../standalone/sink/pulsar/PulsarIdConfig.java     | 32 +++++++------
 .../sink/pulsar/PulsarProducerCluster.java         | 55 +++++++++++++---------
 4 files changed, 83 insertions(+), 60 deletions(-)

diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
index 31338b9541..e5cf99aea2 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
@@ -17,7 +17,6 @@
 
 package org.apache.inlong.sort.standalone.sink.pulsar;
 
-import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.flume.Context;
@@ -27,12 +26,10 @@ import org.apache.flume.sink.AbstractSink;
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
- * 
+ *
  * PulsarFederationSink
  */
 public class PulsarFederationSink extends AbstractSink implements Configurable 
{
@@ -40,8 +37,8 @@ public class PulsarFederationSink extends AbstractSink 
implements Configurable {
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarFederationSink.class);
 
     private PulsarFederationSinkContext context;
+    private Context parentContext;
     private List<PulsarFederationWorker> workers = new ArrayList<>();
-    private Map<String, String> dimensions;
 
     /**
      * start
@@ -49,6 +46,8 @@ public class PulsarFederationSink extends AbstractSink 
implements Configurable {
     @Override
     public void start() {
         String sinkName = this.getName();
+        this.context = new PulsarFederationSinkContext(sinkName, 
parentContext, getChannel());
+        this.context.start();
         // create worker
         for (int i = 0; i < context.getMaxThreads(); i++) {
             PulsarFederationWorker worker = new 
PulsarFederationWorker(sinkName, i, context);
@@ -76,23 +75,21 @@ public class PulsarFederationSink extends AbstractSink 
implements Configurable {
 
     /**
      * configure
-     * 
+     *
      * @param context
      */
     @Override
     public void configure(Context context) {
-        LOG.info("start to configure:{}, context:{}.", 
this.getClass().getSimpleName(), context.toString());
-        this.context = new PulsarFederationSinkContext(getName(), context, 
getChannel());
-        this.context.start();
-        this.dimensions = new HashMap<>();
-        this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID, 
this.context.getClusterId());
-        this.dimensions.put(SortMetricItem.KEY_TASK_NAME, 
this.context.getTaskName());
-        this.dimensions.put(SortMetricItem.KEY_SINK_ID, 
this.context.getSinkName());
+        LOG.info(
+                "start to configure:{}, context:{}.",
+                this.getClass().getSimpleName(),
+                context.toString());
+        this.parentContext = context;
     }
 
     /**
      * process
-     * 
+     *
      * @return                        Status
      * @throws EventDeliveryException
      */
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 031f7dc95c..d14c79cec3 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -41,7 +41,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * 
+ *
  * PulsarFederationSinkContext
  */
 public class PulsarFederationSinkContext extends SinkContext {
@@ -68,20 +68,30 @@ public class PulsarFederationSinkContext extends 
SinkContext {
      * reload
      */
     public void reload() {
-        super.reload();
         try {
             SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            if (newSortTaskConfig == null) {
+                LOG.error("newSortTaskConfig is null.");
+                return;
+            }
             if (this.sortTaskConfig != null && 
this.sortTaskConfig.equals(newSortTaskConfig)) {
+                LOG.info("Same sortTaskConfig, do nothing.");
                 return;
             }
             this.sortTaskConfig = newSortTaskConfig;
             this.producerContext = new 
Context(this.sortTaskConfig.getSinkParams());
+
             // parse the config of id and topic
+            LOG.info("reload idTopicMap");
             Map<String, PulsarIdConfig> newIdConfigMap = new 
ConcurrentHashMap<>();
             List<Map<String, String>> idList = 
this.sortTaskConfig.getIdParams();
             for (Map<String, String> idParam : idList) {
-                PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
-                newIdConfigMap.put(idConfig.getUid(), idConfig);
+                try {
+                    PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
+                    newIdConfigMap.put(idConfig.getUid(), idConfig);
+                } catch (Exception e) {
+                    LOG.error("fail to parse pulsar id config", e);
+                }
             }
             // build cache cluster config
             CacheClusterConfig clusterConfig = new CacheClusterConfig();
@@ -90,6 +100,7 @@ public class PulsarFederationSinkContext extends SinkContext 
{
             List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
             newClusterConfigList.add(clusterConfig);
             // change current config
+            LOG.info("old id config map={}\n new id config map={}", 
idConfigMap, newIdConfigMap);
             this.idConfigMap = newIdConfigMap;
             this.clusterConfigList = newClusterConfigList;
         } catch (Throwable e) {
@@ -99,7 +110,7 @@ public class PulsarFederationSinkContext extends SinkContext 
{
 
     /**
      * get producerContext
-     * 
+     *
      * @return the producerContext
      */
     public Context getProducerContext() {
@@ -115,7 +126,7 @@ public class PulsarFederationSinkContext extends 
SinkContext {
     public String getTopic(String uid) {
         PulsarIdConfig idConfig = this.idConfigMap.get(uid);
         if (idConfig == null) {
-            throw new NullPointerException("uid " + uid + "got null topic");
+            throw new NullPointerException("uid " + uid + " got null id 
config");
         }
         return idConfig.getTopic();
     }
@@ -136,7 +147,7 @@ public class PulsarFederationSinkContext extends 
SinkContext {
 
     /**
      * getCacheClusters
-     * 
+     *
      * @return
      */
     public List<CacheClusterConfig> getCacheClusters() {
@@ -145,7 +156,7 @@ public class PulsarFederationSinkContext extends 
SinkContext {
 
     /**
      * addSendMetric
-     * 
+     *
      * @param currentRecord
      * @param topic
      */
@@ -183,7 +194,7 @@ public class PulsarFederationSinkContext extends 
SinkContext {
 
     /**
      * addSendResultMetric
-     * 
+     *
      * @param currentRecord
      * @param topic
      * @param result
@@ -224,7 +235,7 @@ public class PulsarFederationSinkContext extends 
SinkContext {
 
     /**
      * create IEvent2PulsarRecordHandler
-     * 
+     *
      * @return IEvent2PulsarRecordHandler
      */
     public IEvent2PulsarRecordHandler createEventHandler() {
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index a4be082bd1..52377d2461 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -24,7 +24,7 @@ import org.apache.inlong.sort.standalone.utils.Constants;
 import java.util.Map;
 
 /**
- * 
+ *
  * KafkaIdConfig
  */
 public class PulsarIdConfig {
@@ -33,6 +33,8 @@ public class PulsarIdConfig {
     public static final String KEY_SEPARATOR = "separator";
     public static final String DEFAULT_SEPARATOR = "|";
 
+    private static final String DEFAULT_INLONG_STREAM = "1";
+
     private String inlongGroupId;
     private String inlongStreamId;
     private String uid;
@@ -49,12 +51,12 @@ public class PulsarIdConfig {
 
     /**
      * Constructor
-     * 
+     *
      * @param idParam
      */
     public PulsarIdConfig(Map<String, String> idParam) {
         this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
-        this.inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+        this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID, 
DEFAULT_INLONG_STREAM);
         this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
         this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR, 
PulsarIdConfig.DEFAULT_SEPARATOR);
         this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
@@ -64,7 +66,7 @@ public class PulsarIdConfig {
 
     /**
      * get inlongGroupId
-     * 
+     *
      * @return the inlongGroupId
      */
     public String getInlongGroupId() {
@@ -73,7 +75,7 @@ public class PulsarIdConfig {
 
     /**
      * set inlongGroupId
-     * 
+     *
      * @param inlongGroupId the inlongGroupId to set
      */
     public void setInlongGroupId(String inlongGroupId) {
@@ -82,7 +84,7 @@ public class PulsarIdConfig {
 
     /**
      * get inlongStreamId
-     * 
+     *
      * @return the inlongStreamId
      */
     public String getInlongStreamId() {
@@ -91,7 +93,7 @@ public class PulsarIdConfig {
 
     /**
      * set inlongStreamId
-     * 
+     *
      * @param inlongStreamId the inlongStreamId to set
      */
     public void setInlongStreamId(String inlongStreamId) {
@@ -100,7 +102,7 @@ public class PulsarIdConfig {
 
     /**
      * get uid
-     * 
+     *
      * @return the uid
      */
     public String getUid() {
@@ -109,7 +111,7 @@ public class PulsarIdConfig {
 
     /**
      * set uid
-     * 
+     *
      * @param uid the uid to set
      */
     public void setUid(String uid) {
@@ -118,7 +120,7 @@ public class PulsarIdConfig {
 
     /**
      * get separator
-     * 
+     *
      * @return the separator
      */
     public String getSeparator() {
@@ -127,7 +129,7 @@ public class PulsarIdConfig {
 
     /**
      * set separator
-     * 
+     *
      * @param separator the separator to set
      */
     public void setSeparator(String separator) {
@@ -136,7 +138,7 @@ public class PulsarIdConfig {
 
     /**
      * get topic
-     * 
+     *
      * @return the topic
      */
     public String getTopic() {
@@ -145,7 +147,7 @@ public class PulsarIdConfig {
 
     /**
      * set topic
-     * 
+     *
      * @param topic the topic to set
      */
     public void setTopic(String topic) {
@@ -154,7 +156,7 @@ public class PulsarIdConfig {
 
     /**
      * get dataType
-     * 
+     *
      * @return the dataType
      */
     public DataType getDataType() {
@@ -163,7 +165,7 @@ public class PulsarIdConfig {
 
     /**
      * set dataType
-     * 
+     *
      * @param dataType the dataType to set
      */
     public void setDataType(DataType dataType) {
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 02bfbdc6b3..c5d655145c 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -23,12 +23,14 @@ import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.flume.Transaction;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.HashingScheme;
 import org.apache.pulsar.client.api.MessageId;
@@ -46,13 +48,14 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 /**
- * 
+ *
  * PulsarProducerCluster
  */
 public class PulsarProducerCluster implements LifecycleAware {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(PulsarProducerCluster.class);
 
+    private static final String DEFAULT_COMPRESS_TYPE = "SNAPPY";
     public static final String KEY_SERVICE_URL = "serviceUrl";
     public static final String KEY_AUTHENTICATION = "authentication";
     public static final String KEY_STATS_INTERVAL_SECONDS = 
"statsIntervalSeconds";
@@ -87,7 +90,7 @@ public class PulsarProducerCluster implements LifecycleAware {
 
     /**
      * Constructor
-     * 
+     *
      * @param workerName
      * @param config
      * @param context
@@ -108,16 +111,26 @@ public class PulsarProducerCluster implements 
LifecycleAware {
     @Override
     public void start() {
         this.state = LifecycleState.START;
-        // create pulsar client
         try {
+            // create pulsar client
+            ClientBuilder clientBuilder = PulsarClient.builder();
             String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+            if (StringUtils.isBlank(serviceUrl)) {
+                throw new IllegalArgumentException("service url should not be 
null");
+            }
+
+            clientBuilder.serviceUrl(serviceUrl);
             String authentication = config.getParams().get(KEY_AUTHENTICATION);
-            this.client = PulsarClient.builder()
-                    .serviceUrl(serviceUrl)
-                    
.authentication(AuthenticationFactory.token(authentication))
+            if (StringUtils.isNoneBlank(authentication)) {
+                
clientBuilder.authentication(AuthenticationFactory.token(authentication));
+            }
+
+            this.client = clientBuilder
                     
.statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS),
 -1),
                             TimeUnit.SECONDS)
                     .build();
+
+            // create producer template
             this.baseBuilder = client.newProducer();
             this.baseBuilder
                     .hashingScheme(HashingScheme.Murmur3_32Hash)
@@ -142,11 +155,11 @@ public class PulsarProducerCluster implements 
LifecycleAware {
 
     /**
      * getPulsarCompressionType
-     * 
+     *
      * @return CompressionType
      */
     private CompressionType getPulsarCompressionType() {
-        String type = this.context.getString(KEY_COMPRESSIONTYPE);
+        String type = this.context.getString(KEY_COMPRESSIONTYPE, 
DEFAULT_COMPRESS_TYPE);
         switch (type) {
             case "LZ4":
                 return CompressionType.LZ4;
@@ -186,7 +199,7 @@ public class PulsarProducerCluster implements 
LifecycleAware {
 
     /**
      * getLifecycleState
-     * 
+     *
      * @return
      */
     @Override
@@ -196,7 +209,7 @@ public class PulsarProducerCluster implements 
LifecycleAware {
 
     /**
      * send
-     * 
+     *
      * @param  profileEvent
      * @param  tx
      * @return              boolean
@@ -210,15 +223,15 @@ public class PulsarProducerCluster implements 
LifecycleAware {
         Producer<byte[]> producer = this.producerMap.get(topic);
         if (producer == null) {
             try {
-                LOG.info("try to new a object for topic " + topic);
+                LOG.debug("try to new a producer for topic " + topic);
                 producer = baseBuilder.clone().topic(topic)
                         .producerName(workerName + "-" + cacheClusterName + 
"-" + topic)
                         .create();
-                LOG.info("create new producer success:{}", 
producer.getProducerName());
+                LOG.debug("create a new producer success:{}", 
producer.getProducerName());
                 Producer<byte[]> oldProducer = 
this.producerMap.putIfAbsent(topic, producer);
                 if (oldProducer != null) {
                     producer.close();
-                    LOG.info("close producer success:{}", 
producer.getProducerName());
+                    LOG.debug("close producer success:{}", 
producer.getProducerName());
                     producer = oldProducer;
                 }
             } catch (Throwable ex) {
@@ -230,12 +243,10 @@ public class PulsarProducerCluster implements 
LifecycleAware {
             tx.rollback();
             tx.close();
             sinkContext.addSendResultMetric(profileEvent, topic, false, 
System.currentTimeMillis());
-            return false;
-        }
-        String messageKey = headers.get(Constants.HEADER_KEY_MESSAGE_KEY);
-        if (messageKey == null) {
-            messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+            LOG.error("failed to create producer, send failed");
+            throw new IllegalStateException();
         }
+
         // sendAsync
         byte[] sendBytes = this.handler.parse(sinkContext, profileEvent);
         // check
@@ -246,8 +257,10 @@ public class PulsarProducerCluster implements 
LifecycleAware {
             return true;
         }
         long sendTime = System.currentTimeMillis();
-        CompletableFuture<MessageId> future = 
producer.newMessage().key(messageKey).properties(headers)
-                .value(sendBytes).sendAsync();
+        CompletableFuture<MessageId> future = producer.newMessage()
+                .properties(headers)
+                .value(sendBytes)
+                .sendAsync();
         // callback
         future.whenCompleteAsync((msgId, ex) -> {
             if (ex != null) {
@@ -268,7 +281,7 @@ public class PulsarProducerCluster implements 
LifecycleAware {
 
     /**
      * get cacheClusterName
-     * 
+     *
      * @return the cacheClusterName
      */
     public String getCacheClusterName() {

Reply via email to