This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f167402  [INLONG][feature][audit] add audit-source module (#1905)
f167402 is described below

commit f167402db3be512a6bedb5cad6c6aeb1f9e8f597
Author: baomingyu <[email protected]>
AuthorDate: Tue Dec 7 09:44:23 2021 +0800

    [INLONG][feature][audit] add audit-source module (#1905)
---
 inlong-audit/audit-source/conf/audit.conf          |  90 ++++
 inlong-audit/audit-source/conf/pulsar.properties   |  19 +
 inlong-audit/audit-source/pom.xml                  |   5 +
 .../audit/base/HighPriorityThreadFactory.java      |  50 +++
 .../inlong/audit/base/NamedThreadFactory.java      |  44 ++
 .../audit/channel/FailoverChannelSelector.java     | 108 +++++
 .../org/apache/inlong/audit/node/Application.java  | 349 +++++++++++++++
 .../org/apache/inlong/audit/sink/EventStat.java    |  52 +++
 .../org/apache/inlong/audit/sink/PulsarSink.java   | 486 +++++++++++++++++++++
 .../sink/pulsar/CreatePulsarClientCallBack.java    |  36 ++
 .../audit/sink/pulsar/PulsarClientService.java     | 252 +++++++++++
 .../audit/sink/pulsar/SendMessageCallBack.java     |  27 ++
 .../inlong/audit/source/DefaultServiceDecoder.java |  60 +++
 .../inlong/audit/source/ServerMessageFactory.java  | 110 +++++
 .../inlong/audit/source/ServerMessageHandler.java  | 217 +++++++++
 .../apache/inlong/audit/source/ServiceDecoder.java |  37 ++
 .../inlong/audit/source/SimpleTcpSource.java       | 262 +++++++++++
 .../utils/FailoverChannelProcessorHolder.java      |  32 ++
 .../org/apache/inlong/audit/utils/LogCounter.java  |  52 +++
 .../apache/inlong/audit/utils/NetworkUtils.java    |  75 ++++
 .../apache/inlong/audit/sink/PulsarSinkTest.java   |  94 ++++
 21 files changed, 2457 insertions(+)

diff --git a/inlong-audit/audit-source/conf/audit.conf 
b/inlong-audit/audit-source/conf/audit.conf
new file mode 100644
index 0000000..eec6d76
--- /dev/null
+++ b/inlong-audit/audit-source/conf/audit.conf
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# The configuration file needs to define the sources,
+# the channels and the sinks.
+# Sources, channels and sinks are defined per agent,
+# in this case called 'agent'
+
+agent1.sources = tcp-source
+agent1.channels = ch-msg1 ch-msg2
+agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2
+
+agent1.sources.tcp-source.channels = ch-msg1 ch-msg2
+agent1.sources.tcp-source.type = org.apache.inlong.audit.source.SimpleTcpSource
+agent1.sources.tcp-source.msg-factory-name = org.apache.inlong.audit.source.ServerMessageFactory
+agent1.sources.tcp-source.host = 127.0.0.1
+agent1.sources.tcp-source.port = 46801
+agent1.sources.tcp-source.max-msg-length = 524288
+agent1.sources.tcp-source.connections = 30000
+agent1.sources.tcp-source.max-threads = 64
+agent1.sources.tcp-source.receiveBufferSize = 1048576
+agent1.sources.tcp-source.sendBufferSize = 1048576
+agent1.sources.tcp-source.custom-cp = true
+agent1.sources.tcp-source.selector.type = org.apache.inlong.audit.channel.FailoverChannelSelector
+agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
+agent1.sources.tcp-source.metric-recovery-path=/data/tdbank/audit/flume/recovery
+agent1.sources.tcp-source.metric-agent-port=8003
+agent1.sources.tcp-source.metric-cache-size=1000000
+agent1.sources.tcp-source.set=10
+
+agent1.channels.ch-msg1.type = memory
+agent1.channels.ch-msg1.capacity = 10000
+agent1.channels.ch-msg1.keep-alive = 0
+agent1.channels.ch-msg1.transactionCapacity = 200
+
+agent1.channels.ch-msg2.type = file
+agent1.channels.ch-msg2.capacity = 100000000
+agent1.channels.ch-msg2.maxFileSize = 1073741824
+agent1.channels.ch-msg2.minimumRequiredSpace = 1073741824
+agent1.channels.ch-msg2.checkpointDir = /data/tdbank/audit/file/ch-msg5/check
+agent1.channels.ch-msg2.dataDirs = /data/tdbank/audit/file/ch-msg5/data
+agent1.channels.ch-msg2.fsyncPerTransaction = false
+agent1.channels.ch-msg2.fsyncInterval = 10
+
+agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
+agent1.sinks.pulsar-sink-msg1.type = org.apache.inlong.audit.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url= 
pulsar://127.0.0.1:6650,127.0.0.2:6650
+agent1.sinks.pulsar-sink-msg1.topic = persistent://public/default/audit-test
+agent1.sinks.pulsar-sink-msg1.send_timeout_ms = 30000
+agent1.sinks.pulsar-sink-msg1.client_op_timeout_second = 30000
+agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
+agent1.sinks.pulsar-sink-msg1.enable_batch = true
+agent1.sinks.pulsar-sink-msg1.block_if_queue_full = true
+agent1.sinks.pulsar-sink-msg1.max_pending_messages = 10000
+agent1.sinks.pulsar-sink-msg1.max_batching_messages = 1000
+agent1.sinks.pulsar-sink-msg1.retry_interval_when_send_error_ms = 30000
+agent1.sinks.pulsar-sink-msg1.thread_num = 8
+agent1.sinks.pulsar-sink-msg1.log_every_n_events = 100000
+agent1.sinks.pulsar-sink-msg1.disk_io_rate_per_sec= 20000000
+
+agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
+agent1.sinks.pulsar-sink-msg2.type = org.apache.inlong.audit.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url = 
pulsar://127.0.0.1:6650,127.0.0.2:6650
+agent1.sinks.pulsar-sink-msg2.topic = persistent://public/default/audit-test
+agent1.sinks.pulsar-sink-msg2.send_timeout_ms = 30000
+agent1.sinks.pulsar-sink-msg2.client_op_timeout_second = 30000
+agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
+agent1.sinks.pulsar-sink-msg2.enable_batch = true
+agent1.sinks.pulsar-sink-msg2.block_if_queue_full = true
+agent1.sinks.pulsar-sink-msg2.max_pending_messages = 10000
+agent1.sinks.pulsar-sink-msg2.max_batching_messages = 1000
+agent1.sinks.pulsar-sink-msg2.retry_interval_when_send_error_ms = 30000
+agent1.sinks.pulsar-sink-msg2.thread_num = 8
+agent1.sinks.pulsar-sink-msg2.log_every_n_events = 100000
+agent1.sinks.pulsar-sink-msg2.disk_io_rate_per_sec= 20000000
\ No newline at end of file
diff --git a/inlong-audit/audit-source/conf/pulsar.properties 
b/inlong-audit/audit-source/conf/pulsar.properties
new file mode 100644
index 0000000..5cd8d02
--- /dev/null
+++ b/inlong-audit/audit-source/conf/pulsar.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+topic=persistent://public/default/audit-test
\ No newline at end of file
diff --git a/inlong-audit/audit-source/pom.xml 
b/inlong-audit/audit-source/pom.xml
index 7f118b3..8782099 100644
--- a/inlong-audit/audit-source/pom.xml
+++ b/inlong-audit/audit-source/pom.xml
@@ -73,6 +73,11 @@
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/HighPriorityThreadFactory.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/HighPriorityThreadFactory.java
new file mode 100644
index 0000000..673b75c
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/HighPriorityThreadFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.base;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * {@link ThreadFactory} whose threads are created with {@link Thread#MAX_PRIORITY}.
+ *
+ * <p>Thread names have the form {@code <prefix>-thread-<factory>-<thread>}: {@code <factory>}
+ * comes from a counter shared by all instances of this class and {@code <thread>} from a
+ * per-instance counter; both start at 1.</p>
+ */
+public class HighPriorityThreadFactory
+        implements ThreadFactory {
+    private static final AtomicInteger poolNumber = new AtomicInteger(1);
+    private final AtomicInteger threadNumber;
+    private final ThreadGroup group;
+    private final String namePrefix;
+    private final boolean isDaemon;
+
+    /**
+     * Creates a factory producing non-daemon threads.
+     *
+     * @param name prefix used in the names of the created threads
+     */
+    public HighPriorityThreadFactory(String name) {
+        this(name, false);
+    }
+
+    /**
+     * @param prefix prefix used in the names of the created threads
+     * @param daemon whether created threads are marked as daemon threads
+     */
+    public HighPriorityThreadFactory(String prefix, boolean daemon) {
+        this.threadNumber = new AtomicInteger(1);
+        SecurityManager s = System.getSecurityManager();
+        this.group = s != null ? s.getThreadGroup()
+                : Thread.currentThread().getThreadGroup();
+        // The trailing '-' separates the factory number from the thread number; without it
+        // factory 1 / thread 11 and factory 11 / thread 1 would get the same name.
+        this.namePrefix = prefix + "-thread-" + poolNumber.getAndIncrement() + "-";
+        this.isDaemon = daemon;
+    }
+
+    /**
+     * Creates a new, not yet started thread for the given task.
+     *
+     * @param r task the thread will run
+     * @return the configured thread
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(this.group, r,
+                this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
+        t.setDaemon(this.isDaemon);
+        t.setPriority(Thread.MAX_PRIORITY);
+        return t;
+    }
+}
+
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/NamedThreadFactory.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/NamedThreadFactory.java
new file mode 100644
index 0000000..821050a
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/NamedThreadFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.base;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ThreadFactory} that names its threads {@code <threadType>-running-thread-<n>},
+ * where {@code n} is a per-factory counter starting at 1.
+ */
+public class NamedThreadFactory implements ThreadFactory {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(NamedThreadFactory.class);
+
+    private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+    private final String threadType;
+
+    /**
+     * @param threadType prefix placed at the start of every thread name
+     */
+    public NamedThreadFactory(String threadType) {
+        this.threadType = threadType;
+    }
+
+    /**
+     * Creates a new, not yet started thread for the task and logs its name at debug level.
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+        final int sequence = mThreadNum.getAndIncrement();
+        final String threadName = threadType + "-running-thread-" + sequence;
+        final Thread worker = new Thread(r, threadName);
+        LOGGER.debug("{} created", worker.getName());
+        return worker;
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/channel/FailoverChannelSelector.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/channel/FailoverChannelSelector.java
new file mode 100644
index 0000000..6343ebc
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/channel/FailoverChannelSelector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.channel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.channel.AbstractChannelSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Channel selector that divides the configured channels into two groups: the channels named
+ * in the {@code master} property and all remaining ("slave") channels. For each event one
+ * channel of a group is returned, cycling through that group in order.
+ */
+public class FailoverChannelSelector extends AbstractChannelSelector {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(FailoverChannelSelector.class);
+
+    private static final String MASTER_CHANNEL = "master";
+
+    // position of the next channel to hand out from each group
+    private int masterIndex = 0;
+
+    private int slaveIndex = 0;
+
+    private final List<Channel> masterChannels = new ArrayList<Channel>();
+    private final List<Channel> slaveChannels = new ArrayList<Channel>();
+
+    /**
+     * Returns the next master channel, or an empty list (with a warning) if none is configured.
+     */
+    @Override
+    public List<Channel> getRequiredChannels(Event event) {
+        List<Channel> selected = new ArrayList<Channel>();
+        if (masterChannels.isEmpty()) {
+            LOG.warn("masterChannels size is zero!");
+            return selected;
+        }
+        selected.add(masterChannels.get(masterIndex));
+        masterIndex = (masterIndex + 1) % masterChannels.size();
+        return selected;
+    }
+
+    /**
+     * Returns the next slave channel, or an empty list (with a warning) if there is none.
+     */
+    @Override
+    public List<Channel> getOptionalChannels(Event event) {
+        List<Channel> selected = new ArrayList<Channel>();
+        if (slaveChannels.isEmpty()) {
+            LOG.warn("slaveChannels size is zero!");
+            return selected;
+        }
+        selected.add(slaveChannels.get(slaveIndex));
+        slaveIndex = (slaveIndex + 1) % slaveChannels.size();
+        return selected;
+    }
+
+    /**
+     * Splits a whitespace-separated list of channel names.
+     *
+     * @param channelName - whitespace-separated channel names
+     * @return - the individual names; empty if the input is null or empty
+     */
+    private List<String> splitChannelName(String channelName) {
+        if (StringUtils.isEmpty(channelName)) {
+            LOG.info("channel name is null!");
+            return new ArrayList<String>();
+        }
+        String[] names = channelName.split("\\s+");
+        return Arrays.asList(names);
+    }
+
+    /**
+     * Reads the {@code master} property and sorts every known channel into the master or
+     * slave group.
+     *
+     * @throws FlumeException if the {@code master} property is missing or empty
+     */
+    @Override
+    public void configure(Context context) {
+        String masters = context.getString(MASTER_CHANNEL);
+        if (StringUtils.isEmpty(masters)) {
+            throw new FlumeException("master channel is null!");
+        }
+        List<String> masterNames = splitChannelName(masters);
+
+        for (Map.Entry<String, Channel> named : getChannelNameMap().entrySet()) {
+            List<Channel> group = masterNames.contains(named.getKey())
+                    ? this.masterChannels
+                    : this.slaveChannels;
+            group.add(named.getValue());
+        }
+        LOG.info("masters:" + this.masterChannels);
+        LOG.info("slaves:" + this.slaveChannels);
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/node/Application.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/node/Application.java
new file mode 100644
index 0000000..d1cd269
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.node;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.flume.Channel;
+import org.apache.flume.Constants;
+import org.apache.flume.Context;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.MonitoringType;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.flume.node.PollingPropertiesFileConfigurationProvider;
+import org.apache.flume.node.PropertiesFileConfigurationProvider;
+import org.apache.flume.util.SSLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 
+ * Application
+ */
+public class Application {
+
+    private static final Logger logger = LoggerFactory
+            .getLogger(Application.class);
+
+    // System property naming the monitoring type (a MonitoringType constant or a class name).
+    public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+    // System properties with this prefix are passed, prefix stripped, to the monitor service.
+    public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+
+    // Components put under supervision by start().
+    private final List<LifecycleAware> components;
+    private final LifecycleSupervisor supervisor;
+    // Configuration whose channels/sinks/sources are currently supervised; null until the
+    // first configuration has been applied.
+    private MaterializedConfiguration materializedConfiguration;
+    // Monitor service created by loadMonitoring(); stays null when none is configured.
+    private MonitorService monitorServer;
+    // Held by start(), stop() and handleConfigurationEvent() while they run.
+    private final ReentrantLock lifecycleLock = new ReentrantLock();
+
+    /**
+     * Creates an application without any pre-registered lifecycle components.
+     */
+    public Application() {
+        this(new ArrayList<LifecycleAware>(0));
+    }
+
+    /**
+     * @param components components to supervise when {@code start()} is called
+     */
+    public Application(List<LifecycleAware> components) {
+        this.components = components;
+        supervisor = new LifecycleSupervisor();
+    }
+
+    /**
+     * Hands every component given to the constructor to the supervisor, asking for the
+     * START state with an always-restart policy. Runs under the lifecycle lock.
+     */
+    public void start() {
+        lifecycleLock.lock();
+        try {
+            for (LifecycleAware managed : components) {
+                SupervisorPolicy restartPolicy = new SupervisorPolicy.AlwaysRestartPolicy();
+                supervisor.supervise(managed, restartPolicy, LifecycleState.START);
+            }
+        } finally {
+            lifecycleLock.unlock();
+        }
+    }
+
+    /**
+     * Applies a new configuration: stops the components of the current configuration and
+     * starts those of {@code conf}, all under the lifecycle lock.
+     *
+     * <p>If the thread is interrupted while waiting for the lock, nothing is changed and the
+     * thread's interrupt status is restored so that callers can still observe it.</p>
+     *
+     * @param conf the configuration to switch to
+     */
+    @Subscribe
+    public void handleConfigurationEvent(MaterializedConfiguration conf) {
+        try {
+            lifecycleLock.lockInterruptibly();
+            stopAllComponents();
+            startAllComponents(conf);
+        } catch (InterruptedException e) {
+            logger.info("Interrupted while trying to handle configuration event");
+            // Catching InterruptedException clears the flag; set it again rather than
+            // silently swallowing the interruption.
+            Thread.currentThread().interrupt();
+        } finally {
+            // If interrupted while trying to lock, we don't own the lock, so
+            // must not attempt to unlock
+            if (lifecycleLock.isHeldByCurrentThread()) {
+                lifecycleLock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Stops all components of the current configuration, then the supervisor and, if one
+     * exists, the monitor service. Runs under the lifecycle lock.
+     */
+    public void stop() {
+        lifecycleLock.lock();
+        try {
+            // Must be inside the try block: if stopping the components throws, the lock
+            // still has to be released in the finally clause.
+            stopAllComponents();
+            supervisor.stop();
+            if (monitorServer != null) {
+                monitorServer.stop();
+            }
+        } finally {
+            lifecycleLock.unlock();
+        }
+    }
+
+    /**
+     * Removes the sources, sinks and channels of the current configuration (if any) from
+     * supervision, in that order, then stops the monitor service if one exists. A failure
+     * for one component is logged and the remaining components are still processed.
+     */
+    private void stopAllComponents() {
+        final MaterializedConfiguration current = this.materializedConfiguration;
+        if (current != null) {
+            logger.info("Shutting down configuration: {}", current);
+
+            for (Entry<String, SourceRunner> source : current.getSourceRunners().entrySet()) {
+                final SourceRunner runner = source.getValue();
+                try {
+                    logger.info("Stopping Source " + source.getKey());
+                    supervisor.unsupervise(runner);
+                } catch (Exception e) {
+                    logger.error("Error while stopping {}", runner, e);
+                }
+            }
+
+            for (Entry<String, SinkRunner> sink : current.getSinkRunners().entrySet()) {
+                final SinkRunner runner = sink.getValue();
+                try {
+                    logger.info("Stopping Sink " + sink.getKey());
+                    supervisor.unsupervise(runner);
+                } catch (Exception e) {
+                    logger.error("Error while stopping {}", runner, e);
+                }
+            }
+
+            for (Entry<String, Channel> named : current.getChannels().entrySet()) {
+                final Channel channel = named.getValue();
+                try {
+                    logger.info("Stopping Channel " + named.getKey());
+                    supervisor.unsupervise(channel);
+                } catch (Exception e) {
+                    logger.error("Error while stopping {}", channel, e);
+                }
+            }
+        }
+
+        if (monitorServer != null) {
+            monitorServer.stop();
+        }
+    }
+
+    /**
+     * Makes the given configuration the current one and puts its components under
+     * supervision: channels first, then (once every channel has either started or is
+     * reported in error state by the supervisor) sinks, then sources. Finally the monitor
+     * service is loaded. A failure to supervise one component is logged and the others are
+     * still processed.
+     *
+     * @param materializedConfiguration the configuration to start
+     * @throws RuntimeException wrapping an {@link InterruptedException} if the thread is
+     *         interrupted while waiting for a channel to start
+     */
+    private void startAllComponents(MaterializedConfiguration materializedConfiguration) {
+        logger.info("Starting new configuration:{}", materializedConfiguration);
+
+        this.materializedConfiguration = materializedConfiguration;
+
+        for (Entry<String, Channel> entry : materializedConfiguration.getChannels().entrySet()) {
+            try {
+                logger.info("Starting Channel " + entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                logger.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        /*
+         * Wait for all channels to start.
+         */
+        for (Channel ch : materializedConfiguration.getChannels().values()) {
+            while (ch.getLifecycleState() != LifecycleState.START
+                    && !supervisor.isComponentInErrorState(ch)) {
+                try {
+                    logger.info("Waiting for channel: " + ch.getName()
+                            + " to start. Sleeping for 500 ms");
+                    Thread.sleep(500);
+                } catch (InterruptedException e) {
+                    logger.error("Interrupted while waiting for channel to start.", e);
+                    // Restore the interrupt status before rethrowing so that code further
+                    // up the stack can still see that the thread was interrupted.
+                    Thread.currentThread().interrupt();
+                    Throwables.propagate(e);
+                }
+            }
+        }
+
+        for (Entry<String, SinkRunner> entry : materializedConfiguration.getSinkRunners()
+                .entrySet()) {
+            try {
+                logger.info("Starting Sink " + entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                logger.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        for (Entry<String, SourceRunner> entry : materializedConfiguration.getSourceRunners()
+                .entrySet()) {
+            try {
+                logger.info("Starting Source " + entry.getKey());
+                supervisor.supervise(entry.getValue(),
+                        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
+            } catch (Exception e) {
+                logger.error("Error while starting {}", entry.getValue(), e);
+            }
+        }
+
+        this.loadMonitoring();
+    }
+
+    /**
+     * Creates, configures and starts the monitor service named by the
+     * {@code flume.monitoring.type} system property; does nothing when that property is not
+     * set. The value is first looked up as a {@link MonitoringType} constant and otherwise
+     * loaded as a class name. Any failure is logged as a warning and not propagated.
+     */
+    @SuppressWarnings("unchecked")
+    private void loadMonitoring() {
+        Properties systemProps = System.getProperties();
+        Set<String> keys = systemProps.stringPropertyNames();
+        if (!keys.contains(CONF_MONITOR_CLASS)) {
+            return;
+        }
+        try {
+            String monitorType = systemProps.getProperty(CONF_MONITOR_CLASS);
+            Class<? extends MonitorService> klass;
+            try {
+                // Is it a known type?
+                String typeName = monitorType.toUpperCase(Locale.ENGLISH);
+                klass = MonitoringType.valueOf(typeName).getMonitorClass();
+            } catch (Exception e) {
+                // Not a known type, use FQCN
+                klass = (Class<? extends MonitorService>) Class.forName(monitorType);
+            }
+            this.monitorServer = klass.getDeclaredConstructor().newInstance();
+
+            // Pass every "flume.monitoring.*" system property on, with the prefix removed.
+            Context context = new Context();
+            final int prefixLength = CONF_MONITOR_PREFIX.length();
+            for (String key : keys) {
+                if (!key.startsWith(CONF_MONITOR_PREFIX)) {
+                    continue;
+                }
+                context.put(key.substring(prefixLength), systemProps.getProperty(key));
+            }
+            monitorServer.configure(context);
+            monitorServer.start();
+        } catch (Exception e) {
+            logger.warn("Error starting monitoring. "
+                    + "Monitoring might not be available.", e);
+        }
+    }
+
+    /**
+     * Command-line entry point: parses the options, builds the configuration provider for
+     * the given configuration file, starts the application and registers a shutdown hook
+     * that stops it. Any exception raised on the way is logged as a fatal error.
+     *
+     * <p>Options: {@code -n/--name} (agent name, required), {@code -f/--conf-file}
+     * (configuration file, needed to start an agent), {@code --no-reload-conf} (do not watch
+     * the file for changes) and {@code -h/--help}.</p>
+     *
+     * @param args command-line arguments
+     */
+    public static void main(String[] args) {
+
+        try {
+            SSLUtil.initGlobalSSLParameters();
+
+            Options options = new Options();
+
+            Option option = new Option("n", "name", true, "the name of this agent");
+            option.setRequired(true);
+            options.addOption(option);
+
+            // Not marked as required on the parser so that its absence is reported by the
+            // explicit check below; the help text no longer mentions a "-z" option, which
+            // this application does not define.
+            option = new Option("f", "conf-file", true,
+                    "specify a config file (required)");
+            option.setRequired(false);
+            options.addOption(option);
+
+            option = new Option(null, "no-reload-conf", false,
+                    "do not reload config file if changed");
+            options.addOption(option);
+
+            option = new Option("h", "help", false, "display help text");
+            options.addOption(option);
+
+            CommandLineParser parser = new GnuParser();
+            CommandLine commandLine = parser.parse(options, args);
+
+            if (commandLine.hasOption('h')) {
+                new HelpFormatter().printHelp("flume-ng agent", options, true);
+                return;
+            }
+
+            String agentName = commandLine.getOptionValue('n');
+            boolean reload = !commandLine.hasOption("no-reload-conf");
+
+            Application application;
+
+            // Without this check a missing -f would surface as a bare NullPointerException
+            // from new File(null) instead of a usable error message.
+            String configurationPath = commandLine.getOptionValue('f');
+            if (configurationPath == null) {
+                throw new ParseException(
+                        "No configuration file specified, use -f/--conf-file <file>");
+            }
+            File configurationFile = new File(configurationPath);
+
+            // The following is to ensure that by default the agent will fail on startup
+            // if the file does not exist.
+            if (!configurationFile.exists()) {
+                // If command line invocation, then need to fail fast
+                if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
+                    String path = configurationFile.getPath();
+                    try {
+                        path = configurationFile.getCanonicalPath();
+                    } catch (IOException ex) {
+                        logger.error("Failed to read canonical path for file: " + path,
+                                ex);
+                    }
+                    throw new ParseException(
+                            "The specified configuration file does not exist: " + path);
+                }
+            }
+            List<LifecycleAware> components = Lists.newArrayList();
+
+            if (reload) {
+                // The polling provider publishes configurations on the event bus; the
+                // application receives them through handleConfigurationEvent.
+                EventBus eventBus = new EventBus(agentName + "-event-bus");
+                PollingPropertiesFileConfigurationProvider configurationProvider;
+                // NOTE(review): 30 is presumably the poll interval in seconds - confirm
+                // against PollingPropertiesFileConfigurationProvider.
+                configurationProvider = new PollingPropertiesFileConfigurationProvider(
+                        agentName, configurationFile, eventBus, 30);
+                components.add(configurationProvider);
+                application = new Application(components);
+                eventBus.register(application);
+            } else {
+                PropertiesFileConfigurationProvider configurationProvider;
+                configurationProvider = new PropertiesFileConfigurationProvider(
+                        agentName, configurationFile);
+                application = new Application();
+                application.handleConfigurationEvent(configurationProvider.getConfiguration());
+            }
+
+            //start application
+            application.start();
+
+            final Application appReference = application;
+            Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
+
+                @Override
+                public void run() {
+                    appReference.stop();
+                }
+            });
+
+        } catch (Exception e) {
+            logger.error("A fatal error occurred while running. Exception follows.", e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/EventStat.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/EventStat.java
new file mode 100644
index 0000000..31a7223
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/EventStat.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.flume.Event;
+
+/**
+ * Couples a Flume event with the number of delivery retries recorded for it.
+ */
+@Getter
+@Setter
+public class EventStat {
+    private Event event;
+    private int myRetryCnt;
+
+    /**
+     * Wraps an event whose retry counter starts at zero.
+     *
+     * @param event the event to track
+     */
+    public EventStat(Event event) {
+        this(event, 0);
+    }
+
+    /**
+     * Wraps an event together with an explicit retry counter.
+     *
+     * @param event the event to track
+     * @param retryCnt initial value of the retry counter
+     */
+    public EventStat(Event event, int retryCnt) {
+        this.event = event;
+        this.myRetryCnt = retryCnt;
+    }
+
+    /**
+     * Adds one to the retry counter.
+     */
+    public void incRetryCnt() {
+        myRetryCnt += 1;
+    }
+
+    /**
+     * Always answers {@code false}; no drop policy is implemented in this class.
+     *
+     * @return {@code false}
+     */
+    public boolean shouldDrop() {
+        return false;
+    }
+
+    /**
+     * Drops the event reference and sets the retry counter back to zero.
+     */
+    public void reset() {
+        myRetryCnt = 0;
+        event = null;
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
new file mode 100644
index 0000000..cdc850c
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.sink.pulsar.CreatePulsarClientCallBack;
+import org.apache.inlong.audit.sink.pulsar.PulsarClientService;
+import org.apache.inlong.audit.sink.pulsar.SendMessageCallBack;
+import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import 
org.apache.pulsar.client.api.PulsarClientException.NotConnectedException;
+import 
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
+import 
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * pulsar sink
+ *
+ * send to one pulsar cluster
+ */
+public class PulsarSink extends AbstractSink implements Configurable, 
SendMessageCallBack,
+        CreatePulsarClientCallBack {
+    private static final Logger logger = 
LoggerFactory.getLogger(PulsarSink.class);
+
+    /*
+     * properties for header info
+     */
+    private static String TOPIC = "topic";
+
+    /*
+     * default value
+     */
+    private static int BAD_EVENT_QUEUE_SIZE = 10000;
+    private static int BATCH_SIZE = 10000;
+    private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
+
+    /*
+     * properties for stat
+     */
+    private static String LOG_EVERY_N_EVENTS = "log_every_n_events";
+
+    private static String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
+
+    private static final String SINK_THREAD_NUM = "thread_num";
+
+    /*
+     * for log
+     */
+    private Integer logEveryNEvents;
+
+    private long diskIORatePerSec;
+
+    private RateLimiter diskRateLimiter;
+
+    /*
+     * for stat
+     */
+    private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+
+    private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+
+    private long t1 = System.currentTimeMillis();
+
+    private long t2 = 0L;
+
+    private static AtomicLong totalPulsarSuccSendCnt = new AtomicLong(0);
+
+    private static AtomicLong totalPulsarSuccSendSize = new AtomicLong(0);
+    /*
+     * for control
+     */
+    private boolean overflow = false;
+
+    private LinkedBlockingQueue<EventStat> resendQueue;
+
+    private long logCounter = 0;
+
+    private final AtomicLong currentInFlightCount = new AtomicLong(0);
+
+    /*
+     * whether the SendTask thread can send data to pulsar
+     */
+    private volatile boolean canSend = false;
+
+    /*
+     * Control whether the SinkRunner thread can read data from the Channel
+     */
+    private volatile boolean canTake = false;
+
+
+    private static int EVENT_QUEUE_SIZE = 1000;
+
+    private int threadNum;
+
+
+    /*
+     * send thread pool
+     */
+    private Thread[] sinkThreadPool;
+    private LinkedBlockingQueue<Event> eventQueue;
+
+    private SinkCounter sinkCounter;
+
+    private PulsarClientService pulsarClientService;
+
+    private static final Long PRINT_INTERVAL = 30L;
+
+    private static final PulsarPerformanceTask pulsarPerformanceTask = new 
PulsarPerformanceTask();
+
+    private static ScheduledExecutorService scheduledExecutorService = 
Executors
+            .newScheduledThreadPool(1, new 
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
+
+    private String topic;
+
+    static {
+        /*
+         * stat pulsar performance
+         */
+        System.out.println("pulsarPerformanceTask!!!!!!");
+        scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask, 
0L,
+                PRINT_INTERVAL, TimeUnit.SECONDS);
+    }
+
+    public PulsarSink() {
+        super();
+        logger.debug("new instance of PulsarSink!");
+    }
+
+    /**
+     * configure
+     * @param context
+     */
+    public void configure(Context context) {
+        logger.info("PulsarSink started and context = {}", context.toString());
+        /*
+         * topic config
+         */
+        topic  = context.getString(TOPIC);
+        logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS, 
DEFAULT_LOG_EVERY_N_EVENTS);
+        logger.debug(this.getName() + " " + LOG_EVERY_N_EVENTS + " " + 
logEveryNEvents);
+        Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must 
be > 0");
+
+        resendQueue = new LinkedBlockingQueue<EventStat>(BAD_EVENT_QUEUE_SIZE);
+
+        String sinkThreadNum = context.getString(SINK_THREAD_NUM, "4");
+        threadNum = Integer.parseInt(sinkThreadNum);
+        Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
+        sinkThreadPool = new Thread[threadNum];
+        eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
+
+        diskIORatePerSec = context.getLong(DISK_IO_RATE_PER_SEC,0L);
+        if (diskIORatePerSec != 0) {
+            diskRateLimiter = RateLimiter.create(diskIORatePerSec);
+        }
+        pulsarClientService = new PulsarClientService(context);
+
+        if (sinkCounter == null) {
+            sinkCounter = new SinkCounter(getName());
+        }
+    }
+
+    /**
+     * Creates the producer for the configured topic ahead of the first send and logs
+     * how long that took. When no topic is configured only the timing line is logged.
+     *
+     * @throws Exception anything propagated by producer creation
+     */
+    private void initTopic() throws Exception {
+        long startTime = System.currentTimeMillis();
+        if (topic != null) {
+            // Register the producer in the service's cache. Creating it without caching
+            // would orphan it and make the first send open a second producer.
+            pulsarClientService.producerInfoMap.computeIfAbsent(topic,
+                    (k) -> pulsarClientService.initTopicProducer(k));
+        }
+        logger.info(getName() + " initTopic cost: "
+                + (System.currentTimeMillis() - startTime) + "ms");
+    }
+
+    @Override
+    public void start() {
+        logger.info("pulsar sink starting...");
+        sinkCounter.start();
+        pulsarClientService.initCreateConnection(this);
+
+        super.start();
+        this.canSend = true;
+        this.canTake = true;
+        try {
+            initTopic();
+        } catch (Exception e) {
+            logger.info("meta sink start publish topic fail.",e);
+        }
+
+        for (int i = 0; i < sinkThreadPool.length; i++) {
+            sinkThreadPool[i] = new Thread(new SinkTask(), getName()
+                    + "_pulsar_sink_sender-"
+                    + i);
+            sinkThreadPool[i].start();
+        }
+        logger.debug("meta sink started");
+    }
+
+    /**
+     * Stops the sink.
+     *
+     * <p>Order matters: first stop taking from the channel, then wait up to about ten
+     * seconds for the sender threads to drain {@code eventQueue}, then stop and interrupt
+     * the senders, and only afterwards close the Pulsar client. Closing the client before
+     * the drain wait would leave the senders nothing to send through.
+     */
+    @Override
+    public void stop() {
+        logger.info("pulsar sink stopping");
+        this.canTake = false;
+        boolean interrupted = false;
+        int waitCount = 0;
+        while (eventQueue.size() != 0 && waitCount++ < 10) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                logger.info("Stop thread has been interrupt!");
+                interrupted = true;
+                break;
+            }
+        }
+        this.canSend = false;
+
+        if (sinkThreadPool != null) {
+            for (Thread thread : sinkThreadPool) {
+                if (thread != null) {
+                    thread.interrupt();
+                }
+            }
+            sinkThreadPool = null;
+        }
+        // Senders are stopped; it is now safe to release the client.
+        pulsarClientService.close();
+        super.stop();
+        if (!scheduledExecutorService.isShutdown()) {
+            scheduledExecutorService.shutdown();
+        }
+        sinkCounter.stop();
+        logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
+        if (interrupted) {
+            // Re-assert the interrupt for the caller once cleanup has finished.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public Status process() throws EventDeliveryException {
+        logger.debug("process......");
+        if (!this.canTake) {
+            return Status.BACKOFF;
+        }
+
+        Status status = Status.READY;
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        try {
+            Event event = channel.take();
+            if (event != null) {
+                if (diskRateLimiter != null) {
+                    diskRateLimiter.acquire(event.getBody().length);
+                }
+                if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) 
{
+                    logger.info("[{}] Channel --> Queue(has no enough 
space,current code point) "
+                            + "--> pulsar,Check if pulsar server or network is 
ok.(if this situation "
+                            + "last long time it will cause memoryChannel full 
and fileChannel write.)", getName());
+                    tx.rollback();
+                } else {
+                    tx.commit();
+                }
+            } else {
+                status = Status.BACKOFF;
+                tx.commit();
+            }
+        } catch (Throwable t) {
+            logger.error("Process event failed!" + this.getName(), t);
+            try {
+                tx.rollback();
+            } catch (Throwable e) {
+                logger.error("metasink transaction rollback exception",e);
+
+            }
+        } finally {
+            tx.close();
+        }
+        return status;
+    }
+
+    @Override
+    public void handleCreateClientSuccess(String url) {
+        logger.info("createConnection success for url = {}", url);
+        sinkCounter.incrementConnectionCreatedCount();
+    }
+
+    @Override
+    public void handleCreateClientException(String url) {
+        logger.error("createConnection has exception for url = {}", url);
+        sinkCounter.incrementConnectionFailedCount();
+    }
+
+    /**
+     * Callback for a successfully sent event: updates the throughput statistics and the
+     * sink counter, releases one in-flight slot and logs progress every
+     * {@code logEveryNEvents} successes.
+     *
+     * @param result the send result handed over by the client service (unused here)
+     * @param eventStat the event that was sent
+     */
+    @Override
+    public void handleMessageSendSuccess(Object result,  EventStat eventStat) {
+        /*
+         * Statistics pulsar performance
+         */
+        totalPulsarSuccSendCnt.incrementAndGet();
+        totalPulsarSuccSendSize.addAndGet(eventStat.getEvent().getBody().length);
+        /*
+         *add to sinkCounter
+         */
+        sinkCounter.incrementEventDrainSuccessCount();
+        currentInFlightCount.decrementAndGet();
+        // Use the value returned by the increment itself: every callback then observes a
+        // distinct count, so a milestone is neither skipped nor reported twice.
+        long nowCnt = currentSuccessSendCnt.incrementAndGet();
+        if (nowCnt % logEveryNEvents == 0) {
+            long oldCnt = lastSuccessSendCnt.getAndSet(nowCnt);
+            t2 = System.currentTimeMillis();
+            logger.info("metasink {}, succ put {} events to pulsar,"
+                    + " in the past {} millsec", new Object[] {
+                    getName(), (nowCnt - oldCnt), (t2 - t1)
+            });
+            t1 = t2;
+        }
+    }
+
+    @Override
+    public void handleMessageSendException(EventStat eventStat,  Object e) {
+        if (e instanceof TooLongFrameException) {
+            PulsarSink.this.overflow = true;
+        } else if (e instanceof ProducerQueueIsFullError) {
+            PulsarSink.this.overflow = true;
+        } else if (!(e instanceof AlreadyClosedException
+                || e instanceof NotConnectedException
+                || e instanceof TopicTerminatedException)) {
+            logger.error("handle message send exception ,msg will resend 
later, e = {}", e);
+        }
+        eventStat.incRetryCnt();
+        resendEvent(eventStat, true);
+    }
+
+    /**
+     * Resend the data and store the data in the memory cache.
+     * @param es
+     * @param isDecrement
+     */
+    private void resendEvent(EventStat es, boolean isDecrement) {
+        try {
+            if (isDecrement) {
+                currentInFlightCount.decrementAndGet();
+            }
+            if (es == null || es.getEvent() == null) {
+                return;
+            }
+            if (!resendQueue.offer(es)) {
+                
FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
+            }
+        } catch (Throwable throwable) {
+            logger.error("resendEvent e = {}", throwable);
+        }
+    }
+
+    static class PulsarPerformanceTask implements Runnable {
+        @Override
+        public void run() {
+            try {
+                if (totalPulsarSuccSendSize.get() != 0) {
+                    logger.info("Total pulsar performance tps :"
+                            + totalPulsarSuccSendCnt.get() / PRINT_INTERVAL
+                            + "/s, avg msg size:"
+                            + totalPulsarSuccSendSize.get() / 
totalPulsarSuccSendCnt.get()
+                            + ",print every " + PRINT_INTERVAL + " seconds");
+                    /*
+                     * totalpulsarSuccSendCnt represents the number of packets
+                     */
+                    totalPulsarSuccSendCnt.set(0);
+                    totalPulsarSuccSendSize.set(0);
+                }
+
+            } catch (Exception e) {
+                logger.info("pulsarPerformanceTask error", e);
+            }
+        }
+    }
+
+    /**
+     * Sender loop run by each sink thread while {@code canSend} is set.
+     *
+     * <p>Each iteration prefers an event from the resend queue and otherwise blocks on the
+     * event queue, then hands the event to the Pulsar client service. An event that could
+     * not be handed over is put back for a later attempt rather than discarded.
+     */
+    class SinkTask implements Runnable {
+        @Override
+        public void run() {
+            logger.info("Sink task {} started.", Thread.currentThread().getName());
+            while (canSend) {
+                logger.debug("SinkTask process......");
+                boolean decrementFlag = false;
+                Event event = null;
+                EventStat eventStat = null;
+                try {
+                    if (PulsarSink.this.overflow) {
+                        PulsarSink.this.overflow = false;
+                        Thread.sleep(10);
+                    }
+                    if (!resendQueue.isEmpty()) {
+                        /*
+                         * Send the data in the retry queue first
+                         */
+                        eventStat = resendQueue.poll();
+                        if (eventStat != null) {
+                            event = eventStat.getEvent();
+                        }
+                    } else {
+                        if (currentInFlightCount.get() > BATCH_SIZE) {
+                            /*
+                             * While more than BATCH_SIZE messages are awaiting a response,
+                             * print the in-flight and resend-queue sizes periodically
+                             */
+                            logCounter++;
+                            if (logCounter == 1 || logCounter % 100000 == 0) {
+                                logger.info(getName()
+                                                + " currentInFlightCount={} resendQueue"
+                                                + ".size={}",
+                                        currentInFlightCount.get(),resendQueue.size());
+                            }
+                            if (logCounter > Long.MAX_VALUE - 10) {
+                                logCounter = 0;
+                            }
+                        }
+                        event = eventQueue.take();
+                        eventStat = new EventStat(event);
+                        sinkCounter.incrementEventDrainAttemptCount();
+                    }
+                    logger.debug("Event is {}, topic = {} ",event, topic);
+
+                    if (event == null) {
+                        continue;
+                    }
+
+                    if (topic == null || topic.equals("")) {
+                        logger.warn("no topic specified in event header, just skip this event");
+                        continue;
+                    }
+
+                    final EventStat es = eventStat;
+                    boolean sendResult = pulsarClientService.sendMessage(topic, event,
+                            PulsarSink.this, es);
+                    if (!sendResult) {
+                        // Nothing was handed to Pulsar and no callback will fire: requeue
+                        // the event instead of losing it, and pause so an unavailable
+                        // producer does not turn this loop into a busy spin.
+                        resendEvent(es, false);
+                        Thread.sleep(100);
+                        continue;
+                    }
+                    currentInFlightCount.incrementAndGet();
+                    decrementFlag = true;
+                } catch (InterruptedException e) {
+                    logger.info("Thread {} has been interrupted!", Thread.currentThread().getName());
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch (Throwable t) {
+                    if (t instanceof PulsarClientException) {
+                        String message = t.getMessage();
+                        if (message != null && (message.contains("No available queue for topic")
+                                || message.contains("The brokers of topic are all forbidden"))) {
+                            logger.info("IllegalTopicMap.put " + topic);
+                            continue;
+                        } else {
+                            try {
+                                Thread.sleep(100);
+                            } catch (InterruptedException e) {
+                                // Keep the interrupt visible; the event is still requeued below.
+                                Thread.currentThread().interrupt();
+                            }
+                        }
+                    }
+                    resendEvent(eventStat, decrementFlag);
+                }
+            }
+        }
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/CreatePulsarClientCallBack.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/CreatePulsarClientCallBack.java
new file mode 100644
index 0000000..40e6a8a
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/CreatePulsarClientCallBack.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink.pulsar;
+
+/**
+ * call back interface for create pulsar client
+ */
+public interface CreatePulsarClientCallBack {
+
+    /**
+     * call after create pulsar client success
+     * @param url
+     */
+    void handleCreateClientSuccess(String url);
+
+    /**
+     * call after create pulsar client has exception
+     * @param url
+     */
+    void handleCreateClientException(String url);
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
new file mode 100644
index 0000000..efbd821
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.inlong.audit.consts.AttributeConstants;
+import org.apache.inlong.audit.sink.EventStat;
+import org.apache.inlong.audit.utils.LogCounter;
+import org.apache.inlong.audit.utils.NetworkUtils;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarClientService {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(PulsarClientService.class);
+
+    private static final LogCounter logPrinterA = new LogCounter(10, 100000, 
60 * 1000);
+
+    /*
+     * properties key for pulsar client
+     */
+    private static String PULSAR_SERVER_URL = "pulsar_server_url";
+
+    /*
+     * properties key pulsar producer
+     */
+    private static String SEND_TIMEOUT = "send_timeout_ms";
+    private static String CLIENT_TIMEOUT = "client_op_timeout_second";
+    private static String ENABLE_BATCH = "enable_batch";
+    private static String BLOCK_IF_QUEUE_FULL = "block_if_queue_full";
+    private static String MAX_PENDING_MESSAGES = "max_pending_messages";
+    private static String MAX_BATCHING_MESSAGES = "max_batching_messages";
+
+    private static int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
+    private static int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
+    private static boolean DEFAULT_ENABLE_BATCH = true;
+    private static boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
+    private static int DEFAULT_MAX_PENDING_MESSAGES = 10000;
+    private static int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
+
+    /*
+     * for producer
+     */
+    private Integer sendTimeout; // in millsec
+    private Integer clientOpTimeout;
+    private boolean enableBatch = true;
+    private boolean blockIfQueueFull = true;
+    private int maxPendingMessages = 10000;
+    private int maxBatchingMessages = 1000;
+    public ConcurrentHashMap<String, Producer> producerInfoMap;
+    public PulsarClient pulsarClient;
+    public String pulsarServerUrl;
+
+    private String localIp = "127.0.0.1";
+
+    /**
+     * pulsar client service
+     * @param context
+     */
+    public PulsarClientService(Context context) {
+
+        pulsarServerUrl = context.getString(PULSAR_SERVER_URL);
+        Preconditions.checkState(pulsarServerUrl != null, "No pulsar server 
url specified");
+
+        sendTimeout = context.getInteger(SEND_TIMEOUT, 
DEFAULT_SEND_TIMEOUT_MILL);
+        clientOpTimeout = context.getInteger(CLIENT_TIMEOUT, 
DEFAULT_CLIENT_TIMEOUT_SECOND);
+        logger.debug("PulsarClientService " + SEND_TIMEOUT + " " + 
sendTimeout);
+        Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be > 
0");
+
+        enableBatch = context.getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
+        blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL, 
DEFAULT_BLOCK_IF_QUEUE_FULL);
+        maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES, 
DEFAULT_MAX_PENDING_MESSAGES);
+        maxBatchingMessages =  context.getInteger(MAX_BATCHING_MESSAGES, 
DEFAULT_MAX_BATCHING_MESSAGES);
+        producerInfoMap = new ConcurrentHashMap<>();
+        localIp = NetworkUtils.getLocalIp();
+
+    }
+
+    /**
+     * init connection
+     * @param callBack
+     */
+    public void initCreateConnection(CreatePulsarClientCallBack callBack) {
+        try {
+            createConnection(callBack);
+        } catch (FlumeException e) {
+            logger.error("Unable to create pulsar client" + ". Exception 
follows.", e);
+            close();
+        }
+    }
+
+    /**
+     * Sends one event to the given topic asynchronously.
+     *
+     * <p>The message carries the local IP as {@code auditIp} plus the stream and group id
+     * headers of the event when present. Exactly one of the two callback methods is invoked
+     * once the send completes.
+     *
+     * @param topic topic to publish to
+     * @param event event whose body becomes the message value
+     * @param sendMessageCallBack receiver of the send outcome
+     * @param es bookkeeping object passed back to the callback
+     * @return {@code false} when no producer could be obtained (nothing is sent and no
+     *         callback fires); {@code true} once the asynchronous send has been issued
+     */
+    public boolean sendMessage(String topic, Event event,
+            SendMessageCallBack sendMessageCallBack, EventStat es) {
+        Producer producer = null;
+        try {
+            producer = getProducer(topic);
+        } catch (Exception e) {
+            if (logPrinterA.shouldPrint()) {
+                logger.error("Get producer failed!", e);
+            }
+        }
+
+        if (producer == null) {
+            logger.error("Get producer is null!");
+            return false;
+        }
+
+        Map<String, String> proMap = new HashMap<>();
+        proMap.put("auditIp", localIp);
+        Map<String, String> headers = event.getHeaders();
+        if (headers.containsKey(AttributeConstants.INLONG_STREAM_ID)) {
+            proMap.put(AttributeConstants.INLONG_STREAM_ID,
+                    headers.get(AttributeConstants.INLONG_STREAM_ID));
+        }
+        if (headers.containsKey(AttributeConstants.INLONG_GROUP_ID)) {
+            proMap.put(AttributeConstants.INLONG_GROUP_ID,
+                    headers.get(AttributeConstants.INLONG_GROUP_ID));
+        }
+
+        logger.debug("producer send msg!");
+        // A single completion stage reports the outcome exactly once. Chaining
+        // exceptionally() after thenAccept() also ran the failure callback when the
+        // success callback itself threw, which resent an event that was already delivered.
+        producer.newMessage().properties(proMap).value(event.getBody())
+                .sendAsync().whenComplete((msgId, e) -> {
+                    if (e == null) {
+                        sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl) msgId, es);
+                    } else {
+                        sendMessageCallBack.handleMessageSendException(es, e);
+                    }
+                });
+        return true;
+    }
+
+    /**
+     * Creates the Pulsar client if none exists yet and reports the outcome to the callback.
+     *
+     * <p>Calling this again while a client exists has no effect. Creation failures are
+     * logged and reported through {@code handleCreateClientException}; they are not
+     * rethrown.
+     *
+     * @param callBack receiver of the success or failure notification
+     * @throws FlumeException declared for callers; the failures handled here are logged
+     *         rather than rethrown
+     */
+    private void createConnection(CreatePulsarClientCallBack callBack) throws FlumeException {
+        if (pulsarClient != null) {
+            return;
+        }
+        try {
+            pulsarClient = initPulsarClient(pulsarServerUrl);
+            callBack.handleCreateClientSuccess(pulsarServerUrl);
+        } catch (PulsarClientException e) {
+            callBack.handleCreateClientException(pulsarServerUrl);
+            // The trailing throwable keeps the stack trace in the log.
+            logger.error("create connnection error in metasink, "
+                            + "maybe pulsar master set error, please re-check.url{}, ex1 {}",
+                    pulsarServerUrl,
+                    e.getMessage(),
+                    e);
+        } catch (Throwable e) {
+            callBack.handleCreateClientException(pulsarServerUrl);
+            logger.error("create connnection error in metasink, "
+                            + "maybe pulsar master set error/shutdown in progress, please "
+                            + "re-check. url{}, ex2 {}",
+                    pulsarServerUrl,
+                    e.getMessage(),
+                    e);
+        }
+    }
+
+    private PulsarClient initPulsarClient(String pulsarUrl) throws Exception {
+        return PulsarClient.builder()
+                .serviceUrl(pulsarUrl)
+                .connectionTimeout(clientOpTimeout, TimeUnit.SECONDS)
+                .build();
+    }
+
+    public Producer initTopicProducer(String topic) {
+        logger.info("initTopicProducer topic = {}", topic);
+        Producer producer = null;
+        try {
+            producer = pulsarClient.newProducer().sendTimeout(sendTimeout,
+                    TimeUnit.MILLISECONDS)
+                    .topic(topic)
+                    .enableBatching(enableBatch)
+                    .blockIfQueueFull(blockIfQueueFull)
+                    .maxPendingMessages(maxPendingMessages)
+                    .batchingMaxMessages(maxBatchingMessages)
+                    .create();
+        } catch (PulsarClientException e) {
+            logger.error("create pulsar client has error e = {}", e);
+        }
+        return producer;
+    }
+
+    private Producer getProducer(String topic) {
+        return producerInfoMap.computeIfAbsent(topic, (k) -> 
initTopicProducer(topic));
+    }
+
+    public void closeTopicProducer(String topic) {
+        logger.info("closeTopicProducer topic = {}", topic);
+        Producer producer  = producerInfoMap.remove(topic);
+        if (producer != null) {
+            producer.closeAsync();
+        }
+    }
+
+    private void destroyConnection() {
+        producerInfoMap.clear();
+        if (pulsarClient != null) {
+            try {
+                pulsarClient.shutdown();
+            } catch (PulsarClientException e) {
+                logger.error("destroy pulsarClient error in PulsarSink, 
PulsarClientException {}",
+                        e.getMessage());
+            } catch (Exception e) {
+                logger.error("destroy pulsarClient error in PulsarSink, ex 
{}", e.getMessage());
+            }
+        }
+        pulsarClient = null;
+        logger.debug("closed meta producer");
+    }
+
+    public void close() {
+        destroyConnection();
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/SendMessageCallBack.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/SendMessageCallBack.java
new file mode 100644
index 0000000..9fb64d5
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/SendMessageCallBack.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink.pulsar;
+
+import org.apache.inlong.audit.sink.EventStat;
+
+/**
+ * Callback contract for reporting the outcome of sending a message.
+ * NOTE(review): callers and implementations live outside this file; the
+ * descriptions below are inferred from the method names - confirm.
+ */
+public interface SendMessageCallBack {
+
+    /**
+     * Presumably invoked after a message was sent successfully.
+     *
+     * @param msgId identifier of the sent message (untyped {@code Object})
+     * @param es    the {@link EventStat} associated with the sent message
+     */
+    void handleMessageSendSuccess(Object msgId, EventStat es);
+
+    /**
+     * Presumably invoked when sending a message failed.
+     *
+     * @param es        the {@link EventStat} associated with the failed message
+     * @param exception the failure, passed as an untyped {@code Object}
+     */
+    void handleMessageSendException(EventStat es, Object exception);
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/DefaultServiceDecoder.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/DefaultServiceDecoder.java
new file mode 100644
index 0000000..26c230f
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/DefaultServiceDecoder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import org.apache.inlong.audit.consts.ConfigConstants;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+
+/**
+ * Default {@link ServiceDecoder}: reads a frame laid out as
+ * {@code [4-byte cmd size] | [serialized BaseCommand]} and parses the command.
+ */
+public class DefaultServiceDecoder implements ServiceDecoder {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(DefaultServiceDecoder.class);
+
+    /** Number of bytes occupied by the leading command-size field. */
+    private static final int CMD_SIZE_FIELD_LENGTH = 4;
+
+    /**
+     * Extracts one {@link BaseCommand} from the buffer.
+     *
+     * @param cb      buffer holding the frame; may be {@code null}
+     * @param channel channel the data arrived on (not used by this decoder)
+     * @return the parsed command, or {@code null} when the buffer is {@code null}
+     *         or does not yet hold a complete frame (the reader index is left
+     *         where it was in that case)
+     * @throws Exception when the readable data exceeds
+     *         {@code ConfigConstants.MSG_MAX_LENGTH_BYTES}, when the size field is
+     *         negative, or when the command bytes cannot be parsed
+     */
+    @Override
+    public BaseCommand extractData(ChannelBuffer cb, Channel channel) throws Exception {
+
+        /*[cmd size] | [cmd]*/
+        if (null == cb) {
+            LOG.error("cb == null");
+            return null;
+        }
+        int totalLen = cb.readableBytes();
+        if (ConfigConstants.MSG_MAX_LENGTH_BYTES < totalLen) {
+            throw new Exception(new Throwable("err msg, ConfigConstants.MSG_MAX_LENGTH_BYTES "
+                    + "< totalLen, and  totalLen=" + totalLen));
+        }
+        if (totalLen < CMD_SIZE_FIELD_LENGTH) {
+            // the size field itself is incomplete; readInt() would throw
+            return null;
+        }
+        cb.markReaderIndex();
+        int cmdSize = cb.readInt();
+        if (cmdSize < 0) {
+            // a negative size can never be satisfied; report it instead of
+            // failing with NegativeArraySizeException on the allocation below
+            cb.resetReaderIndex();
+            throw new Exception(new Throwable("err msg, cmdSize < 0, and cmdSize=" + cmdSize));
+        }
+        // compare against what is left AFTER the size field, not the total length
+        if (cmdSize > cb.readableBytes()) {
+            // reset index.
+            cb.resetReaderIndex();
+            return null;
+        }
+        byte[] bodyBytes = new byte[cmdSize];
+        cb.readBytes(bodyBytes);
+        LOG.debug("msg totalDataLen = {}, cmdSize = {}", totalLen, cmdSize);
+        return BaseCommand.newBuilder().mergeFrom(bodyBytes).build();
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageFactory.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageFactory.java
new file mode 100644
index 0000000..e254c6e
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.TimeUnit;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AbstractSource;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerMessageFactory implements ChannelPipelineFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ServerMessageFactory.class);
+    private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
+    private AbstractSource source;
+    private ChannelProcessor processor;
+    private ChannelGroup allChannels;
+    private ServiceDecoder serviceDecoder;
+    private String messageHandlerName;
+    private int maxConnections = Integer.MAX_VALUE;
+    private int maxMsgLength;
+    private String name;
+    private Timer timer = new HashedWheelTimer();
+
+    /**
+     * get server factory
+     *
+     * @param source
+     * @param allChannels
+     * @param serviceDecoder
+     * @param messageHandlerName
+     * @param maxMsgLength
+     * @param maxCons
+     * @param name
+     */
+    public ServerMessageFactory(AbstractSource source,
+            ChannelGroup allChannels, ServiceDecoder serviceDecoder,
+            String messageHandlerName, Integer maxMsgLength, Integer maxCons, 
String name) {
+        this.source = source;
+        this.processor = source.getChannelProcessor();
+        this.allChannels = allChannels;
+        this.serviceDecoder = serviceDecoder;
+        this.messageHandlerName = messageHandlerName;
+        this.name = name;
+        this.maxConnections = maxCons;
+        this.maxMsgLength = maxMsgLength;
+    }
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline cp = Channels.pipeline();
+        return addMessageHandlersTo(cp);
+    }
+
+    /**
+     * get message handlers
+     * @param cp
+     * @return
+     */
+    public ChannelPipeline addMessageHandlersTo(ChannelPipeline cp) {
+
+        cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
+                this.maxMsgLength, 0, 4, 0, 0, true));
+        cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
+                DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+
+        if (processor != null) {
+            try {
+                Class<? extends SimpleChannelHandler> clazz = (Class<? extends 
SimpleChannelHandler>) Class
+                        .forName(messageHandlerName);
+
+                Constructor<?> ctor = clazz.getConstructor(
+                        AbstractSource.class, ServiceDecoder.class, 
ChannelGroup.class, Integer.class);
+
+                SimpleChannelHandler messageHandler = (SimpleChannelHandler) 
ctor
+                        .newInstance(source, serviceDecoder, allChannels, 
maxConnections);
+
+                cp.addLast("messageHandler", messageHandler);
+            } catch (Exception e) {
+                LOG.info("SimpleChannelHandler.newInstance  has error:" + 
name, e);
+            }
+        }
+        return cp;
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
new file mode 100644
index 0000000..77e3d9a
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
+import java.util.List;
+import org.apache.flume.Event;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody;
+import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
+import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE;
+import org.apache.inlong.audit.protocol.AuditApi.AuditRequest;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.protocol.Commands;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server message handler.
+ *
+ * <p>Decodes incoming frames into {@link BaseCommand}s, answers ping/pong
+ * commands, and converts every body of an audit request into a JSON Flume
+ * event that is handed to the source's channel processor.
+ */
+public class ServerMessageHandler extends SimpleChannelHandler {
+
+    private static final Logger logger = LoggerFactory.getLogger(ServerMessageHandler.class);
+
+    private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
+
+    private AbstractSource source;
+    private final ChannelGroup allChannels;
+    private int maxConnections = Integer.MAX_VALUE;
+
+    private final ChannelProcessor processor;
+    private final ServiceDecoder serviceDecoder;
+
+    private final Gson gson = new Gson();
+
+    /**
+     * @param source         source whose channel processor receives the audit events
+     * @param serviceDecoder decoder used to turn a frame into a {@link BaseCommand}
+     * @param allChannels    group used to count and track open connections
+     * @param maxCons        maximum number of client connections to accept
+     */
+    public ServerMessageHandler(AbstractSource source, ServiceDecoder serviceDecoder,
+                                ChannelGroup allChannels, Integer maxCons) {
+        this.source = source;
+        this.processor = source.getChannelProcessor();
+        this.serviceDecoder = serviceDecoder;
+        this.allChannels = allChannels;
+        this.maxConnections = maxCons;
+
+    }
+
+    /**
+     * Refuses the new channel when the connection limit is reached; otherwise
+     * registers it in the channel group so that it is counted against the limit.
+     */
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        // "- 1" presumably excludes the listening channel that SimpleTcpSource adds to the group
+        int connections = allChannels.size() - 1;
+        if (connections >= maxConnections) {
+            logger.warn("refuse to connect , and connections=" + connections
+                    + ", maxConnections="
+                    + maxConnections + ",channel is " + e.getChannel());
+            e.getChannel().disconnect();
+            e.getChannel().close();
+            return;
+        }
+        // Without this registration the group never grows and the limit above can never trigger.
+        allChannels.add(e.getChannel());
+    }
+
+    /**
+     * Decodes the received buffer and dispatches on the command type: PING is
+     * answered with a pong buffer, PONG with a ping buffer, AUDITREQUEST with
+     * the reply built by {@link #handleRequest(AuditRequest)}; AUDITREPLY is
+     * only validated.
+     *
+     * @throws IOException when the decoder fails
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        logger.debug("message received");
+        if (e == null) {
+            logger.warn("get null message event, just skip");
+            return;
+        }
+        ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
+        SocketAddress remoteSocketAddress = e.getRemoteAddress();
+        int len = cb.readableBytes();
+        if (len == 0) {
+            logger.warn("receive message skip empty msg.");
+            cb.clear();
+            return;
+        }
+        Channel remoteChannel = e.getChannel();
+        BaseCommand cmd = null;
+        try {
+            cmd = serviceDecoder.extractData(cb, remoteChannel);
+        } catch (Exception ex) {
+            logger.error("extractData has error e {}", ex);
+            // keep the original exception when it carries no nested cause
+            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+            throw new IOException(cause);
+        }
+
+        if (cmd == null) {
+            logger.warn("receive message extractData is null");
+            return;
+        }
+        ChannelBuffer channelBuffer = null;
+        switch (cmd.getType()) {
+            case PING:
+                checkArgument(cmd.hasPing());
+                channelBuffer  = Commands.getPongChannelBuffer();
+                break;
+            case PONG:
+                checkArgument(cmd.hasPong());
+                channelBuffer  = Commands.getPingChannelBuffer();
+                break;
+            case AUDITREQUEST:
+                checkArgument(cmd.hasAuditRequest());
+                AuditReply auditReply = handleRequest(cmd.getAuditRequest());
+                channelBuffer  = Commands.getAuditReplylBuffer(auditReply);
+                break;
+            case AUDITREPLY:
+                checkArgument(cmd.hasAuditReply());
+                break;
+            default:
+                // other command types produce no response
+                break;
+        }
+        if (channelBuffer != null) {
+            writeResponse(remoteChannel, remoteSocketAddress, channelBuffer);
+        }
+    }
+
+    /**
+     * Turns every message body of the request into an {@link AuditData} JSON
+     * event and passes it to the channel processor.
+     *
+     * @param auditRequest request to process; may be {@code null}
+     * @return a FAILED reply naming the number of bodies that could not be
+     *         written, otherwise a SUCCESS reply
+     */
+    private AuditReply handleRequest(AuditRequest auditRequest) {
+        AuditReply reply = null;
+        if (auditRequest != null) {
+            List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
+            if (bodyList != null) {
+                int errorMsgBody = 0;
+                for (AuditMessageBody auditMessageBody : bodyList) {
+                    AuditData auditData = new AuditData();
+                    // header fields are shared by all bodies of the request
+                    auditData.setIp(auditRequest.getMsgHeader().getIp());
+                    auditData.setThreadId(auditRequest.getMsgHeader().getThreadId());
+                    auditData.setDockerId(auditRequest.getMsgHeader().getDockerId());
+                    auditData.setPacketId(auditRequest.getMsgHeader().getPacketId());
+                    auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs());
+
+                    auditData.setLogTs(auditMessageBody.getLogTs());
+                    auditData.setAuditId(auditMessageBody.getAuditId());
+                    auditData.setCount(auditMessageBody.getCount());
+                    auditData.setDelay(auditMessageBody.getDelay());
+                    auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
+                    auditData.setInlongStreamId(auditMessageBody.getInlongStreamId());
+                    auditData.setSize(auditMessageBody.getSize());
+
+                    byte[] body = null;
+                    try {
+                        body = gson.toJson(auditData).getBytes("UTF-8");
+                    } catch (UnsupportedEncodingException e) {
+                        logger.error("UnsupportedEncodingException = {}", e);
+                    }
+                    if (body != null) {
+                        Event event = null;
+                        try {
+                            event = EventBuilder.withBody(body, null);
+                            processor.processEvent(event);
+                        } catch (Throwable ex) {
+                            // one failing body must not abort the rest of the request
+                            logger.error("Error writing to controller,data will discard.", ex);
+                            errorMsgBody++;
+                        }
+                    }
+                }
+                if (errorMsgBody != 0) {
+                    reply = AuditReply.newBuilder().setMessage("Error writing to controller,data "
+                            + "will discard. error body num = "
+                            + errorMsgBody).setRspCode(RSP_CODE.FAILED).build();
+                }
+            }
+        }
+        if (reply == null) {
+            reply = AuditReply.newBuilder().setRspCode(RSP_CODE.SUCCESS).build();
+        }
+        return reply;
+    }
+
+    /** Logs the cause of the exception; the channel is left as it is. */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+        logger.error("exception caught", e.getCause());
+    }
+
+    /** Logs the closed channel. */
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        logger.error("channel closed {}", ctx.getChannel());
+    }
+
+    /**
+     * Writes the response buffer to the remote peer.
+     *
+     * @throws Exception when the channel is currently not writable
+     */
+    private void writeResponse(Channel remoteChannel,
+            SocketAddress remoteSocketAddress, ChannelBuffer buffer) throws Exception {
+        if (remoteChannel.isWritable()) {
+            remoteChannel.write(buffer, remoteSocketAddress);
+        } else {
+            logger.warn(
+                    "the send buffer2 is full, so disconnect it!please check remote client"
+                            + "; Connection info:" + remoteChannel);
+            throw new Exception(new Throwable(
+                    "the send buffer2 is full,so disconnect it!please check remote client, Connection info:"
+                            + remoteChannel));
+        }
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServiceDecoder.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServiceDecoder.java
new file mode 100644
index 0000000..f42ae30
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServiceDecoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Decoder interface definition: turns the content of a Netty
+ * {@link ChannelBuffer} into a {@link BaseCommand}.
+ */
+public interface ServiceDecoder {
+
+    /**
+     * Extracts data from the buffer and converts it into a {@link BaseCommand}.
+     *
+     * @param cb      buffer holding the received bytes
+     * @param channel channel the bytes were received on
+     * @return the decoded command
+     * @throws Exception when decoding fails
+     */
+    BaseCommand extractData(ChannelBuffer cb, Channel channel) throws 
Exception;
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/SimpleTcpSource.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/SimpleTcpSource.java
new file mode 100644
index 0000000..fc0dcd3
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/SimpleTcpSource.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.audit.base.NamedThreadFactory;
+import org.apache.inlong.audit.consts.ConfigConstants;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple tcp source.
+ *
+ * <p>Flume source that opens a Netty TCP server and builds its channel
+ * pipeline from reflectively loaded factory, decoder and handler classes
+ * named in the Flume context.
+ */
+public class SimpleTcpSource extends AbstractSource implements Configurable, EventDrivenSource {
+
+    private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);
+    private static final String CONNECTIONS = "connections";
+
+    private static final String HOST_DEFAULT_VALUE = "0.0.0.0";
+
+    private static final int DEFAULT_MAX_THREADS = 32;
+
+    private static final int DEFAULT_MAX_CONNECTIONS = 5000;
+
+    private static final int MIN_MSG_LENGTH = 4;
+
+    private static final int MAX_MSG_LENGTH = 1024 * 64;
+
+    private static final int BUFFER_SIZE_MUST_THAN = 0;
+
+    private static final int HIGH_WATER_MARK_DEFAULT_VALUE = 64 * 1024;
+
+    private static final int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+    private static final int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+    private static final int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+    private static final int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+    protected int maxConnections = Integer.MAX_VALUE;
+    private ServerBootstrap serverBootstrap = null;
+    protected ChannelGroup allChannels;
+    protected int port;
+    protected String host = null;
+    protected String msgFactoryName;
+    protected String serviceDecoderName;
+    protected String messageHandlerName;
+    protected int maxMsgLength;
+    private int maxThreads = DEFAULT_MAX_THREADS;
+
+    private boolean tcpNoDelay = true;
+    private boolean keepAlive = true;
+    private int receiveBufferSize;
+    private int highWaterMark;
+    private int sendBufferSize;
+
+    private Channel nettyChannel = null;
+
+    public SimpleTcpSource() {
+        super();
+        allChannels = new DefaultChannelGroup();
+    }
+
+    /**
+     * Starts the Netty server: creates the channel factory and bootstrap,
+     * loads the decoder and pipeline factory by class name, then binds to the
+     * configured host and port.
+     *
+     * @throws FlumeException when the pipeline factory cannot be constructed;
+     *         the source is stopped before the exception is thrown
+     */
+    @Override
+    public synchronized void start() {
+        logger.info("start " + this.getName());
+        super.start();
+
+        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+        ChannelFactory factory = new NioServerSocketChannelFactory(
+                Executors.newCachedThreadPool(
+                        new NamedThreadFactory("tcpSource-nettyBoss-threadGroup")),
+                1,
+                Executors.newCachedThreadPool(
+                        new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")),
+                maxThreads);
+        logger.info("Set max workers : {} ;", maxThreads);
+        ChannelPipelineFactory fac = null;
+
+        serverBootstrap = new ServerBootstrap(factory);
+        serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
+        serverBootstrap.setOption("child.keepAlive", keepAlive);
+        serverBootstrap.setOption("child.receiveBufferSize", receiveBufferSize);
+        serverBootstrap.setOption("child.sendBufferSize", sendBufferSize);
+        serverBootstrap.setOption("child.writeBufferHighWaterMark", highWaterMark);
+        logger.info("load msgFactory=" + msgFactoryName + " and serviceDecoderName="
+                + serviceDecoderName);
+        try {
+
+            ServiceDecoder serviceDecoder =
+                    (ServiceDecoder) Class.forName(serviceDecoderName).newInstance();
+
+            // asSubclass gives a checked conversion instead of an unchecked cast
+            Class<? extends ChannelPipelineFactory> clazz =
+                    Class.forName(msgFactoryName).asSubclass(ChannelPipelineFactory.class);
+
+            Constructor<? extends ChannelPipelineFactory> ctor =
+                    clazz.getConstructor(AbstractSource.class, ChannelGroup.class,
+                            ServiceDecoder.class, String.class,
+                            Integer.class, Integer.class, String.class);
+
+            logger.info("Using channel processor:{}", this.getClass().getName());
+            fac = ctor.newInstance(this, allChannels, serviceDecoder,
+                    messageHandlerName, maxMsgLength, maxConnections, this.getName());
+
+        } catch (Exception e) {
+            logger.error(
+                    "Simple Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}",
+                    msgFactoryName, e);
+            stop();
+            // keep the original exception as the cause for the caller
+            throw new FlumeException(e.getMessage(), e);
+        }
+
+        serverBootstrap.setPipelineFactory(fac);
+
+        try {
+            if (host == null) {
+                nettyChannel = serverBootstrap.bind(new InetSocketAddress(port));
+            } else {
+                nettyChannel = serverBootstrap.bind(new InetSocketAddress(host, port));
+            }
+        } catch (Exception e) {
+            // the trailing throwable makes the bind failure reason visible in the log
+            logger.error("Simple TCP Source error bind host {} port {},program will exit!", host,
+                    port, e);
+            System.exit(-1);
+        }
+
+        // track the listening channel so stop() can unbind and close it
+        allChannels.add(nettyChannel);
+
+        logger.info("Simple TCP Source started at host {}, port {}", host, port);
+
+    }
+
+    /**
+     * Unbinds and closes every channel in the group, releases the bootstrap's
+     * external resources and stops the Flume source. Failures are logged and
+     * do not abort the remaining shutdown steps.
+     */
+    @Override
+    public synchronized void stop() {
+        logger.info("[STOP SOURCE]{} stopping...", super.getName());
+        if (allChannels != null && !allChannels.isEmpty()) {
+            try {
+                allChannels.unbind().awaitUninterruptibly();
+                allChannels.close().awaitUninterruptibly();
+            } catch (Exception e) {
+                logger.warn("Simple TCP Source netty server stop ex", e);
+            } finally {
+                allChannels.clear();
+            }
+        }
+
+        if (serverBootstrap != null) {
+            try {
+
+                serverBootstrap.releaseExternalResources();
+            } catch (Exception e) {
+                logger.warn("Simple TCP Source serverBootstrap stop ex ", e);
+            } finally {
+                serverBootstrap = null;
+            }
+        }
+
+        super.stop();
+        logger.info("[STOP SOURCE]{} stopped", super.getName());
+    }
+
+    /**
+     * Reads the source settings from the Flume context, applying defaults and
+     * validating ranges.
+     *
+     * @param context Flume context of this source
+     * @throws IllegalArgumentException when the port is missing, a buffer size
+     *         is not positive, a class name is blank, or the maximum message
+     *         length is out of range
+     */
+    @Override
+    public void configure(Context context) {
+        logger.info("context is {}", context);
+        // fail with a clear message instead of a NullPointerException on unboxing
+        Integer configuredPort = context.getInteger(ConfigConstants.CONFIG_PORT);
+        Preconditions.checkArgument(configuredPort != null,
+                ConfigConstants.CONFIG_PORT + " must be configured");
+        port = configuredPort;
+        host = context.getString(ConfigConstants.CONFIG_HOST, HOST_DEFAULT_VALUE);
+
+        tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true);
+
+        keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true);
+        highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK,
+                HIGH_WATER_MARK_DEFAULT_VALUE);
+        receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE,
+                RECEIVE_BUFFER_DEFAULT_SIZE);
+        // oversized values are clamped rather than rejected
+        if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
+            receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
+        }
+        Preconditions.checkArgument(receiveBufferSize > BUFFER_SIZE_MUST_THAN,
+                "receiveBufferSize must be > 0");
+
+        sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, SEND_BUFFER_DEFAULT_SIZE);
+        if (sendBufferSize > SEND_BUFFER_MAX_SIZE) {
+            sendBufferSize = SEND_BUFFER_MAX_SIZE;
+        }
+        Preconditions.checkArgument(sendBufferSize > BUFFER_SIZE_MUST_THAN,
+                "sendBufferSize must be > 0");
+
+        try {
+            maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, DEFAULT_MAX_THREADS);
+        } catch (NumberFormatException e) {
+            logger.warn("Simple TCP Source max-threads property must specify an integer value. {}",
+                    context.getString(ConfigConstants.MAX_THREADS));
+        }
+
+        try {
+            maxConnections = context.getInteger(CONNECTIONS, DEFAULT_MAX_CONNECTIONS);
+        } catch (NumberFormatException e) {
+            // placeholder added so the offending value is actually printed
+            logger.warn("BaseSource\'s \"connections\" property must specify an integer value. {}",
+                    context.getString(CONNECTIONS));
+        }
+
+        msgFactoryName = context.getString(ConfigConstants.MSG_FACTORY_NAME,
+                "org.apache.inlong.audit.source.ServerMessageFactory");
+        msgFactoryName = msgFactoryName.trim();
+        Preconditions
+                .checkArgument(StringUtils.isNotBlank(msgFactoryName), "msgFactoryName is empty");
+
+        serviceDecoderName = context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME,
+                "org.apache.inlong.audit.source.DefaultServiceDecoder");
+        serviceDecoderName = serviceDecoderName.trim();
+        Preconditions.checkArgument(StringUtils.isNotBlank(serviceDecoderName),
+                "serviceProcessorName is empty");
+
+        messageHandlerName = context.getString(ConfigConstants.MESSAGE_HANDLER_NAME,
+                "org.apache.inlong.audit.source.ServerMessageHandler");
+        messageHandlerName = messageHandlerName.trim();
+        Preconditions.checkArgument(StringUtils.isNotBlank(messageHandlerName),
+                "messageHandlerName is empty");
+
+        maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, MAX_MSG_LENGTH);
+        Preconditions.checkArgument(
+                (maxMsgLength >= MIN_MSG_LENGTH && maxMsgLength <= ConfigConstants.MSG_MAX_LENGTH_BYTES),
+                "maxMsgLength must be >= 4 and <= " + ConfigConstants.MSG_MAX_LENGTH_BYTES);
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/FailoverChannelProcessorHolder.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/FailoverChannelProcessorHolder.java
new file mode 100644
index 0000000..6b10477
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/FailoverChannelProcessorHolder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.apache.flume.channel.ChannelProcessor;
+
+/**
+ * Static holder that shares one {@link ChannelProcessor} reference between
+ * whoever stores it and whoever later asks for it.
+ */
+public class FailoverChannelProcessorHolder {
+
+    /** The most recently stored processor; {@code null} until one has been set. */
+    private static ChannelProcessor channelProcessor;
+
+    /**
+     * Stores the processor that {@link #getChannelProcessor()} will hand out.
+     *
+     * @param cp the processor to keep; stored as-is, no validation is performed
+     */
+    public static void setChannelProcessor(ChannelProcessor cp) {
+        FailoverChannelProcessorHolder.channelProcessor = cp;
+    }
+
+    /**
+     * Returns the stored processor.
+     *
+     * @return the value last passed to {@link #setChannelProcessor(ChannelProcessor)},
+     *         or {@code null} if none has been stored yet
+     */
+    public static ChannelProcessor getChannelProcessor() {
+        return FailoverChannelProcessorHolder.channelProcessor;
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/LogCounter.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/LogCounter.java
new file mode 100644
index 0000000..7257bb7
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/LogCounter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Throttle for repetitive log statements.
+ *
+ * <p>Within one window the first {@code start} calls to {@link #shouldPrint()}
+ * return {@code true}; after that only calls whose running count is a multiple
+ * of {@code control} do. The running count is cleared once more than
+ * {@code reset} milliseconds have passed since the window began.
+ */
+public class LogCounter {
+
+    /** Number of {@link #shouldPrint()} calls seen in the current window. */
+    private final AtomicInteger counter = new AtomicInteger(0);
+
+    /** Calls with a count up to and including this value are always allowed. */
+    private final int start;
+    /** Beyond {@code start}, only counts divisible by this value are allowed. */
+    private final int control;
+    /** Window length in milliseconds. */
+    private final int reset;
+
+    /** Start of the current window; volatile because callers may be on different threads. */
+    private volatile long lastLogTime = System.currentTimeMillis();
+
+    /**
+     * @param start   how many calls per window are always allowed
+     * @param control after {@code start}, allow only every {@code control}-th call;
+     *                {@code 0} suppresses every call beyond {@code start}
+     * @param reset   window length in milliseconds
+     */
+    public LogCounter(int start, int control, int reset) {
+        this.start = start;
+        this.control = control;
+        this.reset = reset;
+    }
+
+    /**
+     * Counts this call and decides whether the caller should emit its log line.
+     *
+     * @return {@code true} if the log line should be written, {@code false} to skip it
+     */
+    public boolean shouldPrint() {
+        long currentMills = Instant.now().toEpochMilli();
+        if (currentMills - lastLogTime > reset) {
+            counter.set(0);
+            lastLogTime = currentMills;
+        }
+
+        // Take the incremented value once: re-reading the counter could observe
+        // another thread's increment and decide on the wrong count.
+        int count = counter.incrementAndGet();
+        if (count <= start) {
+            return true;
+        }
+        // Guard the modulo so a zero "control" means "never" instead of throwing.
+        return control != 0 && count % control == 0;
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/NetworkUtils.java
 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/NetworkUtils.java
new file mode 100644
index 0000000..98e5d69
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/NetworkUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+/**
+ * Resolves and caches the IP address reported as this host's local address.
+ */
+public class NetworkUtils {
+    /** Name of the interface whose address is used; matched case-insensitively. */
+    public static String INNER_NETWORK_INTERFACE = "eth1";
+
+    /** Returned when no address can be taken from {@link #INNER_NETWORK_INTERFACE}. */
+    private static final String DEFAULT_IP = "0.0.0.0";
+
+    private static String localIp = null;
+
+    static {
+        localIp = getLocalIp();
+    }
+
+    /**
+     * Returns the local IP, resolving it on first use and caching the result.
+     *
+     * @return the first usable address of {@link #INNER_NETWORK_INTERFACE}, or
+     *         {@code "0.0.0.0"} when that interface is missing, down, a loopback
+     *         interface, has no usable address, or cannot be queried
+     */
+    public static String getLocalIp() {
+        if (localIp == null) {
+            String ip = findInnerInterfaceIp();
+            // Cache in both outcomes so the lookup runs only once.
+            localIp = (ip != null) ? ip : DEFAULT_IP;
+        }
+
+        return localIp;
+    }
+
+    /**
+     * Scans the network interfaces for {@link #INNER_NETWORK_INTERFACE}.
+     *
+     * @return the first address that is non-empty and not {@code 127.0.0.1},
+     *         or {@code null} if none is found
+     */
+    private static String findInnerInterfaceIp() {
+        try {
+            Enumeration<NetworkInterface> allInterface = NetworkInterface.getNetworkInterfaces();
+            // Older JDKs return null instead of an empty enumeration when the
+            // host has no interfaces, hence the null guard.
+            while (allInterface != null && allInterface.hasMoreElements()) {
+                NetworkInterface oneInterface = allInterface.nextElement();
+                if (oneInterface.isLoopback()
+                        || !oneInterface.isUp()
+                        || !oneInterface.getName().equalsIgnoreCase(INNER_NETWORK_INTERFACE)) {
+                    continue;
+                }
+
+                Enumeration<InetAddress> allAddress = oneInterface.getInetAddresses();
+                while (allAddress.hasMoreElements()) {
+                    String ip = allAddress.nextElement().getHostAddress();
+                    if (ip != null && !ip.isEmpty() && !ip.equals("127.0.0.1")) {
+                        return ip;
+                    }
+                }
+            }
+        } catch (SocketException ignored) {
+            // Best effort: an interface that cannot be queried is treated the
+            // same as "not found" and the caller falls back to the default.
+        }
+        return null;
+    }
+}
diff --git 
a/inlong-audit/audit-source/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
 
b/inlong-audit/audit-source/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
new file mode 100644
index 0000000..f492572
--- /dev/null
+++ 
b/inlong-audit/audit-source/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Exercises the {@link PulsarSink} lifecycle: configure, start, drain events
+ * from a memory channel, stop.
+ */
+public class PulsarSinkTest {
+    private static final Logger logger = LoggerFactory
+            .getLogger(PulsarSinkTest.class);
+
+    private PulsarSink sink;
+    private Channel channel;
+
+    /**
+     * Builds a sink wired to a fresh memory channel and configures both.
+     */
+    @BeforeClass
+    public void setUp() {
+        sink = new PulsarSink();
+        channel = new MemoryChannel();
+
+        Context context = new Context();
+
+        // Name the class actually under test (this package), not the dataproxy sink.
+        context.put("type", "org.apache.inlong.audit.sink.PulsarSink");
+        context.put("pulsar_server_url", "pulsar://127.0.0.1:6650");
+
+        sink.setChannel(channel);
+
+        Configurables.configure(sink, context);
+        Configurables.configure(channel, context);
+    }
+
+    /**
+     * Puts ten events on the channel and checks that five calls to
+     * {@code process()} each report {@link Sink.Status#READY}.
+     */
+    @Test
+    public void testProcess() throws InterruptedException, EventDeliveryException {
+        // setUp() already ran via @BeforeClass; calling it again would build a
+        // second sink/channel pair and discard the first.
+        Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+        sink.start();
+        try {
+            Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+                    LifecycleState.START_OR_ERROR, 5000));
+
+            Transaction transaction = channel.getTransaction();
+            transaction.begin();
+            try {
+                for (int i = 0; i < 10; i++) {
+                    channel.put(event);
+                }
+                transaction.commit();
+            } catch (RuntimeException e) {
+                // Undo the partial put so the transaction is not closed while still open.
+                transaction.rollback();
+                throw e;
+            } finally {
+                transaction.close();
+            }
+
+            for (int i = 0; i < 5; i++) {
+                Sink.Status status = sink.process();
+                Assert.assertEquals(Sink.Status.READY, status);
+            }
+        } finally {
+            // Stop the sink even when an assertion above fails.
+            sink.stop();
+        }
+        Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+                LifecycleState.STOP_OR_ERROR, 5000));
+    }
+
+}

Reply via email to