This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d0d16fad76 [INLONG-8598][Sort] Optimize sortstandalone pulsar sink (#8616)
d0d16fad76 is described below
commit d0d16fad7619d15626c691740e3a0ba7275f3b07
Author: vernedeng <[email protected]>
AuthorDate: Tue Aug 1 15:32:36 2023 +0800
[INLONG-8598][Sort] Optimize sortstandalone pulsar sink (#8616)
---
.../sink/pulsar/PulsarFederationSink.java | 25 +++++-----
.../sink/pulsar/PulsarFederationSinkContext.java | 31 ++++++++----
.../standalone/sink/pulsar/PulsarIdConfig.java | 32 +++++++------
.../sink/pulsar/PulsarProducerCluster.java | 55 +++++++++++++---------
4 files changed, 83 insertions(+), 60 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
index 31338b9541..e5cf99aea2 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSink.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.standalone.sink.pulsar;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.flume.Context;
@@ -27,12 +26,10 @@ import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
/**
- *
+ *
* PulsarFederationSink
*/
public class PulsarFederationSink extends AbstractSink implements Configurable
{
@@ -40,8 +37,8 @@ public class PulsarFederationSink extends AbstractSink
implements Configurable {
public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarFederationSink.class);
private PulsarFederationSinkContext context;
+ private Context parentContext;
private List<PulsarFederationWorker> workers = new ArrayList<>();
- private Map<String, String> dimensions;
/**
* start
@@ -49,6 +46,8 @@ public class PulsarFederationSink extends AbstractSink
implements Configurable {
@Override
public void start() {
String sinkName = this.getName();
+ this.context = new PulsarFederationSinkContext(sinkName,
parentContext, getChannel());
+ this.context.start();
// create worker
for (int i = 0; i < context.getMaxThreads(); i++) {
PulsarFederationWorker worker = new
PulsarFederationWorker(sinkName, i, context);
@@ -76,23 +75,21 @@ public class PulsarFederationSink extends AbstractSink
implements Configurable {
/**
* configure
- *
+ *
* @param context
*/
@Override
public void configure(Context context) {
- LOG.info("start to configure:{}, context:{}.",
this.getClass().getSimpleName(), context.toString());
- this.context = new PulsarFederationSinkContext(getName(), context,
getChannel());
- this.context.start();
- this.dimensions = new HashMap<>();
- this.dimensions.put(SortMetricItem.KEY_CLUSTER_ID,
this.context.getClusterId());
- this.dimensions.put(SortMetricItem.KEY_TASK_NAME,
this.context.getTaskName());
- this.dimensions.put(SortMetricItem.KEY_SINK_ID,
this.context.getSinkName());
+ LOG.info(
+ "start to configure:{}, context:{}.",
+ this.getClass().getSimpleName(),
+ context.toString());
+ this.parentContext = context;
}
/**
* process
- *
+ *
* @return Status
* @throws EventDeliveryException
*/
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index 031f7dc95c..d14c79cec3 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -41,7 +41,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
- *
+ *
* PulsarFederationSinkContext
*/
public class PulsarFederationSinkContext extends SinkContext {
@@ -68,20 +68,30 @@ public class PulsarFederationSinkContext extends
SinkContext {
* reload
*/
public void reload() {
- super.reload();
try {
SortTaskConfig newSortTaskConfig =
SortClusterConfigHolder.getTaskConfig(taskName);
+ if (newSortTaskConfig == null) {
+ LOG.error("newSortTaskConfig is null.");
+ return;
+ }
if (this.sortTaskConfig != null &&
this.sortTaskConfig.equals(newSortTaskConfig)) {
+ LOG.info("Same sortTaskConfig, do nothing.");
return;
}
this.sortTaskConfig = newSortTaskConfig;
this.producerContext = new
Context(this.sortTaskConfig.getSinkParams());
+
// parse the config of id and topic
+ LOG.info("reload idTopicMap");
Map<String, PulsarIdConfig> newIdConfigMap = new
ConcurrentHashMap<>();
List<Map<String, String>> idList =
this.sortTaskConfig.getIdParams();
for (Map<String, String> idParam : idList) {
- PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
- newIdConfigMap.put(idConfig.getUid(), idConfig);
+ try {
+ PulsarIdConfig idConfig = new PulsarIdConfig(idParam);
+ newIdConfigMap.put(idConfig.getUid(), idConfig);
+ } catch (Exception e) {
+ LOG.error("fail to parse pulsar id config", e);
+ }
}
// build cache cluster config
CacheClusterConfig clusterConfig = new CacheClusterConfig();
@@ -90,6 +100,7 @@ public class PulsarFederationSinkContext extends SinkContext
{
List<CacheClusterConfig> newClusterConfigList = new ArrayList<>();
newClusterConfigList.add(clusterConfig);
// change current config
+ LOG.info("old id config map={}\n new id config map={}",
idConfigMap, newIdConfigMap);
this.idConfigMap = newIdConfigMap;
this.clusterConfigList = newClusterConfigList;
} catch (Throwable e) {
@@ -99,7 +110,7 @@ public class PulsarFederationSinkContext extends SinkContext
{
/**
* get producerContext
- *
+ *
* @return the producerContext
*/
public Context getProducerContext() {
@@ -115,7 +126,7 @@ public class PulsarFederationSinkContext extends
SinkContext {
public String getTopic(String uid) {
PulsarIdConfig idConfig = this.idConfigMap.get(uid);
if (idConfig == null) {
- throw new NullPointerException("uid " + uid + "got null topic");
+ throw new NullPointerException("uid " + uid + " got null id
config");
}
return idConfig.getTopic();
}
@@ -136,7 +147,7 @@ public class PulsarFederationSinkContext extends
SinkContext {
/**
* getCacheClusters
- *
+ *
* @return
*/
public List<CacheClusterConfig> getCacheClusters() {
@@ -145,7 +156,7 @@ public class PulsarFederationSinkContext extends
SinkContext {
/**
* addSendMetric
- *
+ *
* @param currentRecord
* @param topic
*/
@@ -183,7 +194,7 @@ public class PulsarFederationSinkContext extends
SinkContext {
/**
* addSendResultMetric
- *
+ *
* @param currentRecord
* @param topic
* @param result
@@ -224,7 +235,7 @@ public class PulsarFederationSinkContext extends
SinkContext {
/**
* create IEvent2PulsarRecordHandler
- *
+ *
* @return IEvent2PulsarRecordHandler
*/
public IEvent2PulsarRecordHandler createEventHandler() {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index a4be082bd1..52377d2461 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -24,7 +24,7 @@ import org.apache.inlong.sort.standalone.utils.Constants;
import java.util.Map;
/**
- *
+ *
* KafkaIdConfig
*/
public class PulsarIdConfig {
@@ -33,6 +33,8 @@ public class PulsarIdConfig {
public static final String KEY_SEPARATOR = "separator";
public static final String DEFAULT_SEPARATOR = "|";
+ private static final String DEFAULT_INLONG_STREAM = "1";
+
private String inlongGroupId;
private String inlongStreamId;
private String uid;
@@ -49,12 +51,12 @@ public class PulsarIdConfig {
/**
* Constructor
- *
+ *
* @param idParam
*/
public PulsarIdConfig(Map<String, String> idParam) {
this.inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
- this.inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+ this.inlongStreamId = idParam.getOrDefault(Constants.INLONG_STREAM_ID,
DEFAULT_INLONG_STREAM);
this.uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
this.separator = idParam.getOrDefault(PulsarIdConfig.KEY_SEPARATOR,
PulsarIdConfig.DEFAULT_SEPARATOR);
this.topic = idParam.getOrDefault(Constants.TOPIC, uid);
@@ -64,7 +66,7 @@ public class PulsarIdConfig {
/**
* get inlongGroupId
- *
+ *
* @return the inlongGroupId
*/
public String getInlongGroupId() {
@@ -73,7 +75,7 @@ public class PulsarIdConfig {
/**
* set inlongGroupId
- *
+ *
* @param inlongGroupId the inlongGroupId to set
*/
public void setInlongGroupId(String inlongGroupId) {
@@ -82,7 +84,7 @@ public class PulsarIdConfig {
/**
* get inlongStreamId
- *
+ *
* @return the inlongStreamId
*/
public String getInlongStreamId() {
@@ -91,7 +93,7 @@ public class PulsarIdConfig {
/**
* set inlongStreamId
- *
+ *
* @param inlongStreamId the inlongStreamId to set
*/
public void setInlongStreamId(String inlongStreamId) {
@@ -100,7 +102,7 @@ public class PulsarIdConfig {
/**
* get uid
- *
+ *
* @return the uid
*/
public String getUid() {
@@ -109,7 +111,7 @@ public class PulsarIdConfig {
/**
* set uid
- *
+ *
* @param uid the uid to set
*/
public void setUid(String uid) {
@@ -118,7 +120,7 @@ public class PulsarIdConfig {
/**
* get separator
- *
+ *
* @return the separator
*/
public String getSeparator() {
@@ -127,7 +129,7 @@ public class PulsarIdConfig {
/**
* set separator
- *
+ *
* @param separator the separator to set
*/
public void setSeparator(String separator) {
@@ -136,7 +138,7 @@ public class PulsarIdConfig {
/**
* get topic
- *
+ *
* @return the topic
*/
public String getTopic() {
@@ -145,7 +147,7 @@ public class PulsarIdConfig {
/**
* set topic
- *
+ *
* @param topic the topic to set
*/
public void setTopic(String topic) {
@@ -154,7 +156,7 @@ public class PulsarIdConfig {
/**
* get dataType
- *
+ *
* @return the dataType
*/
public DataType getDataType() {
@@ -163,7 +165,7 @@ public class PulsarIdConfig {
/**
* set dataType
- *
+ *
* @param dataType the dataType to set
*/
public void setDataType(DataType dataType) {
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
index 02bfbdc6b3..c5d655145c 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarProducerCluster.java
@@ -23,12 +23,14 @@ import org.apache.inlong.sort.standalone.utils.Constants;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.apache.commons.lang.math.NumberUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageId;
@@ -46,13 +48,14 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
- *
+ *
* PulsarProducerCluster
*/
public class PulsarProducerCluster implements LifecycleAware {
public static final Logger LOG =
InlongLoggerFactory.getLogger(PulsarProducerCluster.class);
+ private static final String DEFAULT_COMPRESS_TYPE = "SNAPPY";
public static final String KEY_SERVICE_URL = "serviceUrl";
public static final String KEY_AUTHENTICATION = "authentication";
public static final String KEY_STATS_INTERVAL_SECONDS =
"statsIntervalSeconds";
@@ -87,7 +90,7 @@ public class PulsarProducerCluster implements LifecycleAware {
/**
* Constructor
- *
+ *
* @param workerName
* @param config
* @param context
@@ -108,16 +111,26 @@ public class PulsarProducerCluster implements
LifecycleAware {
@Override
public void start() {
this.state = LifecycleState.START;
- // create pulsar client
try {
+ // create pulsar client
+ ClientBuilder clientBuilder = PulsarClient.builder();
String serviceUrl = config.getParams().get(KEY_SERVICE_URL);
+ if (StringUtils.isBlank(serviceUrl)) {
+ throw new IllegalArgumentException("service url should not be
null");
+ }
+
+ clientBuilder.serviceUrl(serviceUrl);
String authentication = config.getParams().get(KEY_AUTHENTICATION);
- this.client = PulsarClient.builder()
- .serviceUrl(serviceUrl)
-
.authentication(AuthenticationFactory.token(authentication))
+ if (StringUtils.isNoneBlank(authentication)) {
+
clientBuilder.authentication(AuthenticationFactory.token(authentication));
+ }
+
+ this.client = clientBuilder
.statsInterval(NumberUtils.toLong(config.getParams().get(KEY_STATS_INTERVAL_SECONDS),
-1),
TimeUnit.SECONDS)
.build();
+
+ // create producer template
this.baseBuilder = client.newProducer();
this.baseBuilder
.hashingScheme(HashingScheme.Murmur3_32Hash)
@@ -142,11 +155,11 @@ public class PulsarProducerCluster implements
LifecycleAware {
/**
* getPulsarCompressionType
- *
+ *
* @return CompressionType
*/
private CompressionType getPulsarCompressionType() {
- String type = this.context.getString(KEY_COMPRESSIONTYPE);
+ String type = this.context.getString(KEY_COMPRESSIONTYPE,
DEFAULT_COMPRESS_TYPE);
switch (type) {
case "LZ4":
return CompressionType.LZ4;
@@ -186,7 +199,7 @@ public class PulsarProducerCluster implements
LifecycleAware {
/**
* getLifecycleState
- *
+ *
* @return
*/
@Override
@@ -196,7 +209,7 @@ public class PulsarProducerCluster implements
LifecycleAware {
/**
* send
- *
+ *
* @param profileEvent
* @param tx
* @return boolean
@@ -210,15 +223,15 @@ public class PulsarProducerCluster implements
LifecycleAware {
Producer<byte[]> producer = this.producerMap.get(topic);
if (producer == null) {
try {
- LOG.info("try to new a object for topic " + topic);
+ LOG.debug("try to new a producer for topic " + topic);
producer = baseBuilder.clone().topic(topic)
.producerName(workerName + "-" + cacheClusterName +
"-" + topic)
.create();
- LOG.info("create new producer success:{}",
producer.getProducerName());
+ LOG.debug("create a new producer success:{}",
producer.getProducerName());
Producer<byte[]> oldProducer =
this.producerMap.putIfAbsent(topic, producer);
if (oldProducer != null) {
producer.close();
- LOG.info("close producer success:{}",
producer.getProducerName());
+ LOG.debug("close producer success:{}",
producer.getProducerName());
producer = oldProducer;
}
} catch (Throwable ex) {
@@ -230,12 +243,10 @@ public class PulsarProducerCluster implements
LifecycleAware {
tx.rollback();
tx.close();
sinkContext.addSendResultMetric(profileEvent, topic, false,
System.currentTimeMillis());
- return false;
- }
- String messageKey = headers.get(Constants.HEADER_KEY_MESSAGE_KEY);
- if (messageKey == null) {
- messageKey = headers.get(Constants.HEADER_KEY_SOURCE_IP);
+ LOG.error("failed to create producer, send failed");
+ throw new IllegalStateException();
}
+
// sendAsync
byte[] sendBytes = this.handler.parse(sinkContext, profileEvent);
// check
@@ -246,8 +257,10 @@ public class PulsarProducerCluster implements
LifecycleAware {
return true;
}
long sendTime = System.currentTimeMillis();
- CompletableFuture<MessageId> future =
producer.newMessage().key(messageKey).properties(headers)
- .value(sendBytes).sendAsync();
+ CompletableFuture<MessageId> future = producer.newMessage()
+ .properties(headers)
+ .value(sendBytes)
+ .sendAsync();
// callback
future.whenCompleteAsync((msgId, ex) -> {
if (ex != null) {
@@ -268,7 +281,7 @@ public class PulsarProducerCluster implements
LifecycleAware {
/**
* get cacheClusterName
- *
+ *
* @return the cacheClusterName
*/
public String getCacheClusterName() {