This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f167402 [INLONG][feature][audit] add audit-source module (#1905)
f167402 is described below
commit f167402db3be512a6bedb5cad6c6aeb1f9e8f597
Author: baomingyu <[email protected]>
AuthorDate: Tue Dec 7 09:44:23 2021 +0800
[INLONG][feature][audit] add audit-source module (#1905)
---
inlong-audit/audit-source/conf/audit.conf | 90 ++++
inlong-audit/audit-source/conf/pulsar.properties | 19 +
inlong-audit/audit-source/pom.xml | 5 +
.../audit/base/HighPriorityThreadFactory.java | 50 +++
.../inlong/audit/base/NamedThreadFactory.java | 44 ++
.../audit/channel/FailoverChannelSelector.java | 108 +++++
.../org/apache/inlong/audit/node/Application.java | 349 +++++++++++++++
.../org/apache/inlong/audit/sink/EventStat.java | 52 +++
.../org/apache/inlong/audit/sink/PulsarSink.java | 486 +++++++++++++++++++++
.../sink/pulsar/CreatePulsarClientCallBack.java | 36 ++
.../audit/sink/pulsar/PulsarClientService.java | 252 +++++++++++
.../audit/sink/pulsar/SendMessageCallBack.java | 27 ++
.../inlong/audit/source/DefaultServiceDecoder.java | 60 +++
.../inlong/audit/source/ServerMessageFactory.java | 110 +++++
.../inlong/audit/source/ServerMessageHandler.java | 217 +++++++++
.../apache/inlong/audit/source/ServiceDecoder.java | 37 ++
.../inlong/audit/source/SimpleTcpSource.java | 262 +++++++++++
.../utils/FailoverChannelProcessorHolder.java | 32 ++
.../org/apache/inlong/audit/utils/LogCounter.java | 52 +++
.../apache/inlong/audit/utils/NetworkUtils.java | 75 ++++
.../apache/inlong/audit/sink/PulsarSinkTest.java | 94 ++++
21 files changed, 2457 insertions(+)
diff --git a/inlong-audit/audit-source/conf/audit.conf
b/inlong-audit/audit-source/conf/audit.conf
new file mode 100644
index 0000000..eec6d76
--- /dev/null
+++ b/inlong-audit/audit-source/conf/audit.conf
@@ -0,0 +1,90 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# The configuration file needs to define the sources,
+# the channels and the sinks.
+# Sources, channels and sinks are defined per agent,
+# in this case called 'agent'
+
+agent1.sources = tcp-source
+agent1.channels = ch-msg1 ch-msg2
+agent1.sinks = pulsar-sink-msg1 pulsar-sink-msg2
+
+agent1.sources.tcp-source.channels = ch-msg1 ch-msg2
+agent1.sources.tcp-source.type = org.apache.flume.source.SimpleTcpSource
+agent1.sources.tcp-source.msg-factory-name =
org.apache.flume.source.ServerMessageFactory
+agent1.sources.tcp-source.host = 127.0.0.1
+agent1.sources.tcp-source.port = 46801
+agent1.sources.tcp-source.max-msg-length = 524288
+agent1.sources.tcp-source.connections = 30000
+agent1.sources.tcp-source.max-threads = 64
+agent1.sources.tcp-source.receiveBufferSize = 1048576
+agent1.sources.tcp-source.sendBufferSize = 1048576
+agent1.sources.tcp-source.custom-cp = true
+agent1.sources.tcp-source.selector.type =
org.apache.flume.channel.FailoverChannelSelector
+agent1.sources.tcp-source.selector.master = ch-msg1 ch-msg2
+agent1.sources.tcp-source.metric-recovery-path=/data/tdbank/audit/flume/recovery
+agent1.sources.tcp-source.metric-agent-port=8003
+agent1.sources.tcp-source.metric-cache-size=1000000
+agent1.sources.tcp-source.set=10
+
+agent1.channels.ch-msg1.type = memory
+agent1.channels.ch-msg1.capacity = 10000
+agent1.channels.ch-msg1.keep-alive = 0
+agent1.channels.ch-msg1.transactionCapacity = 200
+
+agent1.channels.ch-msg2.type = file
+agent1.channels.ch-msg2.capacity = 100000000
+agent1.channels.ch-msg2.maxFileSize = 1073741824
+agent1.channels.ch-msg2.minimumRequiredSpace = 1073741824
+agent1.channels.ch-msg2.checkpointDir = /data/tdbank/audit/file/ch-msg5/check
+agent1.channels.ch-msg2.dataDirs = /data/tdbank/audit/file/ch-msg5/data
+agent1.channels.ch-msg2.fsyncPerTransaction = false
+agent1.channels.ch-msg2.fsyncInterval = 10
+
+agent1.sinks.pulsar-sink-msg1.channel = ch-msg1
+agent1.sinks.pulsar-sink-msg1.type = org.apache.flume.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg1.pulsar_server_url=
pulsar://127.0.0.1:6650,127.0.0.2:6650
+agent1.sinks.pulsar-sink-msg1.topic = persistent://public/default/audit-test
+agent1.sinks.pulsar-sink-msg1.send_timeout_ms = 30000
+agent1.sinks.pulsar-sink-msg1.client_op_timeout_second = 30000
+agent1.sinks.pulsar-sink-msg1.stat_interval_sec = 60
+agent1.sinks.pulsar-sink-msg1.enable_batch = true
+agent1.sinks.pulsar-sink-msg1.block_if_queue_full = true
+agent1.sinks.pulsar-sink-msg1.max_pending_messages = 10000
+agent1.sinks.pulsar-sink-msg1.max_batching_messages = 1000
+agent1.sinks.pulsar-sink-msg1.retry_interval_when_send_error_ms = 30000
+agent1.sinks.pulsar-sink-msg1.thread_num = 8
+agent1.sinks.pulsar-sink-msg1.log_every_n_events = 100000
+agent1.sinks.pulsar-sink-msg1.disk_io_rate_per_sec= 20000000
+
+agent1.sinks.pulsar-sink-msg2.channel = ch-msg2
+agent1.sinks.pulsar-sink-msg2.type = org.apache.flume.sink.PulsarSink
+agent1.sinks.pulsar-sink-msg2.pulsar_server_url =
pulsar://127.0.0.1:6650,127.0.0.2:6650
+agent1.sinks.pulsar-sink-msg1.topic = persistent://public/default/audit-test
+agent1.sinks.pulsar-sink-msg2.send_timeout_ms = 30000
+agent1.sinks.pulsar-sink-msg2.client_op_timeout_second = 30000
+agent1.sinks.pulsar-sink-msg2.stat_interval_sec = 60
+agent1.sinks.pulsar-sink-msg2.enable_batch = true
+agent1.sinks.pulsar-sink-msg2.block_if_queue_full = true
+agent1.sinks.pulsar-sink-msg2.max_pending_messages = 10000
+agent1.sinks.pulsar-sink-msg2.max_batching_messages = 1000
+agent1.sinks.pulsar-sink-msg2.retry_interval_when_send_error_ms = 30000
+agent1.sinks.pulsar-sink-msg2.thread_num = 8
+agent1.sinks.pulsar-sink-msg2.log_every_n_events = 100000
+agent1.sinks.pulsar-sink-msg2.disk_io_rate_per_sec= 20000000
\ No newline at end of file
diff --git a/inlong-audit/audit-source/conf/pulsar.properties
b/inlong-audit/audit-source/conf/pulsar.properties
new file mode 100644
index 0000000..5cd8d02
--- /dev/null
+++ b/inlong-audit/audit-source/conf/pulsar.properties
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+topic=persistent://public/default/audit-test
\ No newline at end of file
diff --git a/inlong-audit/audit-source/pom.xml
b/inlong-audit/audit-source/pom.xml
index 7f118b3..8782099 100644
--- a/inlong-audit/audit-source/pom.xml
+++ b/inlong-audit/audit-source/pom.xml
@@ -73,6 +73,11 @@
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <optional>true</optional>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/HighPriorityThreadFactory.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/HighPriorityThreadFactory.java
new file mode 100644
index 0000000..673b75c
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/HighPriorityThreadFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.base;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class HighPriorityThreadFactory
+ implements ThreadFactory {
+ private static final AtomicInteger poolNumber = new AtomicInteger(1);
+ private final AtomicInteger threadNumber;
+ private final ThreadGroup group;
+ private final String namePrefix;
+ private final boolean isDaemon;
+
+ public HighPriorityThreadFactory(String name) {
+ this(name, false);
+ }
+
+ public HighPriorityThreadFactory(String prefix, boolean daemon) {
+ this.threadNumber = new AtomicInteger(1);
+ SecurityManager s = System.getSecurityManager();
+ this.group = s != null ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
+ this.namePrefix = prefix + "-thread-" + poolNumber.getAndIncrement();
+ this.isDaemon = daemon;
+ }
+
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(this.group, r, this.namePrefix +
this.threadNumber.getAndIncrement(), 0L);
+ t.setDaemon(this.isDaemon);
+ t.setPriority(10);
+ return t;
+ }
+}
+
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/NamedThreadFactory.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/NamedThreadFactory.java
new file mode 100644
index 0000000..821050a
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/base/NamedThreadFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.base;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NamedThreadFactory implements ThreadFactory {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(NamedThreadFactory.class);
+
+ private final AtomicInteger mThreadNum = new AtomicInteger(1);
+
+ private final String threadType;
+
+ public NamedThreadFactory(String threadType) {
+ this.threadType = threadType;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, threadType + "-running-thread-" +
mThreadNum.getAndIncrement());
+ LOGGER.debug("{} created", t.getName());
+ return t;
+ }
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/channel/FailoverChannelSelector.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/channel/FailoverChannelSelector.java
new file mode 100644
index 0000000..6343ebc
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/channel/FailoverChannelSelector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.channel;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.channel.AbstractChannelSelector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverChannelSelector extends AbstractChannelSelector {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FailoverChannelSelector.class);
+
+ private static final String MASTER_CHANNEL = "master";
+
+ private int masterIndex = 0;
+
+ private int slaveIndex = 0;
+
+ private final List<Channel> masterChannels = new ArrayList<Channel>();
+ private final List<Channel> slaveChannels = new ArrayList<Channel>();
+
+ @Override
+ public List<Channel> getRequiredChannels(Event event) {
+ List<Channel> retChannels = new ArrayList<Channel>();
+ if (masterChannels.size() > 0) {
+ retChannels.add(masterChannels.get(masterIndex));
+ masterIndex = (masterIndex + 1) % masterChannels.size();
+ } else {
+ LOG.warn("masterChannels size is zero!");
+ }
+
+ return retChannels;
+ }
+
+ @Override
+ public List<Channel> getOptionalChannels(Event event) {
+ List<Channel> retChannels = new ArrayList<Channel>();
+ if (slaveChannels.size() > 0) {
+ retChannels.add(slaveChannels.get(slaveIndex));
+ slaveIndex = (slaveIndex + 1) % slaveChannels.size();
+ } else {
+ LOG.warn("slaveChannels size is zero!");
+ }
+ return retChannels;
+ }
+
+ /**
+ * split channel name into name list.
+ *
+ * @param channelName - channel name
+ * @return - name list
+ */
+ private List<String> splitChannelName(String channelName) {
+ List<String> fileMetricList = new ArrayList<String>();
+ if (StringUtils.isEmpty(channelName)) {
+ LOG.info("channel name is null!");
+ } else {
+ fileMetricList = Arrays.asList(channelName.split("\\s+"));
+ }
+ return fileMetricList;
+ }
+
+ @Override
+ public void configure(Context context) {
+ String masters = context.getString(MASTER_CHANNEL);
+ if (StringUtils.isEmpty(masters)) {
+ throw new FlumeException("master channel is null!");
+ }
+ List<String> masterList = splitChannelName(masters);
+
+ for (Map.Entry<String, Channel> entry :
getChannelNameMap().entrySet()) {
+ String channelName = entry.getKey();
+ Channel channel = entry.getValue();
+ if (masterList.contains(channelName)) {
+ this.masterChannels.add(channel);
+ } else {
+ this.slaveChannels.add(channel);
+ }
+ }
+ LOG.info("masters:" + this.masterChannels);
+ LOG.info("slaves:" + this.slaveChannels);
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/node/Application.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/node/Application.java
new file mode 100644
index 0000000..d1cd269
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/node/Application.java
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.node;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.flume.Channel;
+import org.apache.flume.Constants;
+import org.apache.flume.Context;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.instrumentation.MonitorService;
+import org.apache.flume.instrumentation.MonitoringType;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.apache.flume.lifecycle.LifecycleSupervisor;
+import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
+import org.apache.flume.node.MaterializedConfiguration;
+import org.apache.flume.node.PollingPropertiesFileConfigurationProvider;
+import org.apache.flume.node.PropertiesFileConfigurationProvider;
+import org.apache.flume.util.SSLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * Application
+ */
+public class Application {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(Application.class);
+
+ public static final String CONF_MONITOR_CLASS = "flume.monitoring.type";
+ public static final String CONF_MONITOR_PREFIX = "flume.monitoring.";
+
+ private final List<LifecycleAware> components;
+ private final LifecycleSupervisor supervisor;
+ private MaterializedConfiguration materializedConfiguration;
+ private MonitorService monitorServer;
+ private final ReentrantLock lifecycleLock = new ReentrantLock();
+
+ public Application() {
+ this(new ArrayList<LifecycleAware>(0));
+ }
+
+ public Application(List<LifecycleAware> components) {
+ this.components = components;
+ supervisor = new LifecycleSupervisor();
+ }
+
+ public void start() {
+ lifecycleLock.lock();
+ try {
+ for (LifecycleAware component : components) {
+
+ supervisor.supervise(component,
+ new SupervisorPolicy.AlwaysRestartPolicy(),
LifecycleState.START);
+ }
+ } finally {
+ lifecycleLock.unlock();
+ }
+ }
+
+ @Subscribe
+ public void handleConfigurationEvent(MaterializedConfiguration conf) {
+ try {
+ lifecycleLock.lockInterruptibly();
+ stopAllComponents();
+ startAllComponents(conf);
+ } catch (InterruptedException e) {
+ logger.info("Interrupted while trying to handle configuration
event");
+ return;
+ } finally {
+ // If interrupted while trying to lock, we don't own the lock, so
must not attempt to unlock
+ if (lifecycleLock.isHeldByCurrentThread()) {
+ lifecycleLock.unlock();
+ }
+ }
+ }
+
+ public void stop() {
+ lifecycleLock.lock();
+ stopAllComponents();
+ try {
+ supervisor.stop();
+ if (monitorServer != null) {
+ monitorServer.stop();
+ }
+ } finally {
+ lifecycleLock.unlock();
+ }
+ }
+
+ private void stopAllComponents() {
+ if (this.materializedConfiguration != null) {
+ logger.info("Shutting down configuration: {}",
this.materializedConfiguration);
+ for (Entry<String, SourceRunner> entry :
this.materializedConfiguration
+ .getSourceRunners().entrySet()) {
+ try {
+ logger.info("Stopping Source " + entry.getKey());
+ supervisor.unsupervise(entry.getValue());
+ } catch (Exception e) {
+ logger.error("Error while stopping {}", entry.getValue(),
e);
+ }
+ }
+
+ for (Entry<String, SinkRunner> entry :
this.materializedConfiguration.getSinkRunners()
+ .entrySet()) {
+ try {
+ logger.info("Stopping Sink " + entry.getKey());
+ supervisor.unsupervise(entry.getValue());
+ } catch (Exception e) {
+ logger.error("Error while stopping {}", entry.getValue(),
e);
+ }
+ }
+
+ for (Entry<String, Channel> entry :
this.materializedConfiguration.getChannels()
+ .entrySet()) {
+ try {
+ logger.info("Stopping Channel " + entry.getKey());
+ supervisor.unsupervise(entry.getValue());
+ } catch (Exception e) {
+ logger.error("Error while stopping {}", entry.getValue(),
e);
+ }
+ }
+ }
+ if (monitorServer != null) {
+ monitorServer.stop();
+ }
+ }
+
+ private void startAllComponents(MaterializedConfiguration
materializedConfiguration) {
+ logger.info("Starting new configuration:{}",
materializedConfiguration);
+
+ this.materializedConfiguration = materializedConfiguration;
+
+ for (Entry<String, Channel> entry :
materializedConfiguration.getChannels().entrySet()) {
+ try {
+ logger.info("Starting Channel " + entry.getKey());
+ supervisor.supervise(entry.getValue(),
+ new SupervisorPolicy.AlwaysRestartPolicy(),
LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
+ }
+
+ /*
+ * Wait for all channels to start.
+ */
+ for (Channel ch : materializedConfiguration.getChannels().values()) {
+ while (ch.getLifecycleState() != LifecycleState.START
+ && !supervisor.isComponentInErrorState(ch)) {
+ try {
+ logger.info("Waiting for channel: " + ch.getName()
+ + " to start. Sleeping for 500 ms");
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ logger.error("Interrupted while waiting for channel to
start.", e);
+ Throwables.propagate(e);
+ }
+ }
+ }
+
+ for (Entry<String, SinkRunner> entry :
materializedConfiguration.getSinkRunners()
+ .entrySet()) {
+ try {
+ logger.info("Starting Sink " + entry.getKey());
+ supervisor.supervise(entry.getValue(),
+ new SupervisorPolicy.AlwaysRestartPolicy(),
LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
+ }
+
+ for (Entry<String, SourceRunner> entry :
materializedConfiguration.getSourceRunners()
+ .entrySet()) {
+ try {
+ logger.info("Starting Source " + entry.getKey());
+ supervisor.supervise(entry.getValue(),
+ new SupervisorPolicy.AlwaysRestartPolicy(),
LifecycleState.START);
+ } catch (Exception e) {
+ logger.error("Error while starting {}", entry.getValue(), e);
+ }
+ }
+
+ this.loadMonitoring();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void loadMonitoring() {
+ Properties systemProps = System.getProperties();
+ Set<String> keys = systemProps.stringPropertyNames();
+ try {
+ if (keys.contains(CONF_MONITOR_CLASS)) {
+ String monitorType =
systemProps.getProperty(CONF_MONITOR_CLASS);
+ Class<? extends MonitorService> klass;
+ try {
+ // Is it a known type?
+ klass = MonitoringType.valueOf(
+
monitorType.toUpperCase(Locale.ENGLISH)).getMonitorClass();
+ } catch (Exception e) {
+ // Not a known type, use FQCN
+ klass = (Class<? extends MonitorService>)
Class.forName(monitorType);
+ }
+ this.monitorServer =
klass.getDeclaredConstructor().newInstance();
+ Context context = new Context();
+ for (String key : keys) {
+ if (key.startsWith(CONF_MONITOR_PREFIX)) {
+
context.put(key.substring(CONF_MONITOR_PREFIX.length()),
+ systemProps.getProperty(key));
+ }
+ }
+ monitorServer.configure(context);
+ monitorServer.start();
+ }
+ } catch (Exception e) {
+ logger.warn("Error starting monitoring. "
+ + "Monitoring might not be available.", e);
+ }
+
+ }
+
+ /**
+ * main
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ try {
+ SSLUtil.initGlobalSSLParameters();
+
+ Options options = new Options();
+
+ Option option = new Option("n", "name", true, "the name of this
agent");
+ option.setRequired(true);
+ options.addOption(option);
+
+ option = new Option("f", "conf-file", true,
+ "specify a config file (required if -z missing)");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option(null, "no-reload-conf", false,
+ "do not reload config file if changed");
+ options.addOption(option);
+
+ option = new Option("h", "help", false, "display help text");
+ options.addOption(option);
+
+ CommandLineParser parser = new GnuParser();
+ CommandLine commandLine = parser.parse(options, args);
+
+ if (commandLine.hasOption('h')) {
+ new HelpFormatter().printHelp("flume-ng agent", options, true);
+ return;
+ }
+
+ String agentName = commandLine.getOptionValue('n');
+ boolean reload = !commandLine.hasOption("no-reload-conf");
+
+ Application application;
+
+ File configurationFile = new File(commandLine.getOptionValue('f'));
+
+ // The following is to ensure that by default the agent will fail
on startup
+ // if the file does not exist.
+ if (!configurationFile.exists()) {
+ // If command line invocation, then need to fail fast
+ if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE)
== null) {
+ String path = configurationFile.getPath();
+ try {
+ path = configurationFile.getCanonicalPath();
+ } catch (IOException ex) {
+ logger.error("Failed to read canonical path for file:
" + path,
+ ex);
+ }
+ throw new ParseException(
+ "The specified configuration file does not exist:
" + path);
+ }
+ }
+ List<LifecycleAware> components = Lists.newArrayList();
+
+ if (reload) {
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ PollingPropertiesFileConfigurationProvider
configurationProvider;
+ configurationProvider = new
PollingPropertiesFileConfigurationProvider(
+ agentName, configurationFile, eventBus, 30);
+ components.add(configurationProvider);
+ application = new Application(components);
+ eventBus.register(application);
+ } else {
+ PropertiesFileConfigurationProvider configurationProvider;
+ configurationProvider = new
PropertiesFileConfigurationProvider(
+ agentName, configurationFile);
+ application = new Application();
+
application.handleConfigurationEvent(configurationProvider.getConfiguration());
+ }
+
+ //start application
+ application.start();
+
+ final Application appReference = application;
+ Runtime.getRuntime().addShutdownHook(new
Thread("agent-shutdown-hook") {
+
+ @Override
+ public void run() {
+ appReference.stop();
+ }
+ });
+
+ } catch (Exception e) {
+ logger.error("A fatal error occurred while running. Exception
follows.", e);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/EventStat.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/EventStat.java
new file mode 100644
index 0000000..31a7223
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/EventStat.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.flume.Event;
+
+@Getter
+@Setter
+public class EventStat {
+ private Event event;
+ private int myRetryCnt;
+
+ public EventStat(Event event) {
+ this.event = event;
+ this.myRetryCnt = 0;
+ }
+
+ public EventStat(Event event, int retryCnt) {
+ this.event = event;
+ this.myRetryCnt = retryCnt;
+ }
+
+ public void incRetryCnt() {
+ this.myRetryCnt++;
+ }
+
+ public boolean shouldDrop() {
+ return false;
+ }
+
+ public void reset() {
+ this.event = null;
+ this.myRetryCnt = 0;
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
new file mode 100644
index 0000000..cdc850c
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/PulsarSink.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.inlong.audit.base.HighPriorityThreadFactory;
+import org.apache.inlong.audit.sink.pulsar.CreatePulsarClientCallBack;
+import org.apache.inlong.audit.sink.pulsar.PulsarClientService;
+import org.apache.inlong.audit.sink.pulsar.SendMessageCallBack;
+import org.apache.inlong.audit.utils.FailoverChannelProcessorHolder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
+import
org.apache.pulsar.client.api.PulsarClientException.NotConnectedException;
+import
org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
+import
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * pulsar sink
+ *
+ * send to one pulsar cluster
+ */
+public class PulsarSink extends AbstractSink implements Configurable,
SendMessageCallBack,
+ CreatePulsarClientCallBack {
+ private static final Logger logger =
LoggerFactory.getLogger(PulsarSink.class);
+
+ /*
+ * properties for header info
+ */
+ private static String TOPIC = "topic";
+
+ /*
+ * default value
+ */
+ private static int BAD_EVENT_QUEUE_SIZE = 10000;
+ private static int BATCH_SIZE = 10000;
+ private static final int DEFAULT_LOG_EVERY_N_EVENTS = 100000;
+
+ /*
+ * properties for stat
+ */
+ private static String LOG_EVERY_N_EVENTS = "log_every_n_events";
+
+ private static String DISK_IO_RATE_PER_SEC = "disk_io_rate_per_sec";
+
+ private static final String SINK_THREAD_NUM = "thread_num";
+
+ /*
+ * for log
+ */
+ private Integer logEveryNEvents;
+
+ private long diskIORatePerSec;
+
+ private RateLimiter diskRateLimiter;
+
+ /*
+ * for stat
+ */
+ private AtomicLong currentSuccessSendCnt = new AtomicLong(0);
+
+ private AtomicLong lastSuccessSendCnt = new AtomicLong(0);
+
+ private long t1 = System.currentTimeMillis();
+
+ private long t2 = 0L;
+
+ private static AtomicLong totalPulsarSuccSendCnt = new AtomicLong(0);
+
+ private static AtomicLong totalPulsarSuccSendSize = new AtomicLong(0);
+ /*
+ * for control
+ */
+ private boolean overflow = false;
+
+ private LinkedBlockingQueue<EventStat> resendQueue;
+
+ private long logCounter = 0;
+
+ private final AtomicLong currentInFlightCount = new AtomicLong(0);
+
+ /*
+ * whether the SendTask thread can send data to pulsar
+ */
+ private volatile boolean canSend = false;
+
+ /*
+ * Control whether the SinkRunner thread can read data from the Channel
+ */
+ private volatile boolean canTake = false;
+
+
+ private static int EVENT_QUEUE_SIZE = 1000;
+
+ private int threadNum;
+
+
+ /*
+ * send thread pool
+ */
+ private Thread[] sinkThreadPool;
+ private LinkedBlockingQueue<Event> eventQueue;
+
+ private SinkCounter sinkCounter;
+
+ private PulsarClientService pulsarClientService;
+
+ private static final Long PRINT_INTERVAL = 30L;
+
+ private static final PulsarPerformanceTask pulsarPerformanceTask = new
PulsarPerformanceTask();
+
+ private static ScheduledExecutorService scheduledExecutorService =
Executors
+ .newScheduledThreadPool(1, new
HighPriorityThreadFactory("pulsarPerformance-Printer-thread"));
+
+ private String topic;
+
+ static {
+ /*
+ * stat pulsar performance
+ */
+ System.out.println("pulsarPerformanceTask!!!!!!");
+ scheduledExecutorService.scheduleWithFixedDelay(pulsarPerformanceTask,
0L,
+ PRINT_INTERVAL, TimeUnit.SECONDS);
+ }
+
+ public PulsarSink() {
+ super();
+ logger.debug("new instance of PulsarSink!");
+ }
+
+ /**
+ * configure
+ * @param context
+ */
+ public void configure(Context context) {
+ logger.info("PulsarSink started and context = {}", context.toString());
+ /*
+ * topic config
+ */
+ topic = context.getString(TOPIC);
+ logEveryNEvents = context.getInteger(LOG_EVERY_N_EVENTS,
DEFAULT_LOG_EVERY_N_EVENTS);
+ logger.debug(this.getName() + " " + LOG_EVERY_N_EVENTS + " " +
logEveryNEvents);
+ Preconditions.checkArgument(logEveryNEvents > 0, "logEveryNEvents must
be > 0");
+
+ resendQueue = new LinkedBlockingQueue<EventStat>(BAD_EVENT_QUEUE_SIZE);
+
+ String sinkThreadNum = context.getString(SINK_THREAD_NUM, "4");
+ threadNum = Integer.parseInt(sinkThreadNum);
+ Preconditions.checkArgument(threadNum > 0, "threadNum must be > 0");
+ sinkThreadPool = new Thread[threadNum];
+ eventQueue = new LinkedBlockingQueue<Event>(EVENT_QUEUE_SIZE);
+
+ diskIORatePerSec = context.getLong(DISK_IO_RATE_PER_SEC,0L);
+ if (diskIORatePerSec != 0) {
+ diskRateLimiter = RateLimiter.create(diskIORatePerSec);
+ }
+ pulsarClientService = new PulsarClientService(context);
+
+ if (sinkCounter == null) {
+ sinkCounter = new SinkCounter(getName());
+ }
+ }
+
+ private void initTopic() throws Exception {
+ long startTime = System.currentTimeMillis();
+ if (topic != null) {
+ pulsarClientService.initTopicProducer(topic);
+ }
+ logger.info(getName() + " initTopic cost: "
+ + (System.currentTimeMillis() - startTime) + "ms");
+ }
+
+ @Override
+ public void start() {
+ logger.info("pulsar sink starting...");
+ sinkCounter.start();
+ pulsarClientService.initCreateConnection(this);
+
+ super.start();
+ this.canSend = true;
+ this.canTake = true;
+ try {
+ initTopic();
+ } catch (Exception e) {
+ logger.info("meta sink start publish topic fail.",e);
+ }
+
+ for (int i = 0; i < sinkThreadPool.length; i++) {
+ sinkThreadPool[i] = new Thread(new SinkTask(), getName()
+ + "_pulsar_sink_sender-"
+ + i);
+ sinkThreadPool[i].start();
+ }
+ logger.debug("meta sink started");
+ }
+
+ @Override
+ public void stop() {
+ logger.info("pulsar sink stopping");
+ pulsarClientService.close();
+ this.canTake = false;
+ int waitCount = 0;
+ while (eventQueue.size() != 0 && waitCount++ < 10) {
+ try {
+ Thread.currentThread().sleep(1000);
+ } catch (InterruptedException e) {
+ logger.info("Stop thread has been interrupt!");
+ break;
+ }
+ }
+ this.canSend = false;
+
+ if (sinkThreadPool != null) {
+ for (Thread thread : sinkThreadPool) {
+ if (thread != null) {
+ thread.interrupt();
+ }
+ }
+ sinkThreadPool = null;
+ }
+ super.stop();
+ if (!scheduledExecutorService.isShutdown()) {
+ scheduledExecutorService.shutdown();
+ }
+ sinkCounter.stop();
+ logger.debug("pulsar sink stopped. Metrics:{}", sinkCounter);
+ }
+
+ @Override
+ public Status process() throws EventDeliveryException {
+ logger.debug("process......");
+ if (!this.canTake) {
+ return Status.BACKOFF;
+ }
+
+ Status status = Status.READY;
+ Channel channel = getChannel();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ try {
+ Event event = channel.take();
+ if (event != null) {
+ if (diskRateLimiter != null) {
+ diskRateLimiter.acquire(event.getBody().length);
+ }
+ if (!eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS))
{
+ logger.info("[{}] Channel --> Queue(has no enough
space,current code point) "
+ + "--> pulsar,Check if pulsar server or network is
ok.(if this situation "
+ + "last long time it will cause memoryChannel full
and fileChannel write.)", getName());
+ tx.rollback();
+ } else {
+ tx.commit();
+ }
+ } else {
+ status = Status.BACKOFF;
+ tx.commit();
+ }
+ } catch (Throwable t) {
+ logger.error("Process event failed!" + this.getName(), t);
+ try {
+ tx.rollback();
+ } catch (Throwable e) {
+ logger.error("metasink transaction rollback exception",e);
+
+ }
+ } finally {
+ tx.close();
+ }
+ return status;
+ }
+
+ @Override
+ public void handleCreateClientSuccess(String url) {
+ logger.info("createConnection success for url = {}", url);
+ sinkCounter.incrementConnectionCreatedCount();
+ }
+
+ @Override
+ public void handleCreateClientException(String url) {
+ logger.error("createConnection has exception for url = {}", url);
+ sinkCounter.incrementConnectionFailedCount();
+ }
+
+ @Override
+ public void handleMessageSendSuccess(Object result, EventStat eventStat) {
+ /*
+ * Statistics pulsar performance
+ */
+ totalPulsarSuccSendCnt.incrementAndGet();
+
totalPulsarSuccSendSize.addAndGet(eventStat.getEvent().getBody().length);
+ /*
+ *add to sinkCounter
+ */
+ sinkCounter.incrementEventDrainSuccessCount();
+ currentInFlightCount.decrementAndGet();
+ currentSuccessSendCnt.incrementAndGet();
+ long nowCnt = currentSuccessSendCnt.get();
+ long oldCnt = lastSuccessSendCnt.get();
+ if (nowCnt % logEveryNEvents == 0 && nowCnt !=
lastSuccessSendCnt.get()) {
+ lastSuccessSendCnt.set(nowCnt);
+ t2 = System.currentTimeMillis();
+ logger.info("metasink {}, succ put {} events to pulsar,"
+ + " in the past {} millsec", new Object[] {
+ getName(), (nowCnt - oldCnt), (t2 - t1)
+ });
+ t1 = t2;
+ }
+ }
+
+ @Override
+ public void handleMessageSendException(EventStat eventStat, Object e) {
+ if (e instanceof TooLongFrameException) {
+ PulsarSink.this.overflow = true;
+ } else if (e instanceof ProducerQueueIsFullError) {
+ PulsarSink.this.overflow = true;
+ } else if (!(e instanceof AlreadyClosedException
+ || e instanceof NotConnectedException
+ || e instanceof TopicTerminatedException)) {
+ logger.error("handle message send exception ,msg will resend
later, e = {}", e);
+ }
+ eventStat.incRetryCnt();
+ resendEvent(eventStat, true);
+ }
+
+ /**
+ * Resend the data and store the data in the memory cache.
+ * @param es
+ * @param isDecrement
+ */
+ private void resendEvent(EventStat es, boolean isDecrement) {
+ try {
+ if (isDecrement) {
+ currentInFlightCount.decrementAndGet();
+ }
+ if (es == null || es.getEvent() == null) {
+ return;
+ }
+ if (!resendQueue.offer(es)) {
+
FailoverChannelProcessorHolder.getChannelProcessor().processEvent(es.getEvent());
+ }
+ } catch (Throwable throwable) {
+ logger.error("resendEvent e = {}", throwable);
+ }
+ }
+
+ static class PulsarPerformanceTask implements Runnable {
+ @Override
+ public void run() {
+ try {
+ if (totalPulsarSuccSendSize.get() != 0) {
+ logger.info("Total pulsar performance tps :"
+ + totalPulsarSuccSendCnt.get() / PRINT_INTERVAL
+ + "/s, avg msg size:"
+ + totalPulsarSuccSendSize.get() /
totalPulsarSuccSendCnt.get()
+ + ",print every " + PRINT_INTERVAL + " seconds");
+ /*
+ * totalpulsarSuccSendCnt represents the number of packets
+ */
+ totalPulsarSuccSendCnt.set(0);
+ totalPulsarSuccSendSize.set(0);
+ }
+
+ } catch (Exception e) {
+ logger.info("pulsarPerformanceTask error", e);
+ }
+ }
+ }
+
+ class SinkTask implements Runnable {
+ @Override
+ public void run() {
+ logger.info("Sink task {} started.",
Thread.currentThread().getName());
+ while (canSend) {
+ logger.debug("SinkTask process......");
+ boolean decrementFlag = false;
+ Event event = null;
+ EventStat eventStat = null;
+ try {
+ if (PulsarSink.this.overflow) {
+ PulsarSink.this.overflow = false;
+ Thread.currentThread().sleep(10);
+ }
+ if (!resendQueue.isEmpty()) {
+ /*
+ * Send the data in the retry queue first
+ */
+ eventStat = resendQueue.poll();
+ if (eventStat != null) {
+ event = eventStat.getEvent();
+ }
+ } else {
+ if (currentInFlightCount.get() > BATCH_SIZE) {
+ /*
+ * Under the condition that the number of
unresponsive messages
+ * is greater than 1w, the number of unresponsive
messages sent
+ * to pulsar will be printed periodically
+ */
+ logCounter++;
+ if (logCounter == 1 || logCounter % 100000 == 0) {
+ logger.info(getName()
+ + " currentInFlightCount={}
resendQueue"
+ + ".size={}",
+
currentInFlightCount.get(),resendQueue.size());
+ }
+ if (logCounter > Long.MAX_VALUE - 10) {
+ logCounter = 0;
+ }
+ }
+ event = eventQueue.take();
+ eventStat = new EventStat(event);
+ sinkCounter.incrementEventDrainAttemptCount();
+ }
+ logger.debug("Event is {}, topic = {} ",event, topic);
+
+ if (event == null) {
+ continue;
+ }
+
+ if (topic == null || topic.equals("")) {
+ logger.warn("no topic specified in event header, just
skip this event");
+ continue;
+ }
+
+ final EventStat es = eventStat;
+ boolean sendResult =
pulsarClientService.sendMessage(topic, event,
+ PulsarSink.this, es);
+ if (!sendResult) {
+ continue;
+ }
+ currentInFlightCount.incrementAndGet();
+ decrementFlag = true;
+ } catch (InterruptedException e) {
+ logger.info("Thread {} has been interrupted!",
Thread.currentThread().getName());
+ return;
+ } catch (Throwable t) {
+ if (t instanceof PulsarClientException) {
+ String message = t.getMessage();
+ if (message != null && (message.contains("No available
queue for topic")
+ || message.contains("The brokers of topic are
all forbidden"))) {
+ logger.info("IllegalTopicMap.put " + topic);
+ continue;
+ } else {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ //ignore..
+ }
+ }
+ }
+ resendEvent(eventStat, decrementFlag);
+ }
+ }
+ }
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/CreatePulsarClientCallBack.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/CreatePulsarClientCallBack.java
new file mode 100644
index 0000000..40e6a8a
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/CreatePulsarClientCallBack.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink.pulsar;
+
+/**
+ * call back interface for create pulsar client
+ */
+public interface CreatePulsarClientCallBack {
+
+ /**
+ * call after create pulsar client success
+ * @param url
+ */
+ void handleCreateClientSuccess(String url);
+
+ /**
+ * call after create pulsar client has exception
+ * @param url
+ */
+ void handleCreateClientException(String url);
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
new file mode 100644
index 0000000..efbd821
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/PulsarClientService.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink.pulsar;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.inlong.audit.consts.AttributeConstants;
+import org.apache.inlong.audit.sink.EventStat;
+import org.apache.inlong.audit.utils.LogCounter;
+import org.apache.inlong.audit.utils.NetworkUtils;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarClientService {
+
+ private static final Logger logger =
LoggerFactory.getLogger(PulsarClientService.class);
+
+ private static final LogCounter logPrinterA = new LogCounter(10, 100000,
60 * 1000);
+
+ /*
+ * properties key for pulsar client
+ */
+ private static String PULSAR_SERVER_URL = "pulsar_server_url";
+
+ /*
+ * properties key pulsar producer
+ */
+ private static String SEND_TIMEOUT = "send_timeout_ms";
+ private static String CLIENT_TIMEOUT = "client_op_timeout_second";
+ private static String ENABLE_BATCH = "enable_batch";
+ private static String BLOCK_IF_QUEUE_FULL = "block_if_queue_full";
+ private static String MAX_PENDING_MESSAGES = "max_pending_messages";
+ private static String MAX_BATCHING_MESSAGES = "max_batching_messages";
+
+ private static int DEFAULT_SEND_TIMEOUT_MILL = 30 * 1000;
+ private static int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
+ private static boolean DEFAULT_ENABLE_BATCH = true;
+ private static boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
+ private static int DEFAULT_MAX_PENDING_MESSAGES = 10000;
+ private static int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
+
+ /*
+ * for producer
+ */
+ private Integer sendTimeout; // in millsec
+ private Integer clientOpTimeout;
+ private boolean enableBatch = true;
+ private boolean blockIfQueueFull = true;
+ private int maxPendingMessages = 10000;
+ private int maxBatchingMessages = 1000;
+ public ConcurrentHashMap<String, Producer> producerInfoMap;
+ public PulsarClient pulsarClient;
+ public String pulsarServerUrl;
+
+ private String localIp = "127.0.0.1";
+
+ /**
+ * pulsar client service
+ * @param context
+ */
+ public PulsarClientService(Context context) {
+
+ pulsarServerUrl = context.getString(PULSAR_SERVER_URL);
+ Preconditions.checkState(pulsarServerUrl != null, "No pulsar server
url specified");
+
+ sendTimeout = context.getInteger(SEND_TIMEOUT,
DEFAULT_SEND_TIMEOUT_MILL);
+ clientOpTimeout = context.getInteger(CLIENT_TIMEOUT,
DEFAULT_CLIENT_TIMEOUT_SECOND);
+ logger.debug("PulsarClientService " + SEND_TIMEOUT + " " +
sendTimeout);
+ Preconditions.checkArgument(sendTimeout > 0, "sendTimeout must be >
0");
+
+ enableBatch = context.getBoolean(ENABLE_BATCH, DEFAULT_ENABLE_BATCH);
+ blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL,
DEFAULT_BLOCK_IF_QUEUE_FULL);
+ maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES,
DEFAULT_MAX_PENDING_MESSAGES);
+ maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES,
DEFAULT_MAX_BATCHING_MESSAGES);
+ producerInfoMap = new ConcurrentHashMap<>();
+ localIp = NetworkUtils.getLocalIp();
+
+ }
+
+ /**
+ * init connection
+ * @param callBack
+ */
+ public void initCreateConnection(CreatePulsarClientCallBack callBack) {
+ try {
+ createConnection(callBack);
+ } catch (FlumeException e) {
+ logger.error("Unable to create pulsar client" + ". Exception
follows.", e);
+ close();
+ }
+ }
+
+ /**
+ * send message
+ * @param topic
+ * @param event
+ * @param sendMessageCallBack
+ * @param es
+ * @return
+ */
+ public boolean sendMessage(String topic, Event event,
+ SendMessageCallBack sendMessageCallBack, EventStat es) {
+ Producer producer = null;
+ try {
+ producer = getProducer(topic);
+ } catch (Exception e) {
+ if (logPrinterA.shouldPrint()) {
+ logger.error("Get producer failed!", e);
+ }
+ }
+
+ if (producer == null) {
+ logger.error("Get producer is null!");
+ return false;
+ }
+
+ Map<String, String> proMap = new HashMap<>();
+ proMap.put("auditIp", localIp);
+ String streamId = "";
+ String groupId = "";
+ if
(event.getHeaders().containsKey(AttributeConstants.INLONG_STREAM_ID)) {
+ streamId =
event.getHeaders().get(AttributeConstants.INLONG_STREAM_ID);
+ proMap.put(AttributeConstants.INLONG_STREAM_ID, streamId);
+ }
+ if
(event.getHeaders().containsKey(AttributeConstants.INLONG_GROUP_ID)) {
+ groupId =
event.getHeaders().get(AttributeConstants.INLONG_GROUP_ID);
+ proMap.put(AttributeConstants.INLONG_GROUP_ID, groupId);
+ }
+
+ logger.debug("producer send msg!");
+ producer.newMessage().properties(proMap).value(event.getBody())
+ .sendAsync().thenAccept((msgId) -> {
+ sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl)msgId,
es);
+
+ }).exceptionally((e) -> {
+ sendMessageCallBack.handleMessageSendException(es, e);
+ return null;
+ });
+ return true;
+ }
+
+ /**
+ * If this function is called successively without calling {@see
#destroyConnection()}, only the
+ * first call has any effect.
+ *
+ * @throws FlumeException if an RPC client connection could not be opened
+ */
+ private void createConnection(CreatePulsarClientCallBack callBack) throws
FlumeException {
+ if (pulsarClient != null) {
+ return;
+ }
+ try {
+ pulsarClient = initPulsarClient(pulsarServerUrl);
+ callBack.handleCreateClientSuccess(pulsarServerUrl);
+ } catch (PulsarClientException e) {
+ callBack.handleCreateClientException(pulsarServerUrl);
+ logger.error("create connnection error in metasink, "
+ + "maybe pulsar master set error, please
re-check.url{}, ex1 {}",
+ pulsarServerUrl,
+ e.getMessage());
+ } catch (Throwable e) {
+ callBack.handleCreateClientException(pulsarServerUrl);
+ logger.error("create connnection error in metasink, "
+ + "maybe pulsar master set error/shutdown in
progress, please "
+ + "re-check. url{}, ex2 {}",
+ pulsarServerUrl,
+ e.getMessage());
+ }
+ }
+
+ private PulsarClient initPulsarClient(String pulsarUrl) throws Exception {
+ return PulsarClient.builder()
+ .serviceUrl(pulsarUrl)
+ .connectionTimeout(clientOpTimeout, TimeUnit.SECONDS)
+ .build();
+ }
+
+ public Producer initTopicProducer(String topic) {
+ logger.info("initTopicProducer topic = {}", topic);
+ Producer producer = null;
+ try {
+ producer = pulsarClient.newProducer().sendTimeout(sendTimeout,
+ TimeUnit.MILLISECONDS)
+ .topic(topic)
+ .enableBatching(enableBatch)
+ .blockIfQueueFull(blockIfQueueFull)
+ .maxPendingMessages(maxPendingMessages)
+ .batchingMaxMessages(maxBatchingMessages)
+ .create();
+ } catch (PulsarClientException e) {
+ logger.error("create pulsar client has error e = {}", e);
+ }
+ return producer;
+ }
+
+ private Producer getProducer(String topic) {
+ return producerInfoMap.computeIfAbsent(topic, (k) ->
initTopicProducer(topic));
+ }
+
+ public void closeTopicProducer(String topic) {
+ logger.info("closeTopicProducer topic = {}", topic);
+ Producer producer = producerInfoMap.remove(topic);
+ if (producer != null) {
+ producer.closeAsync();
+ }
+ }
+
+ private void destroyConnection() {
+ producerInfoMap.clear();
+ if (pulsarClient != null) {
+ try {
+ pulsarClient.shutdown();
+ } catch (PulsarClientException e) {
+ logger.error("destroy pulsarClient error in PulsarSink,
PulsarClientException {}",
+ e.getMessage());
+ } catch (Exception e) {
+ logger.error("destroy pulsarClient error in PulsarSink, ex
{}", e.getMessage());
+ }
+ }
+ pulsarClient = null;
+ logger.debug("closed meta producer");
+ }
+
+ public void close() {
+ destroyConnection();
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/SendMessageCallBack.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/SendMessageCallBack.java
new file mode 100644
index 0000000..9fb64d5
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/sink/pulsar/SendMessageCallBack.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink.pulsar;
+
+import org.apache.inlong.audit.sink.EventStat;
+
+public interface SendMessageCallBack {
+
+ void handleMessageSendSuccess(Object msgId, EventStat es);
+
+ void handleMessageSendException(EventStat es, Object exception);
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/DefaultServiceDecoder.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/DefaultServiceDecoder.java
new file mode 100644
index 0000000..26c230f
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/DefaultServiceDecoder.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import org.apache.inlong.audit.consts.ConfigConstants;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+
+public class DefaultServiceDecoder implements ServiceDecoder {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(DefaultServiceDecoder.class);
+
+ @Override
+ public BaseCommand extractData(ChannelBuffer cb, Channel channel) throws
Exception {
+
+ /*[cmd size] | [cmd]*/
+ if (null == cb) {
+ LOG.error("cb == null");
+ return null;
+ }
+ int totalLen = cb.readableBytes();
+ if (ConfigConstants.MSG_MAX_LENGTH_BYTES < totalLen) {
+ throw new Exception(new Throwable("err msg,
ConfigConstants.MSG_MAX_LENGTH_BYTES "
+ + "< totalLen, and totalLen=" + totalLen));
+ }
+ cb.markReaderIndex();
+ BaseCommand cmd = null;
+ BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
+ int cmdSize = cb.readInt();
+ if (cmdSize <= totalLen) {
+ byte[] bodyBytes = new byte[cmdSize];
+ cb.readBytes(bodyBytes);
+ LOG.debug("msg totalDataLen = {}, cmdSize = {}", totalLen,
cmdSize);
+ cmd = cmdBuilder.mergeFrom(bodyBytes).build();
+ } else {
+ // reset index.
+ cb.resetReaderIndex();
+ }
+ return cmd;
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageFactory.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageFactory.java
new file mode 100644
index 0000000..e254c6e
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import java.lang.reflect.Constructor;
+import java.util.concurrent.TimeUnit;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.source.AbstractSource;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerMessageFactory implements ChannelPipelineFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ServerMessageFactory.class);
+ private static final int DEFAULT_READ_IDLE_TIME = 70 * 60 * 1000;
+ private AbstractSource source;
+ private ChannelProcessor processor;
+ private ChannelGroup allChannels;
+ private ServiceDecoder serviceDecoder;
+ private String messageHandlerName;
+ private int maxConnections = Integer.MAX_VALUE;
+ private int maxMsgLength;
+ private String name;
+ private Timer timer = new HashedWheelTimer();
+
+ /**
+ * get server factory
+ *
+ * @param source
+ * @param allChannels
+ * @param serviceDecoder
+ * @param messageHandlerName
+ * @param maxMsgLength
+ * @param maxCons
+ * @param name
+ */
+ public ServerMessageFactory(AbstractSource source,
+ ChannelGroup allChannels, ServiceDecoder serviceDecoder,
+ String messageHandlerName, Integer maxMsgLength, Integer maxCons,
String name) {
+ this.source = source;
+ this.processor = source.getChannelProcessor();
+ this.allChannels = allChannels;
+ this.serviceDecoder = serviceDecoder;
+ this.messageHandlerName = messageHandlerName;
+ this.name = name;
+ this.maxConnections = maxCons;
+ this.maxMsgLength = maxMsgLength;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline cp = Channels.pipeline();
+ return addMessageHandlersTo(cp);
+ }
+
+ /**
+ * get message handlers
+ * @param cp
+ * @return
+ */
+ public ChannelPipeline addMessageHandlersTo(ChannelPipeline cp) {
+
+ cp.addLast("messageDecoder", new LengthFieldBasedFrameDecoder(
+ this.maxMsgLength, 0, 4, 0, 0, true));
+ cp.addLast("readTimeoutHandler", new ReadTimeoutHandler(timer,
+ DEFAULT_READ_IDLE_TIME, TimeUnit.MILLISECONDS));
+
+ if (processor != null) {
+ try {
+ Class<? extends SimpleChannelHandler> clazz = (Class<? extends
SimpleChannelHandler>) Class
+ .forName(messageHandlerName);
+
+ Constructor<?> ctor = clazz.getConstructor(
+ AbstractSource.class, ServiceDecoder.class,
ChannelGroup.class, Integer.class);
+
+ SimpleChannelHandler messageHandler = (SimpleChannelHandler)
ctor
+ .newInstance(source, serviceDecoder, allChannels,
maxConnections);
+
+ cp.addLast("messageHandler", messageHandler);
+ } catch (Exception e) {
+ LOG.info("SimpleChannelHandler.newInstance has error:" +
name, e);
+ }
+ }
+ return cp;
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
new file mode 100644
index 0000000..77e3d9a
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServerMessageHandler.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.gson.Gson;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.SocketAddress;
+import java.util.List;
+import org.apache.flume.Event;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.source.AbstractSource;
+
+import org.apache.inlong.audit.protocol.AuditApi.AuditMessageBody;
+import org.apache.inlong.audit.protocol.AuditApi.AuditReply;
+import org.apache.inlong.audit.protocol.AuditApi.AuditReply.RSP_CODE;
+import org.apache.inlong.audit.protocol.AuditApi.AuditRequest;
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+import org.apache.inlong.audit.protocol.AuditData;
+import org.apache.inlong.audit.protocol.Commands;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Server message handler
+ *
+ */
+public class ServerMessageHandler extends SimpleChannelHandler {
+
+ private static final Logger logger =
LoggerFactory.getLogger(ServerMessageHandler.class);
+
+ private static final String DEFAULT_REMOTE_IP_VALUE = "0.0.0.0";
+
+ private AbstractSource source;
+ private final ChannelGroup allChannels;
+ private int maxConnections = Integer.MAX_VALUE;
+
+ private final ChannelProcessor processor;
+ private final ServiceDecoder serviceDecoder;
+
+ private final Gson gson = new Gson();
+
+ public ServerMessageHandler(AbstractSource source, ServiceDecoder
serviceDecoder,
+ ChannelGroup allChannels, Integer maxCons) {
+ this.source = source;
+ this.processor = source.getChannelProcessor();
+ this.serviceDecoder = serviceDecoder;
+ this.allChannels = allChannels;
+ this.maxConnections = maxCons;
+
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
+ if (allChannels.size() - 1 >= maxConnections) {
+ logger.warn("refuse to connect , and connections=" +
(allChannels.size() - 1)
+ + ", maxConnections="
+ + maxConnections + ",channel is " + e.getChannel());
+ e.getChannel().disconnect();
+ e.getChannel().close();
+ }
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
+ logger.debug("message received");
+ if (e == null) {
+ logger.warn("get null message event, just skip");
+ return;
+ }
+ ChannelBuffer cb = ((ChannelBuffer) e.getMessage());
+ SocketAddress remoteSocketAddress = e.getRemoteAddress();
+ int len = cb.readableBytes();
+ if (len == 0) {
+ logger.warn("receive message skip empty msg.");
+ cb.clear();
+ return;
+ }
+ Channel remoteChannel = e.getChannel();
+ BaseCommand cmd = null;
+ try {
+ cmd = serviceDecoder.extractData(cb, remoteChannel);
+ } catch (Exception ex) {
+ logger.error("extractData has error e {}", ex);
+ throw new IOException(ex.getCause());
+ }
+
+ if (cmd == null) {
+ logger.warn("receive message extractData is null");
+ return;
+ }
+ ChannelBuffer channelBuffer = null;
+ switch (cmd.getType()) {
+ case PING:
+ checkArgument(cmd.hasPing());
+ channelBuffer = Commands.getPongChannelBuffer();
+ break;
+ case PONG:
+ checkArgument(cmd.hasPong());
+ channelBuffer = Commands.getPingChannelBuffer();
+ break;
+ case AUDITREQUEST:
+ checkArgument(cmd.hasAuditRequest());
+ AuditReply auditReply = handleRequest(cmd.getAuditRequest());
+ channelBuffer = Commands.getAuditReplylBuffer(auditReply);
+ break;
+ case AUDITREPLY:
+ checkArgument(cmd.hasAuditReply());
+ break;
+ }
+ if (channelBuffer != null) {
+ writeResponse(remoteChannel, remoteSocketAddress, channelBuffer);
+ }
+ }
+
+ private AuditReply handleRequest(AuditRequest auditRequest) {
+ AuditReply reply = null;
+ if (auditRequest != null) {
+ List<AuditMessageBody> bodyList = auditRequest.getMsgBodyList();
+ if (bodyList != null) {
+ int errorMsgBody = 0;
+ for (AuditMessageBody auditMessageBody : bodyList) {
+ AuditData auditData = new AuditData();
+ auditData.setIp(auditRequest.getMsgHeader().getIp());
+
auditData.setThreadId(auditRequest.getMsgHeader().getThreadId());
+
auditData.setDockerId(auditRequest.getMsgHeader().getDockerId());
+
auditData.setPacketId(auditRequest.getMsgHeader().getPacketId());
+ auditData.setSdkTs(auditRequest.getMsgHeader().getSdkTs());
+
+ auditData.setLogTs(auditMessageBody.getLogTs());
+ auditData.setAuditId(auditMessageBody.getAuditId());
+ auditData.setCount(auditMessageBody.getCount());
+ auditData.setDelay(auditMessageBody.getDelay());
+
auditData.setInlongGroupId(auditMessageBody.getInlongGroupId());
+
auditData.setInlongStreamId(auditMessageBody.getInlongStreamId());
+ auditData.setSize(auditMessageBody.getSize());
+
+ byte[] body = null;
+ try {
+ body = gson.toJson(auditData).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ logger.error("UnsupportedEncodingException = {}", e);
+ }
+ if (body != null) {
+ Event event = null;
+ try {
+ event = EventBuilder.withBody(body, null);
+ processor.processEvent(event);
+ } catch (Throwable ex) {
+ logger.error("Error writing to controller,data
will discard.", ex);
+ errorMsgBody++;
+ }
+ }
+ }
+ if (errorMsgBody != 0) {
+ reply = AuditReply.newBuilder().setMessage("Error writing
to controller,data "
+ + "will discard. error body num = "
+ +
errorMsgBody).setRspCode(RSP_CODE.FAILED).build();
+ }
+ }
+ }
+ if (reply == null) {
+ reply =
AuditReply.newBuilder().setRspCode(RSP_CODE.SUCCESS).build();
+ }
+ return reply;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
+ logger.error("exception caught", e.getCause());
+ }
+
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
+ logger.error("channel closed {}", ctx.getChannel());
+ }
+
+ private void writeResponse(Channel remoteChannel,
+ SocketAddress remoteSocketAddress, ChannelBuffer buffer) throws
Exception {
+ if (remoteChannel.isWritable()) {
+ remoteChannel.write(buffer, remoteSocketAddress);
+ } else {
+ logger.warn(
+ "the send buffer2 is full, so disconnect it!please check
remote client"
+ + "; Connection info:" + remoteChannel);
+ throw new Exception(new Throwable(
+ "the send buffer2 is full,so disconnect it!please check
remote client, Connection info:"
+ + remoteChannel));
+ }
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServiceDecoder.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServiceDecoder.java
new file mode 100644
index 0000000..f42ae30
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/ServiceDecoder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import org.apache.inlong.audit.protocol.AuditApi.BaseCommand;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+
+/**
+ * decoder interface definition
+ *
+ */
+public interface ServiceDecoder {
+
+ /**
+ * extract data from buffer and convert it into map.
+ * @param cb
+ * @param channel
+ * @return
+ */
+ BaseCommand extractData(ChannelBuffer cb, Channel channel) throws
Exception;
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/SimpleTcpSource.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/SimpleTcpSource.java
new file mode 100644
index 0000000..fc0dcd3
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/source/SimpleTcpSource.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.source;
+
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.EventDrivenSource;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.source.AbstractSource;
+import org.apache.inlong.audit.base.NamedThreadFactory;
+import org.apache.inlong.audit.consts.ConfigConstants;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple tcp source
+ *
+ */
+public class SimpleTcpSource extends AbstractSource implements Configurable,
EventDrivenSource {
+
+ private static final Logger logger =
LoggerFactory.getLogger(SimpleTcpSource.class);
+ private static final String CONNECTIONS = "connections";
+
+ protected int maxConnections = Integer.MAX_VALUE;
+ private ServerBootstrap serverBootstrap = null;
+ protected ChannelGroup allChannels;
+ protected int port;
+ protected String host = null;
+ protected String msgFactoryName;
+ protected String serviceDecoderName;
+ protected String messageHandlerName;
+ protected int maxMsgLength;
+ private int maxThreads = 32;
+
+ private boolean tcpNoDelay = true;
+ private boolean keepAlive = true;
+ private int receiveBufferSize;
+ private int highWaterMark;
+ private int sendBufferSize;
+
+ private static String HOST_DEFAULT_VALUE = "0.0.0.0";
+
+ private static int DEFAULT_MAX_THREADS = 32;
+
+ private static int DEFAULT_MAX_CONNECTIONS = 5000;
+
+ private static int MIN_MSG_LENGTH = 4;
+
+ private static int MAX_MSG_LENGTH = 1024 * 64;
+
+ private static int BUFFER_SIZE_MUST_THAN = 0;
+
+ private static int HIGH_WATER_MARK_DEFAULT_VALUE = 64 * 1024;
+
+ private static int RECEIVE_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+ private static int SEND_BUFFER_DEFAULT_SIZE = 64 * 1024;
+
+ private static int RECEIVE_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+ private static int SEND_BUFFER_MAX_SIZE = 16 * 1024 * 1024;
+
+ private Channel nettyChannel = null;
+
+ public SimpleTcpSource() {
+ super();
+ allChannels = new DefaultChannelGroup();
+ }
+
+ @Override
+ public synchronized void start() {
+ logger.info("start " + this.getName());
+ super.start();
+
+
ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+ ChannelFactory factory = new NioServerSocketChannelFactory(Executors
+ .newCachedThreadPool(
+ new
NamedThreadFactory("tcpSource-nettyBoss-threadGroup")),
+ 1,
+ Executors.newCachedThreadPool(
+ new
NamedThreadFactory("tcpSource-nettyWorker-threadGroup")),
+ maxThreads);
+ logger.info("Set max workers : {} ;", maxThreads);
+ ChannelPipelineFactory fac = null;
+
+ serverBootstrap = new ServerBootstrap(factory);
+ serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
+ serverBootstrap.setOption("child.keepAlive", keepAlive);
+ serverBootstrap.setOption("child.receiveBufferSize",
receiveBufferSize);
+ serverBootstrap.setOption("child.sendBufferSize", sendBufferSize);
+ serverBootstrap.setOption("child.writeBufferHighWaterMark",
highWaterMark);
+ logger.info("load msgFactory=" + msgFactoryName + " and
serviceDecoderName="
+ + serviceDecoderName);
+ try {
+
+ ServiceDecoder serviceDecoder =
+ (ServiceDecoder)
Class.forName(serviceDecoderName).newInstance();
+
+ Class<? extends ChannelPipelineFactory> clazz =
+ (Class<? extends ChannelPipelineFactory>)
Class.forName(msgFactoryName);
+
+ Constructor ctor =
+ clazz.getConstructor(AbstractSource.class,
ChannelGroup.class,
+ ServiceDecoder.class, String.class,
+ Integer.class, Integer.class, String.class);
+
+ logger.info("Using channel processor:{}",
this.getClass().getName());
+ fac = (ChannelPipelineFactory) ctor
+ .newInstance(this, allChannels, serviceDecoder,
+ messageHandlerName, maxMsgLength, maxConnections,
this.getName());
+
+ } catch (Exception e) {
+ logger.error(
+ "Simple Tcp Source start error, fail to construct
ChannelPipelineFactory with name {}, ex {}",
+ msgFactoryName, e);
+ stop();
+ throw new FlumeException(e.getMessage());
+ }
+
+ serverBootstrap.setPipelineFactory(fac);
+
+ try {
+ if (host == null) {
+ nettyChannel = serverBootstrap.bind(new
InetSocketAddress(port));
+ } else {
+ nettyChannel = serverBootstrap.bind(new
InetSocketAddress(host, port));
+ }
+ } catch (Exception e) {
+ logger.error("Simple TCP Source error bind host {} port {},program
will exit!", host,
+ port);
+ System.exit(-1);
+ }
+
+ allChannels.add(nettyChannel);
+
+ logger.info("Simple TCP Source started at host {}, port {}", host,
port);
+
+ }
+
+ @Override
+ public synchronized void stop() {
+ logger.info("[STOP SOURCE]{} stopping...", super.getName());
+ if (allChannels != null && !allChannels.isEmpty()) {
+ try {
+ allChannels.unbind().awaitUninterruptibly();
+ allChannels.close().awaitUninterruptibly();
+ } catch (Exception e) {
+ logger.warn("Simple TCP Source netty server stop ex", e);
+ } finally {
+ allChannels.clear();
+ // allChannels = null;
+ }
+ }
+
+ if (serverBootstrap != null) {
+ try {
+
+ serverBootstrap.releaseExternalResources();
+ } catch (Exception e) {
+ logger.warn("Simple TCP Source serverBootstrap stop ex ", e);
+ } finally {
+ serverBootstrap = null;
+ }
+ }
+
+ super.stop();
+ logger.info("[STOP SOURCE]{} stopped", super.getName());
+ }
+
+ @Override
+ public void configure(Context context) {
+ logger.info("context is {}", context);
+ port = context.getInteger(ConfigConstants.CONFIG_PORT);
+ host = context.getString(ConfigConstants.CONFIG_HOST,
HOST_DEFAULT_VALUE);
+
+ tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true);
+
+ keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true);
+ highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK,
+ HIGH_WATER_MARK_DEFAULT_VALUE);
+ receiveBufferSize =
context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE,
+ RECEIVE_BUFFER_DEFAULT_SIZE);
+ if (receiveBufferSize > RECEIVE_BUFFER_MAX_SIZE) {
+ receiveBufferSize = RECEIVE_BUFFER_MAX_SIZE;
+ }
+ Preconditions.checkArgument(receiveBufferSize > BUFFER_SIZE_MUST_THAN,
+ "receiveBufferSize must be > 0");
+
+ sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE,
SEND_BUFFER_DEFAULT_SIZE);
+ if (sendBufferSize > SEND_BUFFER_MAX_SIZE) {
+ sendBufferSize = SEND_BUFFER_MAX_SIZE;
+ }
+ Preconditions.checkArgument(sendBufferSize > BUFFER_SIZE_MUST_THAN,
+ "sendBufferSize must be > 0");
+
+ try {
+ maxThreads = context.getInteger(ConfigConstants.MAX_THREADS,
DEFAULT_MAX_THREADS);
+ } catch (NumberFormatException e) {
+ logger.warn("Simple TCP Source max-threads property must specify
an integer value. {}",
+ context.getString(ConfigConstants.MAX_THREADS));
+ }
+
+ try {
+ maxConnections = context.getInteger(CONNECTIONS,
DEFAULT_MAX_CONNECTIONS);
+ } catch (NumberFormatException e) {
+ logger.warn("BaseSource\'s \"connections\" property must specify
an integer value.",
+ context.getString(CONNECTIONS));
+ }
+
+ msgFactoryName = context.getString(ConfigConstants.MSG_FACTORY_NAME,
+ "org.apache.inlong.audit.source.ServerMessageFactory");
+ msgFactoryName = msgFactoryName.trim();
+ Preconditions
+ .checkArgument(StringUtils.isNotBlank(msgFactoryName),
"msgFactoryName is empty");
+
+ serviceDecoderName =
context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME,
+ "org.apache.inlong.audit.source.DefaultServiceDecoder");
+ serviceDecoderName = serviceDecoderName.trim();
+ Preconditions.checkArgument(StringUtils.isNotBlank(serviceDecoderName),
+ "serviceProcessorName is empty");
+
+ messageHandlerName =
context.getString(ConfigConstants.MESSAGE_HANDLER_NAME,
+ "org.apache.inlong.audit.source.ServerMessageHandler");
+ messageHandlerName = messageHandlerName.trim();
+ Preconditions.checkArgument(StringUtils.isNotBlank(messageHandlerName),
+ "messageHandlerName is empty");
+
+ maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH,
MAX_MSG_LENGTH);
+ Preconditions.checkArgument(
+ (maxMsgLength >= MIN_MSG_LENGTH && maxMsgLength <=
ConfigConstants.MSG_MAX_LENGTH_BYTES),
+ "maxMsgLength must be >= 4 and <= " +
ConfigConstants.MSG_MAX_LENGTH_BYTES);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/FailoverChannelProcessorHolder.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/FailoverChannelProcessorHolder.java
new file mode 100644
index 0000000..6b10477
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/FailoverChannelProcessorHolder.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import org.apache.flume.channel.ChannelProcessor;
+
+public class FailoverChannelProcessorHolder {
+ private static ChannelProcessor channelProcessor;
+
+ public static ChannelProcessor getChannelProcessor() {
+ return channelProcessor;
+ }
+
+ public static void setChannelProcessor(ChannelProcessor cp) {
+ channelProcessor = cp;
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/LogCounter.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/LogCounter.java
new file mode 100644
index 0000000..7257bb7
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/LogCounter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import java.time.Instant;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LogCounter {
+
+ private AtomicInteger counter = new AtomicInteger(0);
+
+ private int start = 10;
+ private int control = 1000;
+ private int reset = 60 * 1000;
+
+ private long lastLogTime = System.currentTimeMillis();
+
+ public LogCounter(int start, int control, int reset) {
+ this.start = start;
+ this.control = control;
+ this.reset = reset;
+ }
+
+ public boolean shouldPrint() {
+ long currentMills = Instant.now().toEpochMilli();
+ if (currentMills - lastLogTime > reset) {
+ counter.set(0);
+ this.lastLogTime = currentMills;
+ }
+
+ if (counter.incrementAndGet() > start && counter.get() % control != 0)
{
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git
a/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/NetworkUtils.java
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/NetworkUtils.java
new file mode 100644
index 0000000..98e5d69
--- /dev/null
+++
b/inlong-audit/audit-source/src/main/java/org/apache/inlong/audit/utils/NetworkUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.utils;
+
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+
+public class NetworkUtils {
+ public static String INNER_NETWORK_INTERFACE = "eth1";
+
+ private static String localIp = null;
+
+ static {
+ localIp = getLocalIp();
+ }
+
+ /**
+ * get local ip
+ * @return
+ */
+ public static String getLocalIp() {
+ if (localIp == null) {
+ String ip = null;
+ Enumeration<NetworkInterface> allInterface;
+ try {
+ allInterface = NetworkInterface.getNetworkInterfaces();
+ for (; allInterface.hasMoreElements(); ) {
+ NetworkInterface oneInterface = allInterface.nextElement();
+ String interfaceName = oneInterface.getName();
+ if (oneInterface.isLoopback()
+ || !oneInterface.isUp()
+ || !interfaceName
+ .equalsIgnoreCase(INNER_NETWORK_INTERFACE)) {
+ continue;
+ }
+
+ Enumeration<InetAddress> allAddress = oneInterface
+ .getInetAddresses();
+ for (; allAddress.hasMoreElements(); ) {
+ InetAddress oneAddress = allAddress.nextElement();
+ ip = oneAddress.getHostAddress();
+ if (ip == null || ip.isEmpty() ||
ip.equals("127.0.0.1")) {
+ continue;
+ }
+ return ip;
+ }
+ }
+ } catch (SocketException e1) {
+ // ignore it
+ }
+
+ ip = "0.0.0.0";
+ localIp = ip;
+ }
+
+ return localIp;
+ }
+}
diff --git
a/inlong-audit/audit-source/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
b/inlong-audit/audit-source/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
new file mode 100644
index 0000000..f492572
--- /dev/null
+++
b/inlong-audit/audit-source/src/test/java/org/apache/inlong/audit/sink/PulsarSinkTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.sink;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class PulsarSinkTest {
+ private static final Logger logger = LoggerFactory
+ .getLogger(PulsarSinkTest.class);
+ private static final String hostname = "127.0.0.1";
+ private static final Integer port = 1234;
+ private String zkStr = "127.0.0.1:2181";
+ private String zkRoot = "/meta";
+ private int batchSize = 1;
+
+ private PulsarSink sink;
+ private Channel channel;
+
+ @BeforeClass
+ public void setUp() {
+ sink = new PulsarSink();
+ channel = new MemoryChannel();
+
+ Context context = new Context();
+
+ context.put("type", "org.apache.inlong.dataproxy.sink.PulsarSink");
+ context.put("pulsar_server_url", "pulsar://127.0.0.1:6650");
+
+ sink.setChannel(channel);
+
+ Configurables.configure(sink, context);
+ Configurables.configure(channel, context);
+ }
+
+ @Test
+ public void testProcess() throws InterruptedException,
EventDeliveryException,
+ InstantiationException, IllegalAccessException {
+ setUp();
+ Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
+ sink.start();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.START_OR_ERROR, 5000));
+
+ Transaction transaction = channel.getTransaction();
+
+ transaction.begin();
+ for (int i = 0; i < 10; i++) {
+ channel.put(event);
+ }
+ transaction.commit();
+ transaction.close();
+
+ for (int i = 0; i < 5; i++) {
+ Sink.Status status = sink.process();
+ Assert.assertEquals(Sink.Status.READY, status);
+ }
+
+ sink.stop();
+ Assert.assertTrue(LifecycleController.waitForOneOf(sink,
+ LifecycleState.STOP_OR_ERROR, 5000));
+ }
+
+}