This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 7552b05f9 [INLONG-6704][Agent] Add PulsarSink (#6709)
7552b05f9 is described below
commit 7552b05f94806ca3e251d2cfccc43e977ceedf20
Author: xueyingzhang <[email protected]>
AuthorDate: Tue Dec 6 11:22:54 2022 +0800
[INLONG-6704][Agent] Add PulsarSink (#6709)
---
.../inlong/agent/constant/AgentConstants.java | 48 +++
.../inlong/agent/message/BatchProxyMessage.java | 20 +
.../apache/inlong/agent/pojo/JobProfileDto.java | 2 +-
.../inlong/agent/conf/TestConfiguration.java | 2 +-
.../agent/plugin/message/PackProxyMessage.java | 9 +
.../inlong/agent/plugin/sinks/AbstractSink.java | 20 +
.../inlong/agent/plugin/sinks/ProxySink.java | 18 -
.../inlong/agent/plugin/sinks/PulsarSink.java | 425 +++++++++++++++++++++
.../inlong/agent/plugin/sinks/SenderManager.java | 14 +-
.../inlong/agent/plugin/sinks/PulsarSinkTest.java | 68 ++++
.../src/test/resources/pulsarSinkJob.json | 15 +
.../apache/inlong/common/enums/TaskTypeEnum.java | 2 +-
.../inlong/common/msg/AttributeConstants.java | 2 +
.../apache/inlong/common/util/MessageUtils.java | 36 +-
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 13 +-
.../inlong/sdk/dataproxy/utils/ProxyUtils.java | 16 -
16 files changed, 641 insertions(+), 69 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index c332d154c..db2d5adeb 100755
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.constant;
+import io.netty.util.NettyRuntime;
+import io.netty.util.internal.SystemPropertyUtil;
import org.apache.inlong.agent.utils.AgentUtils;
/**
@@ -130,4 +132,50 @@ public class AgentConstants {
public static final String AGENT_METRIC_LISTENER_CLASS =
"agent.domainListeners";
public static final String AGENT_METRIC_LISTENER_CLASS_DEFAULT =
"org.apache.inlong.agent.metrics.AgentPrometheusMetricListener";
+
+    // pulsar sink config
+    // NOTE: every key below lives under the "agent.sink.pulsar." prefix; keep new keys consistent with it.
+    public static final String PULSAR_CLIENT_IO_TREHAD_NUM = "agent.sink.pulsar.client.io.thread.num";
+    // defaults to the netty event-loop thread count: -Dio.netty.eventLoopThreads, else 2 * available processors
+    public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1,
+            SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
+
+    public static final String PULSAR_CONNECTION_PRE_BROKER = "agent.sink.pulsar.connection.pre.broker";
+    public static final int DEFAULT_PULSAR_CONNECTION_PRE_BROKER = 1;
+
+    public static final String PULSAR_CLIENT_TIMEOUT_SECOND = "agent.sink.pulsar.send.timeout.second";
+    public static final int DEFAULT_PULSAR_CLIENT_TIMEOUT_SECOND = 30;
+
+    // fixed key typo: was "agent.sink.pullsar.enable.batch"
+    public static final String PULSAR_CLIENT_ENABLE_BATCH = "agent.sink.pulsar.enable.batch";
+    public static final boolean DEFAULT_PULSAR_CLIENT_ENABLE_BATCH = true;
+
+    public static final String PULSAR_CLIENT_BLOCK_IF_QUEUE_FULL = "agent.sink.pulsar.block.if.queue.full";
+    public static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
+
+    public static final String PULSAR_CLIENT_MAX_PENDING_MESSAGES = "agent.sink.pulsar.max.pending.messages";
+    public static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
+
+    public static final String PULSAR_CLIENT_MAX_PENDING_MESSAGES_ACROSS_PARTITION =
+            "agent.sink.pulsar.max.messages.across.partition";
+    public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITION = 500000;
+
+    public static final String PULSAR_CLIENT_MAX_BATCH_BYTES = "agent.sink.pulsar.max.batch.bytes";
+    public static final int DEFAULT_MAX_BATCH_BYTES = 128 * 1024;
+
+    public static final String PULSAR_CLIENT_MAX_BATCH_MESSAGES = "agent.sink.pulsar.max.batch.messages";
+    public static final int DEFAULT_MAX_BATCH_MESSAGES = 1000;
+
+    public static final String PULSAR_CLIENT_MAX_BATCH_INTERVAL_MILLIS = "agent.sink.pulsar.max.batch.interval.millis";
+    public static final int DEFAULT_MAX_BATCH_INTERVAL_MILLIS = 1;
+
+    public static final String PULSAR_CLIENT_COMPRESSION_TYPE = "agent.sink.pulsar.compression.type";
+    public static final String DEFAULT_COMPRESSION_TYPE = "NONE";
+
+    public static final String PULSAR_CLIENT_PRODUCER_NUM = "agent.sink.pulsar.producer.num";
+    public static final int DEFAULT_PRODUCER_NUM = 3;
+
+    // fixed key typo: was "agent.sink.pulsar.enbale.async.send"
+    public static final String PULSAR_CLIENT_ENABLE_ASYNC_SEND = "agent.sink.pulsar.enable.async.send";
+    public static final boolean DEFAULT_ENABLE_ASYNC_SEND = true;
+
+    public static final String PULSAR_SINK_SEND_QUEUE_SIZE = "agent.sink.pulsar.send.queue.size";
+    public static final int DEFAULT_SEND_QUEUE_SIZE = 20000;
+
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
index 005084250..39be15437 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
@@ -20,6 +20,9 @@ package org.apache.inlong.agent.message;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.common.util.MessageUtils;
import java.util.List;
import java.util.Map;
@@ -39,4 +42,21 @@ public class BatchProxyMessage {
private long dataTime;
private Map<String, String> extraMap;
private boolean isSyncSend;
+
+    /**
+     * Packs every line of this batch into one {@link InLongMsg}, with all lines sharing the
+     * attribute string built from {@code extraMap}.
+     *
+     * @return the packed message; it carries no lines when {@code dataList} is null or empty
+     */
+    public InLongMsg getInLongMsg() {
+        InLongMsg message = InLongMsg.newInLongMsg(true);
+        // same null/empty guard as getMsgCnt()/getTotalSize()
+        if (CollectionUtils.isEmpty(dataList)) {
+            return message;
+        }
+        String attr = MessageUtils.convertAttrToStr(extraMap).toString();
+        for (byte[] lineData : dataList) {
+            message.addMsg(attr, lineData);
+        }
+        return message;
+    }
+
+    /**
+     * @return number of lines in this batch, 0 when {@code dataList} is null or empty
+     */
+    public int getMsgCnt() {
+        return CollectionUtils.isEmpty(dataList) ? 0 : dataList.size();
+    }
+
+    /**
+     * @return sum of the byte lengths of all lines in this batch, 0 when {@code dataList} is null or empty
+     */
+    public long getTotalSize() {
+        return CollectionUtils.isEmpty(dataList) ? 0 : dataList.stream().mapToLong(body -> body.length).sum();
+    }
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 77e1fa855..5987c47e3 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -417,7 +417,7 @@ public class JobProfileDto {
break;
case MOCK:
profileDto.setJob(job);
-
+ break;
default:
}
return TriggerProfile.parseJsonStr(GSON.toJson(profileDto));
diff --git
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
index 4019c896b..dcc651135 100755
---
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
+++
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
@@ -87,7 +87,7 @@ public class TestConfiguration {
@Test
public void testJobSinkConf() {
DataConfig dataConfig = new DataConfig();
- dataConfig.setTaskType(101);
+ dataConfig.setTaskType(201);
dataConfig.setDataReportType(1);
JobProfile profile = JobProfileDto.convertToTriggerProfile(dataConfig);
assertEquals(profile.get(JOB_SINK), DEFAULT_DATAPROXY_SINK);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
index 045dcd00b..d51ba2ae0 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
@@ -39,6 +39,9 @@ import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STRE
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
+import static org.apache.inlong.common.msg.AttributeConstants.DATA_TIME;
+import static org.apache.inlong.common.msg.AttributeConstants.MESSAGE_TOPIC;
+import static org.apache.inlong.common.msg.AttributeConstants.STREAM_ID;
/**
* Handle List of BusMessage, which belong to the same stream id.
@@ -87,6 +90,12 @@ public class PackProxyMessage {
this.extraMap.put(AttributeConstants.MESSAGE_PARTITION_KEY, dataKey);
}
+    /**
+     * Stores this pack's stream id, the given topic and the given data time in {@code extraMap}.
+     *
+     * @param topic    MQ topic name to record under {@code MESSAGE_TOPIC}
+     * @param dataTime value recorded under {@code DATA_TIME}, in decimal string form
+     */
+    public void addTopicAndDataTime(String topic, long dataTime) {
+        extraMap.put(STREAM_ID, streamId);
+        extraMap.put(MESSAGE_TOPIC, topic);
+        extraMap.put(DATA_TIME, Long.toString(dataTime));
+    }
+
/**
* Check whether queue is nearly full
*
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
index 0928d2c3d..8b6858f78 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
@@ -22,19 +22,24 @@ import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.Sink;
+import org.apache.inlong.agent.plugin.message.PackProxyMessage;
import org.apache.inlong.common.metric.MetricRegister;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
+import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -47,12 +52,19 @@ public abstract class AbstractSink implements Sink {
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractSink.class);
protected String inlongGroupId;
protected String inlongStreamId;
+
// metric
protected AgentMetricItemSet metricItemSet;
protected AgentMetricItem sinkMetric;
protected Map<String, String> dimensions;
protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+
protected JobProfile jobConf;
+ protected String sourceName;
+ protected String jobInstanceId;
+ protected int batchFlushInterval;
+ // key is stream id, value is a batch of messages belong to the same
stream id
+ protected ConcurrentHashMap<String, PackProxyMessage> cache;
@Override
public MessageFilter initMessageFilter(JobProfile jobConf) {
@@ -67,11 +79,19 @@ public abstract class AbstractSink implements Sink {
return null;
}
+    /**
+     * Remembers the name of the source this sink receives data from.
+     *
+     * @param sourceFileName source name, stored in {@code sourceName}
+     */
+    @Override
+    public void setSourceName(String sourceFileName) {
+        sourceName = sourceFileName;
+    }
+
@Override
public void init(JobProfile jobConf) {
this.jobConf = jobConf;
+ jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID,
DEFAULT_PROXY_INLONG_GROUP_ID);
inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID,
DEFAULT_PROXY_INLONG_STREAM_ID);
+ cache = new ConcurrentHashMap<>(10);
+ batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
this.dimensions = new HashMap<>();
dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index dfe4b7a33..f8eff052d 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -40,9 +39,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static
org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-import static
org.apache.inlong.agent.constant.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
/**
* sink message data to inlong-dataproxy
@@ -57,12 +53,7 @@ public class ProxySink extends AbstractSink {
private MessageFilter messageFilter;
private SenderManager senderManager;
private byte[] fieldSplitter;
- private String sourceName;
- private String jobInstanceId;
- private int batchFlushInterval;
private volatile boolean shutdown = false;
- // key is stream id, value is a batch of messages belong to the same
stream id
- private ConcurrentHashMap<String, PackProxyMessage> cache;
public ProxySink() {
}
@@ -115,11 +106,6 @@ public class ProxySink extends AbstractSink {
}
}
- @Override
- public void setSourceName(String sourceFileName) {
- this.sourceName = sourceFileName;
- }
-
/**
* flush cache by batch
*
@@ -154,10 +140,6 @@ public class ProxySink extends AbstractSink {
@Override
public void init(JobProfile jobConf) {
super.init(jobConf);
- jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
- batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
- DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
- cache = new ConcurrentHashMap<>(10);
messageFilter = initMessageFilter(jobConf);
fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER,
DEFAULT_FIELD_SPLITTER).getBytes(
StandardCharsets.UTF_8);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
new file mode 100644
index 000000000..43b88e423
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.message.BatchProxyMessage;
+import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.message.PackProxyMessage;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_BLOCK_IF_QUEUE_FULL;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_COMPRESSION_TYPE;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_ENABLE_ASYNC_SEND;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_BATCH_BYTES;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_BATCH_INTERVAL_MILLIS;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_BATCH_MESSAGES;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_PENDING_MESSAGES;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITION;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PRODUCER_NUM;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CLIENT_ENABLE_BATCH;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CLIENT_TIMEOUT_SECOND;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CONNECTION_PRE_BROKER;
+import static
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_SEND_QUEUE_SIZE;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_BLOCK_IF_QUEUE_FULL;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_COMPRESSION_TYPE;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_ENABLE_ASYNC_SEND;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_ENABLE_BATCH;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_IO_TREHAD_NUM;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_BATCH_BYTES;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_BATCH_INTERVAL_MILLIS;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_BATCH_MESSAGES;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_PENDING_MESSAGES;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_PENDING_MESSAGES_ACROSS_PARTITION;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_PRODUCER_NUM;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_TIMEOUT_SECOND;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CONNECTION_PRE_BROKER;
+import static
org.apache.inlong.agent.constant.AgentConstants.PULSAR_SINK_SEND_QUEUE_SIZE;
+
+public class PulsarSink extends AbstractSink {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerFactory.class);
+ private static final AtomicInteger CLIENT_INDEX = new AtomicInteger(0);
+ private static final ExecutorService EXECUTOR_SERVICE = new
ThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new
AgentThreadFactory("PulsarSink"));
+ private final AgentConfiguration agentConf =
AgentConfiguration.getAgentConf();
+ private TaskPositionManager taskPositionManager;
+ private volatile boolean shutdown = false;
+ private List<MQClusterInfo> mqClusterInfos;
+ private String topic;
+ private List<PulsarTopicSender> pulsarSenders;
+ private int clientIoThreads;
+
+ private int sendQueueSize;
+ private Semaphore sendQueueSemaphore; // limit the count of
batchProxyMessage waiting to be sent
+ private LinkedBlockingQueue<BatchProxyMessage> pulsarSendQueue;
+
+ // pulsar client parameters
+ private int connectionsPreBroker;
+ private boolean enableBatch;
+ private boolean blockIfQueueFull;
+ private int maxPendingMessages;
+ private int maxPendingMessagesAcrossPartitions;
+ private CompressionType compressionType;
+ private int maxBatchingBytes;
+ private int maxBatchingMessages;
+ private long maxBatchingPublishDelayMillis;
+ private int sendTimeoutSecond;
+ private int producerNum;
+ private boolean asyncSend;
+
+ @Override
+ public void init(JobProfile jobConf) {
+ super.init(jobConf);
+ taskPositionManager = TaskPositionManager.getTaskPositionManager();
+ // agentConf
+ sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE,
DEFAULT_SEND_QUEUE_SIZE);
+ sendQueueSemaphore = new Semaphore(sendQueueSize);
+ pulsarSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
+ clientIoThreads = agentConf.getInt(PULSAR_CLIENT_IO_TREHAD_NUM,
DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM);
+ connectionsPreBroker = agentConf.getInt(PULSAR_CONNECTION_PRE_BROKER,
DEFAULT_PULSAR_CONNECTION_PRE_BROKER);
+ sendTimeoutSecond = agentConf.getInt(PULSAR_CLIENT_TIMEOUT_SECOND,
DEFAULT_PULSAR_CLIENT_TIMEOUT_SECOND);
+ enableBatch = agentConf.getBoolean(PULSAR_CLIENT_ENABLE_BATCH,
DEFAULT_PULSAR_CLIENT_ENABLE_BATCH);
+ blockIfQueueFull =
agentConf.getBoolean(PULSAR_CLIENT_BLOCK_IF_QUEUE_FULL,
DEFAULT_BLOCK_IF_QUEUE_FULL);
+ maxPendingMessages =
agentConf.getInt(PULSAR_CLIENT_MAX_PENDING_MESSAGES,
DEFAULT_MAX_PENDING_MESSAGES);
+ maxBatchingBytes = agentConf.getInt(PULSAR_CLIENT_MAX_BATCH_BYTES,
DEFAULT_MAX_BATCH_BYTES);
+ maxBatchingMessages =
agentConf.getInt(PULSAR_CLIENT_MAX_BATCH_MESSAGES, DEFAULT_MAX_BATCH_MESSAGES);
+ maxBatchingPublishDelayMillis =
agentConf.getInt(PULSAR_CLIENT_MAX_BATCH_INTERVAL_MILLIS,
+ DEFAULT_MAX_BATCH_INTERVAL_MILLIS);
+ maxPendingMessagesAcrossPartitions =
agentConf.getInt(PULSAR_CLIENT_MAX_PENDING_MESSAGES_ACROSS_PARTITION,
+ DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITION);
+ producerNum = agentConf.getInt(PULSAR_CLIENT_PRODUCER_NUM,
DEFAULT_PRODUCER_NUM);
+ asyncSend = agentConf.getBoolean(PULSAR_CLIENT_ENABLE_ASYNC_SEND,
DEFAULT_ENABLE_ASYNC_SEND);
+ String compresstion = agentConf.get(PULSAR_CLIENT_COMPRESSION_TYPE,
DEFAULT_COMPRESSION_TYPE);
+ if (StringUtils.isNotEmpty(compresstion)) {
+ compressionType = CompressionType.valueOf(compresstion);
+ } else {
+ compressionType = CompressionType.NONE;
+ }
+ // jobConf
+ mqClusterInfos = jobConf.getMqClusters();
+
Preconditions.checkArgument(ObjectUtils.isNotEmpty(jobConf.getMqTopic()) &&
jobConf.getMqTopic().isValid(),
+ "no valid pulsar topic config");
+ topic = jobConf.getMqTopic().getTopic();
+ pulsarSenders = new ArrayList<>();
+ initPulsarSender();
+ EXECUTOR_SERVICE.execute(sendDataThread());
+ EXECUTOR_SERVICE.execute(flushCache());
+ }
+
+ @Override
+ public void write(Message message) {
+ try {
+ if (message != null) {
+ if (!(message instanceof EndMessage)) {
+ ProxyMessage proxyMessage = ProxyMessage.parse(message);
+ // add proxy message to cache.
+ cache.compute(proxyMessage.getBatchKey(),
+ (s, packProxyMessage) -> {
+ if (packProxyMessage == null) {
+ packProxyMessage =
+ new
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
+
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
+
packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis());
+ }
+ // add message to package proxy
+ packProxyMessage.addProxyMessage(proxyMessage);
+ return packProxyMessage;
+ });
+ // increment the count of successful sinks
+ sinkMetric.sinkSuccessCount.incrementAndGet();
+ } else {
+ // increment the count of failed sinks
+ sinkMetric.sinkFailCount.incrementAndGet();
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("write message to Proxy sink error", e);
+ } catch (Throwable t) {
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+ }
+
+ }
+
+ @Override
+ public void destroy() {
+ LOGGER.info("destroy pulsar sink, job[{}], source[{}]", jobInstanceId,
sourceName);
+ while (!sinkFinish()) {
+ LOGGER.info("job {} wait until cache all data to pulsar",
jobInstanceId);
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
+ }
+ shutdown = true;
+ EXECUTOR_SERVICE.shutdown();
+ if (CollectionUtils.isNotEmpty(pulsarSenders)) {
+ for (PulsarTopicSender sender : pulsarSenders) {
+ sender.close();
+ }
+ pulsarSenders.clear();
+ }
+ }
+
+ private boolean sinkFinish() {
+ return cache.values().stream().allMatch(PackProxyMessage::isEmpty) &&
pulsarSendQueue.isEmpty();
+ }
+
+ /**
+ * flush cache by batch
+ *
+ * @return thread runner
+ */
+ private Runnable flushCache() {
+ return () -> {
+ LOGGER.info("start flush cache thread for {} ProxySink",
inlongGroupId);
+ while (!shutdown) {
+ try {
+ cache.forEach((batchKey, packProxyMessage) -> {
+ BatchProxyMessage batchProxyMessage =
packProxyMessage.fetchBatch();
+ if (batchProxyMessage != null) {
+ try {
+ sendQueueSemaphore.acquire();
+ pulsarSendQueue.put(batchProxyMessage);
+ LOGGER.info("send group id {}, message key
{},with message size {}, the job id is {}, "
+ + "read source is {} sendTime is {}",
inlongGroupId, batchKey,
+
batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
+ batchProxyMessage.getDataTime());
+ } catch (Exception e) {
+ sendQueueSemaphore.release();
+ LOGGER.error("flush data to send queue", e);
+ }
+ }
+ });
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
+ } catch (Exception ex) {
+ LOGGER.error("error caught", ex);
+ } catch (Throwable t) {
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(),
t);
+ }
+ }
+ };
+ }
+
+ /**
+ * take batchMsg from sendQueue and send to pulsar
+ */
+ private Runnable sendDataThread() {
+ return () -> {
+ LOGGER.info("start pulsar sink send data thread, job[{}],
groupId[{}]", jobInstanceId, inlongGroupId);
+ while (!shutdown) {
+ try {
+ BatchProxyMessage data = pulsarSendQueue.poll(1,
TimeUnit.MILLISECONDS);
+ if (ObjectUtils.isEmpty(data)) {
+ continue;
+ }
+ sendData(data);
+ } catch (Throwable t) {
+ LOGGER.error("Send data error", t);
+ }
+ }
+ };
+ }
+
+ private void sendData(BatchProxyMessage batchMsg) throws
InterruptedException {
+ if (ObjectUtils.isEmpty(batchMsg)) {
+ return;
+ }
+
+ Producer producer = selectProducer();
+ if (ObjectUtils.isEmpty(producer)) {
+ pulsarSendQueue.put(batchMsg);
+ LOGGER.error("send job[{}] data err, empty pulsar producer",
jobInstanceId);
+ return;
+ }
+ InLongMsg message = batchMsg.getInLongMsg();
+ sinkMetric.pluginSendCount.addAndGet(batchMsg.getMsgCnt());
+ if (asyncSend) {
+ CompletableFuture<MessageId> future =
producer.newMessage().eventTime(batchMsg.getDataTime())
+ .value(message.buildArray()).sendAsync();
+ future.whenCompleteAsync((m, t) -> {
+ if (t != null) {
+ // send error
+
sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+ LOGGER.error("send data fail to pulsar, add back to
sendqueue, current queue size {}",
+ pulsarSendQueue.size(), t);
+ try {
+ pulsarSendQueue.put(batchMsg);
+ } catch (InterruptedException e) {
+ LOGGER.error("put back to queue fail send queue size
{}", pulsarSendQueue.size(), t);
+ }
+ } else {
+ // send succerss, update metrics
+ sendQueueSemaphore.release();
+ updateSuccessSendMetrics(batchMsg);
+ }
+ });
+
+ } else {
+ try {
+
producer.newMessage().eventTime(batchMsg.getDataTime()).value(message.buildArray()).send();
+ sendQueueSemaphore.release();
+ updateSuccessSendMetrics(batchMsg);
+ } catch (PulsarClientException e) {
+ sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+ LOGGER.error("send data fail to pulsar, add back to send
queue, send queue size {}",
+ pulsarSendQueue.size(), e);
+ pulsarSendQueue.put(batchMsg);
+ }
+ }
+ }
+
+ private void updateSuccessSendMetrics(BatchProxyMessage batchMsg) {
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
batchMsg.getGroupId(),
+ batchMsg.getStreamId(), batchMsg.getDataTime(),
batchMsg.getMsgCnt(),
+ batchMsg.getTotalSize());
+ sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
+ if (sourceName != null) {
+ taskPositionManager.updateSinkPosition(batchMsg.getJobId(),
sourceName, batchMsg.getMsgCnt());
+ }
+ }
+
+ private Producer selectProducer() {
+ if (CollectionUtils.isEmpty(pulsarSenders)) {
+ LOGGER.error("send job[{}] data err, empty pulsar sender",
jobInstanceId);
+ return null;
+ }
+ PulsarTopicSender sender = pulsarSenders.get(
+ (CLIENT_INDEX.getAndIncrement() & Integer.MAX_VALUE) %
pulsarSenders.size());
+ return sender.getProducer();
+ }
+
+ private void initPulsarSender() {
+ if (CollectionUtils.isEmpty(mqClusterInfos)) {
+ LOGGER.error("init job[{}] pulsar client fail, empty mqCluster
info", jobInstanceId);
+ return;
+ }
+ for (MQClusterInfo clusterInfo : mqClusterInfos) {
+ if (clusterInfo.isValid()) {
+ try {
+ PulsarClient client =
PulsarClient.builder().serviceUrl(clusterInfo.getUrl())
+ .ioThreads(clientIoThreads)
+
.connectionsPerBroker(connectionsPreBroker).build();
+ pulsarSenders.add(new PulsarTopicSender(client,
producerNum));
+ LOGGER.info("job[{}] init pulsar client url={}",
jobInstanceId, clusterInfo.getUrl());
+ } catch (PulsarClientException e) {
+ LOGGER.error("init job[{}] pulsar client fail",
jobInstanceId, e);
+ }
+ }
+ }
+ }
+
+ class PulsarTopicSender {
+
+ private final AtomicInteger producerIndex = new AtomicInteger(0);
+ private final PulsarClient pulsarClient;
+ private List<Producer> producers;
+
+ public PulsarTopicSender(PulsarClient client, int producerNum) {
+ pulsarClient = client;
+ initProducer(producerNum);
+ }
+
+ public Producer getProducer() {
+ if (CollectionUtils.isEmpty(producers)) {
+ LOGGER.error("job[{}] empty producers", jobInstanceId);
+ return null;
+ }
+ int index = (producerIndex.getAndIncrement() & Integer.MAX_VALUE)
% producers.size();
+ return producers.get(index);
+ }
+
+ /**
+ * close all pulsar producer and shutdown client
+ */
+ public void close() {
+ if (CollectionUtils.isEmpty(producers)) {
+ return;
+ }
+ for (Producer producer : producers) {
+ try {
+ producer.close();
+ } catch (Throwable e) {
+ LOGGER.error("job[{}] close pulsar producer error",
jobInstanceId, e);
+ }
+ }
+ try {
+ pulsarClient.shutdown();
+ } catch (PulsarClientException e) {
+ LOGGER.error("job[{}] close pulsar client error",
jobInstanceId, e);
+ }
+ }
+
+ private void initProducer(int producerNum) {
+ producers = new ArrayList<>(producerNum);
+ for (int i = 0; i < producerNum; i++) {
+ producers.add(createProducer());
+ }
+ }
+
+ private Producer<byte[]> createProducer() {
+ try {
+ return pulsarClient.newProducer().topic(topic)
+ .sendTimeout(sendTimeoutSecond, TimeUnit.SECONDS)
+ .topic(topic)
+ .enableBatching(enableBatch)
+ .blockIfQueueFull(blockIfQueueFull)
+ .maxPendingMessages(maxPendingMessages)
+
.maxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions)
+ .compressionType(compressionType)
+ .batchingMaxMessages(maxBatchingMessages)
+ .batchingMaxBytes(maxBatchingBytes)
+
.batchingMaxPublishDelay(maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS)
+ .create();
+ } catch (Throwable e) {
+ LOGGER.error("job[{}] create producer[topic:{}] error",
jobInstanceId, topic, e);
+ return null;
+ }
+ }
+
+ }
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index b2d4a83bc..e64b8b543 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -248,8 +248,8 @@ public class SenderManager {
batchMessage.getDataList(), batchMessage.getGroupId(),
batchMessage.getStreamId(),
batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(),
maxSenderTimeout, TimeUnit.SECONDS,
batchMessage.getExtraMap(), proxySend);
- int msgCnt = batchMessage.getDataList().size();
- getMetricItem(batchMessage.getGroupId(),
batchMessage.getStreamId()).pluginSendCount.addAndGet(msgCnt);
+ getMetricItem(batchMessage.getGroupId(),
batchMessage.getStreamId()).pluginSendCount.addAndGet(
+ batchMessage.getMsgCnt());
} catch (Exception exception) {
LOGGER.error("Exception caught", exception);
@@ -271,7 +271,7 @@ public class SenderManager {
LOGGER.warn("max retry reached, retry count is {}, sleep and send
again", retry);
AgentUtils.silenceSleepInMs(retrySleepTime);
}
- int msgCnt = batchMessage.getDataList().size();
+ int msgCnt = batchMessage.getMsgCnt();
String groupId = batchMessage.getGroupId();
String streamId = batchMessage.getStreamId();
long dataTime = batchMessage.getDataTime();
@@ -285,8 +285,8 @@ public class SenderManager {
if (result == SendResult.OK) {
semaphore.release(msgCnt);
metricItem.pluginSendSuccessCount.addAndGet(msgCnt);
- long totalSize =
batchMessage.getDataList().stream().mapToLong(body -> body.length).sum();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId, dataTime, msgCnt, totalSize);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId, dataTime, msgCnt,
+ batchMessage.getTotalSize());
if (sourcePath != null) {
taskPositionManager.updateSinkPosition(batchMessage.getJobId(), sourcePath,
msgCnt);
}
@@ -339,8 +339,8 @@ public class SenderManager {
return;
}
semaphore.release(msgCnt);
- long totalSize =
batchMessage.getDataList().stream().mapToLong(body -> body.length).sum();
- AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId,
streamId, dataTime, msgCnt, totalSize);
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId,
streamId, dataTime, msgCnt,
+ batchMessage.getTotalSize());
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
if (sourcePath != null) {
taskPositionManager.updateSinkPosition(jobId, sourcePath,
msgCnt);
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
new file mode 100644
index 000000000..118c3261c
--- /dev/null
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.MiniAgent;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
+import static
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link PulsarSink}: the sink is initialised from {@code pulsarSinkJob.json}, with
+ * pulsar cluster and topic information injected in {@link #setUp()}.
+ */
+public class PulsarSinkTest {
+
+    private static PulsarSink pulsarSink;
+    private static JobProfile jobProfile;
+    private static AgentBaseTestsHelper helper;
+    private static MiniAgent agent;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        helper = new AgentBaseTestsHelper(PulsarSinkTest.class.getName()).setupAgentHome();
+        agent = new MiniAgent();
+        jobProfile = JobProfile.parseJsonFile("pulsarSinkJob.json");
+        jobProfile.set("job.mqClusters",
+                "[{\"url\":\"mqurl\",\"token\":\"token\",\"mqType\":\"PULSAR\",\"params\":{}}]");
+        jobProfile.set("job.topicInfo", "{\"topic\":\"topic\",\"inlongGroupId\":\"groupId\"}");
+        pulsarSink = new PulsarSink();
+        pulsarSink.init(jobProfile);
+    }
+
+    /**
+     * Writes {@code count} messages and checks that the sink's success metric matches.
+     * NOTE(review): assumes write() updates sinkSuccessCount before returning - TODO confirm,
+     * otherwise this assertion is timing dependent.
+     */
+    @Test
+    public void testWrite() {
+        String body = "testMesage";
+        Map<String, String> attr = new HashMap<>();
+        attr.put(PROXY_KEY_GROUP_ID, "groupId");
+        attr.put(PROXY_KEY_STREAM_ID, "streamId");
+        long count = 5;
+        for (long i = 0; i < count; i++) {
+            pulsarSink.write(new ProxyMessage(body.getBytes(StandardCharsets.UTF_8), attr));
+        }
+        // JUnit convention: expected value first, actual value second
+        assertEquals(count, pulsarSink.sinkMetric.sinkSuccessCount.get());
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/resources/pulsarSinkJob.json
b/inlong-agent/agent-plugins/src/test/resources/pulsarSinkJob.json
new file mode 100644
index 000000000..d2fed3e02
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/pulsarSinkJob.json
@@ -0,0 +1,15 @@
+{
+ "job": {
+ "id": 1,
+ "instance.id": "job_1",
+ "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
+ "sink": "org.apache.inlong.agent.plugin.sinks.PulsarSink",
+ "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
+ "groupId": "groupId",
+ "streamId": "streamId"
+ },
+ "proxy": {
+ "inlongGroupId": "groupId",
+ "inlongStreamId": "streamId"
+ }
+}
\ No newline at end of file
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index ce81bf5d4..7b297cbcd 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -73,7 +73,7 @@ public enum TaskTypeEnum {
return TUBEMQ;
case 12:
return MQTT;
- case 101:
+ case 201:
return MOCK;
default:
throw new RuntimeException("Unsupported task type " +
taskType);
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 6e6463182..0f80b8bfe 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -94,4 +94,6 @@ public interface AttributeConstants {
// dataproxy IP from dp response ack
String MESSAGE_DP_IP = "dpIP";
+
+ String MESSAGE_TOPIC = "topic";
}
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
b/inlong-common/src/main/java/org/apache/inlong/common/util/MessageUtils.java
similarity index 55%
copy from
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
copy to
inlong-common/src/main/java/org/apache/inlong/common/util/MessageUtils.java
index 005084250..39c6dcf5c 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/util/MessageUtils.java
@@ -15,28 +15,26 @@
* limitations under the License.
*/
-package org.apache.inlong.agent.message;
+package org.apache.inlong.common.util;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.commons.collections.MapUtils;
-import java.util.List;
import java.util.Map;
-/**
- * A batch of proxy messages used for batch sending, produced by
PackProxyMessage
- */
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class BatchProxyMessage {
+public class MessageUtils {
- private String jobId;
- private String groupId;
- private String streamId;
- private List<byte[]> dataList;
- private long dataTime;
- private Map<String, String> extraMap;
- private boolean isSyncSend;
+    /**
+     * Render an attribute map as {@code key1=value1&key2=value2...}, following the map's
+     * entry-set iteration order.
+     *
+     * @param extraAttrMap attributes to render; may be null or empty
+     * @return a new builder holding the rendered attributes; empty when there are none
+     */
+    public static StringBuilder convertAttrToStr(Map<String, String> extraAttrMap) {
+        StringBuilder joined = new StringBuilder();
+        if (!MapUtils.isEmpty(extraAttrMap)) {
+            for (Map.Entry<String, String> attr : extraAttrMap.entrySet()) {
+                // write the separator before every pair except the first
+                if (joined.length() > 0) {
+                    joined.append("&");
+                }
+                joined.append(attr.getKey()).append("=").append(attr.getValue());
+            }
+        }
+        return joined;
+    }
}
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 85881b974..9f5244ca5 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -19,6 +19,7 @@
package org.apache.inlong.sdk.dataproxy;
import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -281,7 +282,7 @@ public class DefaultMessageSender implements MessageSender {
if (isProxySend) {
extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
}
- StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+ StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
@@ -393,7 +394,7 @@ public class DefaultMessageSender implements MessageSender {
if (isProxySend) {
extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
}
- StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+ StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
isCompress, isReport,
@@ -516,7 +517,7 @@ public class DefaultMessageSender implements MessageSender {
if (isProxySend) {
extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
}
- StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+ StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
boolean isCompressEnd = (isCompress && (body.length > cpsSize));
if (msgtype == 7 || msgtype == 8) {
@@ -633,7 +634,7 @@ public class DefaultMessageSender implements MessageSender {
if (isProxySend) {
extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
}
- StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+ StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
// if (!isGroupIdTransfer)
@@ -746,7 +747,7 @@ public class DefaultMessageSender implements MessageSender {
}
addIndexCnt(groupId, streamId, bodyList.size());
- StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+ StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
@@ -796,7 +797,7 @@ public class DefaultMessageSender implements MessageSender {
}
addIndexCnt(groupId, streamId, bodyList.size());
- StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+ StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
if (msgtype == 7 || msgtype == 8) {
EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
isCompress,
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index c54a1d25a..320aa1559 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sdk.dataproxy.utils;
-import org.apache.commons.collections.MapUtils;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.slf4j.Logger;
@@ -99,21 +98,6 @@ public class ProxyUtils {
return dt;
}
- public static StringBuilder convertAttrToStr(Map<String, String>
extraAttrMap) {
- StringBuilder attrs = new StringBuilder();
- if (MapUtils.isEmpty(extraAttrMap)) {
- return attrs;
- }
- for (Map.Entry<String, String> entry : extraAttrMap.entrySet()) {
- String key = entry.getKey();
- String value = entry.getValue();
- attrs.append(key).append("=");
- attrs.append(value).append("&");
- }
- attrs.deleteCharAt(attrs.length() - 1);
- return attrs;
- }
-
/**
* valid client config
*