This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 7552b05f9 [INLONG-6704][Agent] Add PulsarSink (#6709)
7552b05f9 is described below

commit 7552b05f94806ca3e251d2cfccc43e977ceedf20
Author: xueyingzhang <[email protected]>
AuthorDate: Tue Dec 6 11:22:54 2022 +0800

    [INLONG-6704][Agent] Add PulsarSink (#6709)
---
 .../inlong/agent/constant/AgentConstants.java      |  48 +++
 .../inlong/agent/message/BatchProxyMessage.java    |  20 +
 .../apache/inlong/agent/pojo/JobProfileDto.java    |   2 +-
 .../inlong/agent/conf/TestConfiguration.java       |   2 +-
 .../agent/plugin/message/PackProxyMessage.java     |   9 +
 .../inlong/agent/plugin/sinks/AbstractSink.java    |  20 +
 .../inlong/agent/plugin/sinks/ProxySink.java       |  18 -
 .../inlong/agent/plugin/sinks/PulsarSink.java      | 425 +++++++++++++++++++++
 .../inlong/agent/plugin/sinks/SenderManager.java   |  14 +-
 .../inlong/agent/plugin/sinks/PulsarSinkTest.java  |  68 ++++
 .../src/test/resources/pulsarSinkJob.json          |  15 +
 .../apache/inlong/common/enums/TaskTypeEnum.java   |   2 +-
 .../inlong/common/msg/AttributeConstants.java      |   2 +
 .../apache/inlong/common/util/MessageUtils.java    |  36 +-
 .../inlong/sdk/dataproxy/DefaultMessageSender.java |  13 +-
 .../inlong/sdk/dataproxy/utils/ProxyUtils.java     |  16 -
 16 files changed, 641 insertions(+), 69 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index c332d154c..db2d5adeb 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.agent.constant;
 
+import io.netty.util.NettyRuntime;
+import io.netty.util.internal.SystemPropertyUtil;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 /**
@@ -130,4 +132,50 @@ public class AgentConstants {
     public static final String AGENT_METRIC_LISTENER_CLASS = 
"agent.domainListeners";
     public static final String AGENT_METRIC_LISTENER_CLASS_DEFAULT =
             "org.apache.inlong.agent.metrics.AgentPrometheusMetricListener";
+
+    // pulsar sink config
+    public static final String PULSAR_CLIENT_IO_TREHAD_NUM = 
"agent.sink.pulsar.client.io.thread.num";
+    public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1,
+            SystemPropertyUtil.getInt("io.netty.eventLoopThreads", 
NettyRuntime.availableProcessors() * 2));
+
+    public static final String PULSAR_CONNECTION_PRE_BROKER = 
"agent.sink.pulsar.connection.pre.broker";
+    public static final int DEFAULT_PULSAR_CONNECTION_PRE_BROKER = 1;
+
+    public static final String PULSAR_CLIENT_TIMEOUT_SECOND = 
"agent.sink.pulsar.send.timeout.second";
+    public static final int DEFAULT_PULSAR_CLIENT_TIMEOUT_SECOND = 30;
+
+    // batching switch read by PulsarSink; the key shares the "agent.sink.pulsar." prefix of its sibling keys
+    public static final String PULSAR_CLIENT_ENABLE_BATCH = "agent.sink.pulsar.enable.batch";
+    public static final boolean DEFAULT_PULSAR_CLIENT_ENABLE_BATCH = true;
+
+    public static final String PULSAR_CLIENT_BLOCK_IF_QUEUE_FULL = 
"agent.sink.pulsar.block.if.queue.full";
+    public static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
+
+    public static final String PULSAR_CLIENT_MAX_PENDING_MESSAGES = 
"agent.sink.pulsar.max.pending.messages";
+    public static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
+
+    public static final String 
PULSAR_CLIENT_MAX_PENDING_MESSAGES_ACROSS_PARTITION =
+            "agent.sink.pulsar.max.messages.across.partition";
+    public static final int DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITION = 
500000;
+
+    public static final String PULSAR_CLIENT_MAX_BATCH_BYTES = 
"agent.sink.pulsar.max.batch.bytes";
+    public static final int DEFAULT_MAX_BATCH_BYTES = 128 * 1024;
+
+    public static final String PULSAR_CLIENT_MAX_BATCH_MESSAGES = 
"agent.sink.pulsar.max.batch.messages";
+    public static final int DEFAULT_MAX_BATCH_MESSAGES = 1000;
+
+    public static final String PULSAR_CLIENT_MAX_BATCH_INTERVAL_MILLIS = 
"agent.sink.pulsar.max.batch.interval.millis";
+    public static final int DEFAULT_MAX_BATCH_INTERVAL_MILLIS = 1;
+
+    public static final String PULSAR_CLIENT_COMPRESSION_TYPE = 
"agent.sink.pulsar.compression.type";
+    public static final String DEFAULT_COMPRESSION_TYPE = "NONE";
+
+    public static final String PULSAR_CLIENT_PRODUCER_NUM = 
"agent.sink.pulsar.producer.num";
+    public static final int DEFAULT_PRODUCER_NUM = 3;
+
+    // async-send switch read by PulsarSink; key spelled "enable" so that it matches what operators will type
+    public static final String PULSAR_CLIENT_ENABLE_ASYNC_SEND = "agent.sink.pulsar.enable.async.send";
+    public static final boolean DEFAULT_ENABLE_ASYNC_SEND = true;
+
+    public static final String PULSAR_SINK_SEND_QUEUE_SIZE = 
"agent.sink.pulsar.send.queue.size";
+    public static final int DEFAULT_SEND_QUEUE_SIZE = 20000;
+
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
index 005084250..39be15437 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
@@ -20,6 +20,9 @@ package org.apache.inlong.agent.message;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.common.util.MessageUtils;
 
 import java.util.List;
 import java.util.Map;
@@ -39,4 +42,21 @@ public class BatchProxyMessage {
     private long dataTime;
     private Map<String, String> extraMap;
     private boolean isSyncSend;
+
+    /**
+     * Packs every entry of dataList into a newly created InLongMsg, attaching the same
+     * attribute string (built from extraMap) to each entry.
+     *
+     * NOTE(review): unlike getMsgCnt/getTotalSize there is no guard for a null dataList here;
+     * confirm callers only invoke this on a populated message.
+     *
+     * @return the InLongMsg holding all entries of dataList
+     */
+    public InLongMsg getInLongMsg() {
+        InLongMsg message = InLongMsg.newInLongMsg(true);
+        // the attribute string is identical for every entry, so it is computed once up front
+        String attr = MessageUtils.convertAttrToStr(extraMap).toString();
+        for (byte[] lineData : dataList) {
+            message.addMsg(attr, lineData);
+        }
+        return message;
+    }
+
+    /**
+     * @return number of entries in dataList, or 0 when dataList is null or empty
+     */
+    public int getMsgCnt() {
+        if (CollectionUtils.isEmpty(dataList)) {
+            return 0;
+        }
+        return dataList.size();
+    }
+
+    /**
+     * @return sum of the byte lengths of all entries in dataList, or 0 when dataList is null or empty
+     */
+    public long getTotalSize() {
+        if (CollectionUtils.isEmpty(dataList)) {
+            return 0;
+        }
+        long totalBytes = 0L;
+        for (byte[] body : dataList) {
+            totalBytes += body.length;
+        }
+        return totalBytes;
+    }
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 77e1fa855..5987c47e3 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -417,7 +417,7 @@ public class JobProfileDto {
                 break;
             case MOCK:
                 profileDto.setJob(job);
-
+                break;
             default:
         }
         return TriggerProfile.parseJsonStr(GSON.toJson(profileDto));
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
index 4019c896b..dcc651135 100755
--- 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/conf/TestConfiguration.java
@@ -87,7 +87,7 @@ public class TestConfiguration {
     @Test
     public void testJobSinkConf() {
         DataConfig dataConfig = new DataConfig();
-        dataConfig.setTaskType(101);
+        dataConfig.setTaskType(201);
         dataConfig.setDataReportType(1);
         JobProfile profile = JobProfileDto.convertToTriggerProfile(dataConfig);
         assertEquals(profile.get(JOB_SINK), DEFAULT_DATAPROXY_SINK);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
index 045dcd00b..d51ba2ae0 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/message/PackProxyMessage.java
@@ -39,6 +39,9 @@ import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STRE
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_SYNC;
+import static org.apache.inlong.common.msg.AttributeConstants.DATA_TIME;
+import static org.apache.inlong.common.msg.AttributeConstants.MESSAGE_TOPIC;
+import static org.apache.inlong.common.msg.AttributeConstants.STREAM_ID;
 
 /**
  * Handle List of BusMessage, which belong to the same stream id.
@@ -87,6 +90,12 @@ public class PackProxyMessage {
         this.extraMap.put(AttributeConstants.MESSAGE_PARTITION_KEY, dataKey);
     }
 
+    /**
+     * Records the stream id, the given topic and the given data time in the extra attribute map.
+     *
+     * @param topic topic name stored under MESSAGE_TOPIC
+     * @param dataTime timestamp stored, as a decimal string, under DATA_TIME
+     */
+    public void addTopicAndDataTime(String topic, long dataTime) {
+        final String dataTimeText = Long.toString(dataTime);
+        extraMap.put(STREAM_ID, streamId);
+        extraMap.put(MESSAGE_TOPIC, topic);
+        extraMap.put(DATA_TIME, dataTimeText);
+    }
+
     /**
      * Check whether queue is nearly full
      *
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
index 0928d2c3d..8b6858f78 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/AbstractSink.java
@@ -22,19 +22,24 @@ import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.Sink;
+import org.apache.inlong.agent.plugin.message.PackProxyMessage;
 import org.apache.inlong.common.metric.MetricRegister;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_MESSAGE_FILTER_CLASSNAME;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
@@ -47,12 +52,19 @@ public abstract class AbstractSink implements Sink {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSink.class);
     protected String inlongGroupId;
     protected String inlongStreamId;
+
     // metric
     protected AgentMetricItemSet metricItemSet;
     protected AgentMetricItem sinkMetric;
     protected Map<String, String> dimensions;
     protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+
     protected JobProfile jobConf;
+    protected String sourceName;
+    protected String jobInstanceId;
+    protected int batchFlushInterval;
+    // key is stream id, value is a batch of messages belong to the same 
stream id
+    protected ConcurrentHashMap<String, PackProxyMessage> cache;
 
     @Override
     public MessageFilter initMessageFilter(JobProfile jobConf) {
@@ -67,11 +79,19 @@ public abstract class AbstractSink implements Sink {
         return null;
     }
 
+    /**
+     * Stores the given name in the sourceName field.
+     *
+     * @param sourceFileName value assigned to sourceName
+     */
+    @Override
+    public void setSourceName(String sourceFileName) {
+        this.sourceName = sourceFileName;
+    }
+
     @Override
     public void init(JobProfile jobConf) {
         this.jobConf = jobConf;
+        jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
         inlongGroupId = jobConf.get(PROXY_INLONG_GROUP_ID, 
DEFAULT_PROXY_INLONG_GROUP_ID);
         inlongStreamId = jobConf.get(PROXY_INLONG_STREAM_ID, 
DEFAULT_PROXY_INLONG_STREAM_ID);
+        cache = new ConcurrentHashMap<>(10);
+        batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL, 
DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
 
         this.dimensions = new HashMap<>();
         dimensions.put(KEY_PLUGIN_ID, this.getClass().getSimpleName());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index dfe4b7a33..f8eff052d 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -40,9 +39,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static 
org.apache.inlong.agent.constant.JobConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
-import static org.apache.inlong.agent.constant.JobConstants.JOB_INSTANCE_ID;
-import static 
org.apache.inlong.agent.constant.JobConstants.PROXY_BATCH_FLUSH_INTERVAL;
 
 /**
  * sink message data to inlong-dataproxy
@@ -57,12 +53,7 @@ public class ProxySink extends AbstractSink {
     private MessageFilter messageFilter;
     private SenderManager senderManager;
     private byte[] fieldSplitter;
-    private String sourceName;
-    private String jobInstanceId;
-    private int batchFlushInterval;
     private volatile boolean shutdown = false;
-    // key is stream id, value is a batch of messages belong to the same 
stream id
-    private ConcurrentHashMap<String, PackProxyMessage> cache;
 
     public ProxySink() {
     }
@@ -115,11 +106,6 @@ public class ProxySink extends AbstractSink {
         }
     }
 
-    @Override
-    public void setSourceName(String sourceFileName) {
-        this.sourceName = sourceFileName;
-    }
-
     /**
      * flush cache by batch
      *
@@ -154,10 +140,6 @@ public class ProxySink extends AbstractSink {
     @Override
     public void init(JobProfile jobConf) {
         super.init(jobConf);
-        jobInstanceId = jobConf.get(JOB_INSTANCE_ID);
-        batchFlushInterval = jobConf.getInt(PROXY_BATCH_FLUSH_INTERVAL,
-                DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
-        cache = new ConcurrentHashMap<>(10);
         messageFilter = initMessageFilter(jobConf);
         fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, 
DEFAULT_FIELD_SPLITTER).getBytes(
                 StandardCharsets.UTF_8);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
new file mode 100644
index 000000000..43b88e423
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/PulsarSink.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.message.BatchProxyMessage;
+import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.message.PackProxyMessage;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_BLOCK_IF_QUEUE_FULL;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_COMPRESSION_TYPE;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_ENABLE_ASYNC_SEND;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_BATCH_BYTES;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_BATCH_INTERVAL_MILLIS;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_BATCH_MESSAGES;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_PENDING_MESSAGES;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITION;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PRODUCER_NUM;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CLIENT_ENABLE_BATCH;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CLIENT_TIMEOUT_SECOND;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PULSAR_CONNECTION_PRE_BROKER;
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_SEND_QUEUE_SIZE;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_BLOCK_IF_QUEUE_FULL;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_COMPRESSION_TYPE;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_ENABLE_ASYNC_SEND;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_ENABLE_BATCH;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_IO_TREHAD_NUM;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_BATCH_BYTES;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_BATCH_INTERVAL_MILLIS;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_BATCH_MESSAGES;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_PENDING_MESSAGES;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_MAX_PENDING_MESSAGES_ACROSS_PARTITION;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_PRODUCER_NUM;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CLIENT_TIMEOUT_SECOND;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_CONNECTION_PRE_BROKER;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PULSAR_SINK_SEND_QUEUE_SIZE;
+
+public class PulsarSink extends AbstractSink {
+
+    // keyed by this class so that log lines are attributed to PulsarSink
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarSink.class);
+    // counter shared by all instances, used to pick the next sender in round-robin order
+    private static final AtomicInteger CLIENT_INDEX = new AtomicInteger(0);
+    // thread pool shared by all PulsarSink instances; runs their send and cache-flush loops
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+            60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("PulsarSink"));
+    // agent-wide configuration; init() reads the queue and pulsar client settings from it
+    private final AgentConfiguration agentConf = 
AgentConfiguration.getAgentConf();
+    private TaskPositionManager taskPositionManager;
+    // set to true by destroy(); the background loops keep running while it is false
+    private volatile boolean shutdown = false;
+    // MQ cluster list and topic taken from the job profile in init()
+    private List<MQClusterInfo> mqClusterInfos;
+    private String topic;
+    private List<PulsarTopicSender> pulsarSenders;
+    private int clientIoThreads;
+
+    // capacity used for both the semaphore and the send queue below
+    private int sendQueueSize;
+    private Semaphore sendQueueSemaphore; // limit the count of 
batchProxyMessage waiting to be sent
+    private LinkedBlockingQueue<BatchProxyMessage> pulsarSendQueue;
+
+    // pulsar client parameters
+    private int connectionsPreBroker;
+    private boolean enableBatch;
+    private boolean blockIfQueueFull;
+    private int maxPendingMessages;
+    private int maxPendingMessagesAcrossPartitions;
+    private CompressionType compressionType;
+    private int maxBatchingBytes;
+    private int maxBatchingMessages;
+    private long maxBatchingPublishDelayMillis;
+    private int sendTimeoutSecond;
+    private int producerNum;
+    // true: sendData uses sendAsync(); false: it uses the blocking send()
+    private boolean asyncSend;
+
+    /**
+     * Initializes the sink: loads queue and pulsar client settings from the agent configuration,
+     * reads the MQ cluster list and topic from the job profile, sets up the pulsar senders and
+     * submits the send and cache-flush tasks to the shared executor.
+     *
+     * @param jobConf job profile; Preconditions.checkArgument rejects it when it carries no valid MQ topic
+     */
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        taskPositionManager = TaskPositionManager.getTaskPositionManager();
+        // agentConf
+        sendQueueSize = agentConf.getInt(PULSAR_SINK_SEND_QUEUE_SIZE, 
DEFAULT_SEND_QUEUE_SIZE);
+        // semaphore and queue share the same capacity
+        sendQueueSemaphore = new Semaphore(sendQueueSize);
+        pulsarSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
+        clientIoThreads = agentConf.getInt(PULSAR_CLIENT_IO_TREHAD_NUM, 
DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM);
+        connectionsPreBroker = agentConf.getInt(PULSAR_CONNECTION_PRE_BROKER, 
DEFAULT_PULSAR_CONNECTION_PRE_BROKER);
+        sendTimeoutSecond = agentConf.getInt(PULSAR_CLIENT_TIMEOUT_SECOND, 
DEFAULT_PULSAR_CLIENT_TIMEOUT_SECOND);
+        enableBatch = agentConf.getBoolean(PULSAR_CLIENT_ENABLE_BATCH, 
DEFAULT_PULSAR_CLIENT_ENABLE_BATCH);
+        blockIfQueueFull = 
agentConf.getBoolean(PULSAR_CLIENT_BLOCK_IF_QUEUE_FULL, 
DEFAULT_BLOCK_IF_QUEUE_FULL);
+        maxPendingMessages = 
agentConf.getInt(PULSAR_CLIENT_MAX_PENDING_MESSAGES, 
DEFAULT_MAX_PENDING_MESSAGES);
+        maxBatchingBytes = agentConf.getInt(PULSAR_CLIENT_MAX_BATCH_BYTES, 
DEFAULT_MAX_BATCH_BYTES);
+        maxBatchingMessages = 
agentConf.getInt(PULSAR_CLIENT_MAX_BATCH_MESSAGES, DEFAULT_MAX_BATCH_MESSAGES);
+        maxBatchingPublishDelayMillis = 
agentConf.getInt(PULSAR_CLIENT_MAX_BATCH_INTERVAL_MILLIS,
+                DEFAULT_MAX_BATCH_INTERVAL_MILLIS);
+        maxPendingMessagesAcrossPartitions = 
agentConf.getInt(PULSAR_CLIENT_MAX_PENDING_MESSAGES_ACROSS_PARTITION,
+                DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITION);
+        producerNum = agentConf.getInt(PULSAR_CLIENT_PRODUCER_NUM, 
DEFAULT_PRODUCER_NUM);
+        asyncSend = agentConf.getBoolean(PULSAR_CLIENT_ENABLE_ASYNC_SEND, 
DEFAULT_ENABLE_ASYNC_SEND);
+        // NOTE(review): CompressionType.valueOf throws IllegalArgumentException when the configured
+        // text is not the exact name of an enum constant (e.g. different letter case)
+        String compresstion = agentConf.get(PULSAR_CLIENT_COMPRESSION_TYPE, 
DEFAULT_COMPRESSION_TYPE);
+        if (StringUtils.isNotEmpty(compresstion)) {
+            compressionType = CompressionType.valueOf(compresstion);
+        } else {
+            compressionType = CompressionType.NONE;
+        }
+        // jobConf
+        mqClusterInfos = jobConf.getMqClusters();
+        
Preconditions.checkArgument(ObjectUtils.isNotEmpty(jobConf.getMqTopic()) && 
jobConf.getMqTopic().isValid(),
+                "no valid pulsar topic config");
+        topic = jobConf.getMqTopic().getTopic();
+        pulsarSenders = new ArrayList<>();
+        initPulsarSender();
+        // two long-running tasks per sink: one drains the send queue, one moves cached batches into it
+        EXECUTOR_SERVICE.execute(sendDataThread());
+        EXECUTOR_SERVICE.execute(flushCache());
+    }
+
+    /**
+     * Adds a message to the per-batch-key cache; the cached messages are later moved to the send
+     * queue by the flush task. Null messages are ignored. Exceptions are logged, other throwables
+     * are passed to ThreadUtils.threadThrowableHandler; nothing is rethrown to the caller.
+     *
+     * @param message message to cache
+     */
+    @Override
+    public void write(Message message) {
+        try {
+            if (message != null) {
+                if (!(message instanceof EndMessage)) {
+                    ProxyMessage proxyMessage = ProxyMessage.parse(message);
+                    // add proxy message to cache.
+                    cache.compute(proxyMessage.getBatchKey(),
+                            (s, packProxyMessage) -> {
+                                // first message for this batch key: create the pack and stamp its attributes
+                                if (packProxyMessage == null) {
+                                    packProxyMessage =
+                                            new 
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
+                                    
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
+                                    
packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis());
+                                }
+                                // add message to package proxy
+                                packProxyMessage.addProxyMessage(proxyMessage);
+                                return packProxyMessage;
+                            });
+                    // increment the count of successful sinks
+                    sinkMetric.sinkSuccessCount.incrementAndGet();
+                } else {
+                    // NOTE(review): an EndMessage is counted as a failed sink here; confirm this is intended
+                    // increment the count of failed sinks
+                    sinkMetric.sinkFailCount.incrementAndGet();
+                }
+            }
+        } catch (Exception e) {
+            // NOTE(review): the log text below names "Proxy sink" although this class is the pulsar sink
+            LOGGER.error("write message to Proxy sink error", e);
+        } catch (Throwable t) {
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+        }
+
+    }
+
+    /**
+     * Waits until the cache and the send queue are empty, then stops this sink's background
+     * loops and closes its pulsar senders.
+     */
+    @Override
+    public void destroy() {
+        LOGGER.info("destroy pulsar sink, job[{}], source[{}]", jobInstanceId, sourceName);
+        while (!sinkFinish()) {
+            LOGGER.info("job {} wait until cache all data to pulsar", jobInstanceId);
+            AgentUtils.silenceSleepInMs(batchFlushInterval);
+        }
+        // the flush and send loops of this instance poll this flag and exit on their own
+        shutdown = true;
+        // EXECUTOR_SERVICE is static and shared by every PulsarSink, so it is deliberately NOT shut down
+        // here: a shut-down pool rejects the tasks that init() of any other or later sink submits.
+        // Its core size is 0, so idle worker threads are reclaimed after the keep-alive time.
+        if (CollectionUtils.isNotEmpty(pulsarSenders)) {
+            for (PulsarTopicSender sender : pulsarSenders) {
+                sender.close();
+            }
+            pulsarSenders.clear();
+        }
+    }
+
+    /**
+     * @return true when every cached pack is empty and nothing is waiting in the send queue
+     */
+    private boolean sinkFinish() {
+        for (PackProxyMessage pack : cache.values()) {
+            if (!pack.isEmpty()) {
+                return false;
+            }
+        }
+        return pulsarSendQueue.isEmpty();
+    }
+
+    /**
+     * flush cache by batch: periodically fetches a batch from every cached pack and moves it
+     * into the send queue, taking one sendQueueSemaphore permit per queued batch.
+     *
+     * @return thread runner
+     */
+    private Runnable flushCache() {
+        return () -> {
+            LOGGER.info("start flush cache thread for {} ProxySink", inlongGroupId);
+            while (!shutdown) {
+                try {
+                    cache.forEach((batchKey, packProxyMessage) -> {
+                        BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch();
+                        if (batchProxyMessage != null) {
+                            // true only between a successful acquire() and a successful put()
+                            boolean permitHeld = false;
+                            try {
+                                sendQueueSemaphore.acquire();
+                                permitHeld = true;
+                                pulsarSendQueue.put(batchProxyMessage);
+                                // from here on the permit belongs to the queued batch; sendData releases it
+                                permitHeld = false;
+                                LOGGER.info("send group id {}, message key {},with message size {}, the job id is {}, "
+                                        + "read source is {} sendTime is {}", inlongGroupId, batchKey,
+                                        batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
+                                        batchProxyMessage.getDataTime());
+                            } catch (Exception e) {
+                                // give back only a permit that was really taken and whose batch never reached
+                                // the queue; releasing after a failed acquire() would push the semaphore
+                                // above sendQueueSize and weaken the back-pressure limit
+                                if (permitHeld) {
+                                    sendQueueSemaphore.release();
+                                }
+                                LOGGER.error("flush data to send queue", e);
+                            }
+                        }
+                    });
+                    AgentUtils.silenceSleepInMs(batchFlushInterval);
+                } catch (Exception ex) {
+                    LOGGER.error("error caught", ex);
+                } catch (Throwable t) {
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+                }
+            }
+        };
+    }
+
+    /**
+     * Builds the task that keeps polling pulsarSendQueue and hands every batch it gets to
+     * sendData, until the sink is shut down.
+     *
+     * @return thread runner
+     */
+    private Runnable sendDataThread() {
+        return () -> {
+            LOGGER.info("start pulsar sink send data thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId);
+            while (!shutdown) {
+                try {
+                    // short poll timeout so that the shutdown flag is re-checked frequently
+                    BatchProxyMessage nextBatch = pulsarSendQueue.poll(1, TimeUnit.MILLISECONDS);
+                    if (!ObjectUtils.isEmpty(nextBatch)) {
+                        sendData(nextBatch);
+                    }
+                } catch (Throwable t) {
+                    LOGGER.error("Send data error", t);
+                }
+            }
+        };
+    }
+
+    /**
+     * Sends one batch to pulsar through the producer returned by selectProducer, using sendAsync()
+     * when asyncSend is set and the blocking send() otherwise. On success the batch's
+     * sendQueueSemaphore permit is released and the success metrics are updated. On failure the
+     * batch is put back into pulsarSendQueue so that it is retried.
+     *
+     * @param batchMsg batch to send; an empty value is ignored
+     * @throws InterruptedException if interrupted while putting the batch back into the send queue
+     *         on the calling thread
+     */
+    private void sendData(BatchProxyMessage batchMsg) throws InterruptedException {
+        if (ObjectUtils.isEmpty(batchMsg)) {
+            return;
+        }
+
+        Producer producer = selectProducer();
+        if (ObjectUtils.isEmpty(producer)) {
+            // no producer available: keep the batch (and its permit) for a later attempt
+            pulsarSendQueue.put(batchMsg);
+            LOGGER.error("send job[{}] data err, empty pulsar producer", jobInstanceId);
+            return;
+        }
+        InLongMsg message = batchMsg.getInLongMsg();
+        sinkMetric.pluginSendCount.addAndGet(batchMsg.getMsgCnt());
+        if (asyncSend) {
+            CompletableFuture<MessageId> future = producer.newMessage().eventTime(batchMsg.getDataTime())
+                    .value(message.buildArray()).sendAsync();
+            future.whenCompleteAsync((m, t) -> {
+                if (t != null) {
+                    // send error
+                    sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+                    LOGGER.error("send data fail to pulsar, add back to sendqueue, current queue size {}",
+                            pulsarSendQueue.size(), t);
+                    try {
+                        pulsarSendQueue.put(batchMsg);
+                    } catch (InterruptedException e) {
+                        // the batch could not be re-queued and is dropped, so its permit must be returned,
+                        // otherwise the flush task eventually blocks forever on acquire()
+                        sendQueueSemaphore.release();
+                        // keep the interrupt visible to the thread that ran this callback
+                        Thread.currentThread().interrupt();
+                        LOGGER.error("put back to queue fail send queue size {}", pulsarSendQueue.size(), e);
+                    }
+                } else {
+                    // send success, update metrics
+                    sendQueueSemaphore.release();
+                    updateSuccessSendMetrics(batchMsg);
+                }
+            });
+
+        } else {
+            try {
+                producer.newMessage().eventTime(batchMsg.getDataTime()).value(message.buildArray()).send();
+                sendQueueSemaphore.release();
+                updateSuccessSendMetrics(batchMsg);
+            } catch (PulsarClientException e) {
+                sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+                LOGGER.error("send data fail to pulsar, add back to send queue, send queue size {}",
+                        pulsarSendQueue.size(), e);
+                pulsarSendQueue.put(batchMsg);
+            }
+        }
+    }
+
+    /**
+     * Records a successfully sent batch: adds it to the send-success audit, increases the plugin
+     * success counter and, when {@code sourceName} is set, advances the sink position of the job.
+     *
+     * @param batchMsg the batch that was sent successfully
+     */
+    private void updateSuccessSendMetrics(BatchProxyMessage batchMsg) {
+        AuditUtils.add(
+                AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+                batchMsg.getGroupId(),
+                batchMsg.getStreamId(),
+                batchMsg.getDataTime(),
+                batchMsg.getMsgCnt(),
+                batchMsg.getTotalSize());
+        sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
+        if (sourceName == null) {
+            // no source to track a position for
+            return;
+        }
+        taskPositionManager.updateSinkPosition(batchMsg.getJobId(), sourceName, batchMsg.getMsgCnt());
+    }
+
+    /**
+     * Picks the next sender from {@code pulsarSenders} in round-robin order and returns one of its producers.
+     *
+     * @return a producer of the chosen sender, or {@code null} when there is no sender or the sender has
+     *         no producer
+     */
+    private Producer selectProducer() {
+        if (!CollectionUtils.isEmpty(pulsarSenders)) {
+            // mask the sign bit so the index stays non-negative after the counter overflows
+            int ticket = CLIENT_INDEX.getAndIncrement() & Integer.MAX_VALUE;
+            int slot = ticket % pulsarSenders.size();
+            PulsarTopicSender chosen = pulsarSenders.get(slot);
+            return chosen.getProducer();
+        }
+        LOGGER.error("send job[{}] data err, empty pulsar sender", jobInstanceId);
+        return null;
+    }
+
+    /**
+     * Creates one {@link PulsarTopicSender} for every valid entry of {@code mqClusterInfos} and adds it to
+     * {@code pulsarSenders}. A cluster whose client cannot be built is logged and skipped.
+     */
+    private void initPulsarSender() {
+        if (CollectionUtils.isEmpty(mqClusterInfos)) {
+            LOGGER.error("init job[{}] pulsar client fail, empty mqCluster info", jobInstanceId);
+            return;
+        }
+        for (MQClusterInfo cluster : mqClusterInfos) {
+            if (!cluster.isValid()) {
+                continue;
+            }
+            try {
+                PulsarClient pulsarClient = PulsarClient.builder()
+                        .serviceUrl(cluster.getUrl())
+                        .ioThreads(clientIoThreads)
+                        .connectionsPerBroker(connectionsPreBroker)
+                        .build();
+                PulsarTopicSender topicSender = new PulsarTopicSender(pulsarClient, producerNum);
+                pulsarSenders.add(topicSender);
+                LOGGER.info("job[{}] init pulsar client url={}", jobInstanceId, cluster.getUrl());
+            } catch (PulsarClientException e) {
+                LOGGER.error("init job[{}] pulsar client fail", jobInstanceId, e);
+            }
+        }
+    }
+
+    /**
+     * Holds one pulsar client together with the producers created from it for {@code topic}, and hands
+     * the producers out in round-robin order.
+     */
+    class PulsarTopicSender {
+
+        private final AtomicInteger producerIndex = new AtomicInteger(0);
+        private final PulsarClient pulsarClient;
+        private final List<Producer<byte[]>> producers;
+
+        /**
+         * @param client the pulsar client used to create the producers
+         * @param producerNum number of producers to create from the client
+         */
+        public PulsarTopicSender(PulsarClient client, int producerNum) {
+            pulsarClient = client;
+            producers = initProducer(producerNum);
+        }
+
+        /**
+         * Returns the next producer in round-robin order.
+         *
+         * @return a producer, or {@code null} when no producer could be created
+         */
+        public Producer getProducer() {
+            if (CollectionUtils.isEmpty(producers)) {
+                LOGGER.error("job[{}] empty producers", jobInstanceId);
+                return null;
+            }
+            // mask the sign bit so the index stays non-negative after the counter overflows
+            int index = (producerIndex.getAndIncrement() & Integer.MAX_VALUE) % producers.size();
+            return producers.get(index);
+        }
+
+        /**
+         * close all pulsar producer and shutdown client
+         */
+        public void close() {
+            for (Producer<byte[]> producer : producers) {
+                try {
+                    producer.close();
+                } catch (Throwable e) {
+                    // best effort: keep closing the remaining producers
+                    LOGGER.error("job[{}] close pulsar producer error", jobInstanceId, e);
+                }
+            }
+            // the client is shut down even when no producer exists, otherwise it would leak
+            try {
+                pulsarClient.shutdown();
+            } catch (PulsarClientException e) {
+                LOGGER.error("job[{}] close pulsar client error", jobInstanceId, e);
+            }
+        }
+
+        /**
+         * Creates up to {@code producerNum} producers; producers that fail to be created are left out,
+         * so the returned list never contains {@code null}.
+         */
+        private List<Producer<byte[]>> initProducer(int producerNum) {
+            List<Producer<byte[]>> created = new ArrayList<>(producerNum);
+            for (int i = 0; i < producerNum; i++) {
+                Producer<byte[]> producer = createProducer();
+                if (producer != null) {
+                    created.add(producer);
+                }
+            }
+            return created;
+        }
+
+        /**
+         * Creates one producer for {@code topic} from the sink's producer settings.
+         *
+         * @return the producer, or {@code null} when the creation failed (the error is logged)
+         */
+        private Producer<byte[]> createProducer() {
+            try {
+                return pulsarClient.newProducer().topic(topic)
+                        .sendTimeout(sendTimeoutSecond, TimeUnit.SECONDS)
+                        .enableBatching(enableBatch)
+                        .blockIfQueueFull(blockIfQueueFull)
+                        .maxPendingMessages(maxPendingMessages)
+                        .maxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions)
+                        .compressionType(compressionType)
+                        .batchingMaxMessages(maxBatchingMessages)
+                        .batchingMaxBytes(maxBatchingBytes)
+                        .batchingMaxPublishDelay(maxBatchingPublishDelayMillis, TimeUnit.MILLISECONDS)
+                        .create();
+            } catch (Throwable e) {
+                LOGGER.error("job[{}] create producer[topic:{}] error", jobInstanceId, topic, e);
+                return null;
+            }
+        }
+
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index b2d4a83bc..e64b8b543 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -248,8 +248,8 @@ public class SenderManager {
                     batchMessage.getDataList(), batchMessage.getGroupId(), 
batchMessage.getStreamId(),
                     batchMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), 
maxSenderTimeout, TimeUnit.SECONDS,
                     batchMessage.getExtraMap(), proxySend);
-            int msgCnt = batchMessage.getDataList().size();
-            getMetricItem(batchMessage.getGroupId(), 
batchMessage.getStreamId()).pluginSendCount.addAndGet(msgCnt);
+            getMetricItem(batchMessage.getGroupId(), 
batchMessage.getStreamId()).pluginSendCount.addAndGet(
+                    batchMessage.getMsgCnt());
 
         } catch (Exception exception) {
             LOGGER.error("Exception caught", exception);
@@ -271,7 +271,7 @@ public class SenderManager {
             LOGGER.warn("max retry reached, retry count is {}, sleep and send 
again", retry);
             AgentUtils.silenceSleepInMs(retrySleepTime);
         }
-        int msgCnt = batchMessage.getDataList().size();
+        int msgCnt = batchMessage.getMsgCnt();
         String groupId = batchMessage.getGroupId();
         String streamId = batchMessage.getStreamId();
         long dataTime = batchMessage.getDataTime();
@@ -285,8 +285,8 @@ public class SenderManager {
             if (result == SendResult.OK) {
                 semaphore.release(msgCnt);
                 metricItem.pluginSendSuccessCount.addAndGet(msgCnt);
-                long totalSize = 
batchMessage.getDataList().stream().mapToLong(body -> body.length).sum();
-                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId, dataTime, msgCnt, totalSize);
+                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId, dataTime, msgCnt,
+                        batchMessage.getTotalSize());
                 if (sourcePath != null) {
                     
taskPositionManager.updateSinkPosition(batchMessage.getJobId(), sourcePath, 
msgCnt);
                 }
@@ -339,8 +339,8 @@ public class SenderManager {
                 return;
             }
             semaphore.release(msgCnt);
-            long totalSize = 
batchMessage.getDataList().stream().mapToLong(body -> body.length).sum();
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, 
streamId, dataTime, msgCnt, totalSize);
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, 
streamId, dataTime, msgCnt,
+                    batchMessage.getTotalSize());
             getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
             if (sourcePath != null) {
                 taskPositionManager.updateSinkPosition(jobId, sourcePath, 
msgCnt);
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
new file mode 100644
index 000000000..118c3261c
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.MiniAgent;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that messages written to {@link PulsarSink} are counted in its sink metrics.
+ */
+public class PulsarSinkTest {
+
+    private static PulsarSink pulsarSink;
+    private static JobProfile jobProfile;
+    private static AgentBaseTestsHelper helper;
+    private static MiniAgent agent;
+
+    /**
+     * Sets up the agent home and a mini agent, then initializes the sink from {@code pulsarSinkJob.json}
+     * with the mq cluster and topic info set below.
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        helper = new AgentBaseTestsHelper(PulsarSinkTest.class.getName()).setupAgentHome();
+        agent = new MiniAgent();
+        jobProfile = JobProfile.parseJsonFile("pulsarSinkJob.json");
+        jobProfile.set("job.mqClusters",
+                "[{\"url\":\"mqurl\",\"token\":\"token\",\"mqType\":\"PULSAR\",\"params\":{}}]");
+        jobProfile.set("job.topicInfo", "{\"topic\":\"topic\",\"inlongGroupId\":\"groupId\"}");
+        System.out.println(jobProfile.toJsonStr());
+        pulsarSink = new PulsarSink();
+        pulsarSink.init(jobProfile);
+    }
+
+    /**
+     * Writes {@code count} messages and expects the sink success counter to equal {@code count}.
+     */
+    @Test
+    public void testWrite() {
+        String body = "testMesage";
+        Map<String, String> attr = new HashMap<>();
+        attr.put(PROXY_KEY_GROUP_ID, "groupId");
+        attr.put(PROXY_KEY_STREAM_ID, "streamId");
+        long count = 5;
+        // loop on count so the number of writes and the expected value cannot drift apart
+        for (long i = 0; i < count; i++) {
+            pulsarSink.write(new ProxyMessage(body.getBytes(StandardCharsets.UTF_8), attr));
+        }
+        // JUnit order is (expected, actual), which keeps the failure message correct
+        assertEquals(count, pulsarSink.sinkMetric.sinkSuccessCount.get());
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/resources/pulsarSinkJob.json 
b/inlong-agent/agent-plugins/src/test/resources/pulsarSinkJob.json
new file mode 100644
index 000000000..d2fed3e02
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/pulsarSinkJob.json
@@ -0,0 +1,15 @@
+{
+  "job": {
+    "id": 1,
+    "instance.id": "job_1",
+    "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
+    "sink": "org.apache.inlong.agent.plugin.sinks.PulsarSink",
+    "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
+    "groupId": "groupId",
+    "streamId": "streamId"
+  },
+  "proxy": {
+    "inlongGroupId": "groupId",
+    "inlongStreamId": "streamId"
+  }
+}
\ No newline at end of file
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index ce81bf5d4..7b297cbcd 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -73,7 +73,7 @@ public enum TaskTypeEnum {
                 return TUBEMQ;
             case 12:
                 return MQTT;
-            case 101:
+            case 201:
                 return MOCK;
             default:
                 throw new RuntimeException("Unsupported task type " + 
taskType);
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
index 6e6463182..0f80b8bfe 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/msg/AttributeConstants.java
@@ -94,4 +94,6 @@ public interface AttributeConstants {
 
     // dataproxy IP from dp response ack
     String MESSAGE_DP_IP = "dpIP";
+
+    String MESSAGE_TOPIC = "topic";
 }
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
 b/inlong-common/src/main/java/org/apache/inlong/common/util/MessageUtils.java
similarity index 55%
copy from 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
copy to 
inlong-common/src/main/java/org/apache/inlong/common/util/MessageUtils.java
index 005084250..39c6dcf5c 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/BatchProxyMessage.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/util/MessageUtils.java
@@ -15,28 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.agent.message;
+package org.apache.inlong.common.util;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.commons.collections.MapUtils;
 
-import java.util.List;
 import java.util.Map;
 
-/**
- * A batch of proxy messages used for batch sending, produced by 
PackProxyMessage
- */
-@Data
-@AllArgsConstructor
-@NoArgsConstructor
-public class BatchProxyMessage {
+public class MessageUtils {
 
-    private String jobId;
-    private String groupId;
-    private String streamId;
-    private List<byte[]> dataList;
-    private long dataTime;
-    private Map<String, String> extraMap;
-    private boolean isSyncSend;
+    /**
+     * Joins the attribute map into a {@code key=value&key=value} string, in the iteration order of the map.
+     *
+     * @param extraAttrMap the attributes to join
+     * @return the joined attributes; an empty builder when {@code MapUtils.isEmpty} reports the map as empty
+     */
+    public static StringBuilder convertAttrToStr(Map<String, String> extraAttrMap) {
+        StringBuilder joined = new StringBuilder();
+        if (!MapUtils.isEmpty(extraAttrMap)) {
+            for (Map.Entry<String, String> attr : extraAttrMap.entrySet()) {
+                joined.append(attr.getKey()).append("=").append(attr.getValue()).append("&");
+            }
+            // drop the separator appended after the last entry
+            joined.setLength(joined.length() - 1);
+        }
+        return joined;
+    }
 }
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index 85881b974..9f5244ca5 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -19,6 +19,7 @@
 package org.apache.inlong.sdk.dataproxy;
 
 import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.common.util.MessageUtils;
 import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
 import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -281,7 +282,7 @@ public class DefaultMessageSender implements MessageSender {
         if (isProxySend) {
             extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
         }
-        StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
 
@@ -393,7 +394,7 @@ public class DefaultMessageSender implements MessageSender {
         if (isProxySend) {
             extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
         }
-        StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, 
isCompress, isReport,
@@ -516,7 +517,7 @@ public class DefaultMessageSender implements MessageSender {
         if (isProxySend) {
             extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
         }
-        StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
 
         boolean isCompressEnd = (isCompress && (body.length > cpsSize));
         if (msgtype == 7 || msgtype == 8) {
@@ -633,7 +634,7 @@ public class DefaultMessageSender implements MessageSender {
         if (isProxySend) {
             extraAttrMap.put(AttributeConstants.MESSAGE_PROXY_SEND, "true");
         }
-        StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
             // if (!isGroupIdTransfer)
@@ -746,7 +747,7 @@ public class DefaultMessageSender implements MessageSender {
         }
         addIndexCnt(groupId, streamId, bodyList.size());
 
-        StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype,
@@ -796,7 +797,7 @@ public class DefaultMessageSender implements MessageSender {
         }
         addIndexCnt(groupId, streamId, bodyList.size());
 
-        StringBuilder attrs = ProxyUtils.convertAttrToStr(extraAttrMap);
+        StringBuilder attrs = MessageUtils.convertAttrToStr(extraAttrMap);
 
         if (msgtype == 7 || msgtype == 8) {
             EncodeObject encodeObject = new EncodeObject(bodyList, msgtype, 
isCompress,
diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
index c54a1d25a..320aa1559 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/utils/ProxyUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.inlong.sdk.dataproxy.utils;
 
-import org.apache.commons.collections.MapUtils;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.network.Utils;
 import org.slf4j.Logger;
@@ -99,21 +98,6 @@ public class ProxyUtils {
         return dt;
     }
 
-    public static StringBuilder convertAttrToStr(Map<String, String> 
extraAttrMap) {
-        StringBuilder attrs = new StringBuilder();
-        if (MapUtils.isEmpty(extraAttrMap)) {
-            return attrs;
-        }
-        for (Map.Entry<String, String> entry : extraAttrMap.entrySet()) {
-            String key = entry.getKey();
-            String value = entry.getValue();
-            attrs.append(key).append("=");
-            attrs.append(value).append("&");
-        }
-        attrs.deleteCharAt(attrs.length() - 1);
-        return attrs;
-    }
-
     /**
      * valid client config
      *

Reply via email to