This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/master by this push:
     new 578d1bb  [TUBEMQ-163] Flume sink for TubeMQ (#146)
578d1bb is described below

commit 578d1bb46bafe052698528ce9bb59f82b3b44b67
Author: Yuanbo Liu <liuyuanb...@gmail.com>
AuthorDate: Tue Jun 16 15:17:12 2020 +0800

    [TUBEMQ-163] Flume sink for TubeMQ (#146)
---
 tubemq-connectors/pom.xml                          |   1 +
 tubemq-connectors/tubemq-connector-flume/README.md |  21 +
 .../{ => tubemq-connector-flume}/pom.xml           |  58 +--
 .../apache/flume/sink/tubemq/ConfigOptions.java    |  65 ++++
 .../org/apache/flume/sink/tubemq/EventStat.java    |  63 +++
 .../apache/flume/sink/tubemq/TubeSinkCounter.java  |  54 +++
 .../org/apache/flume/sink/tubemq/TubemqSink.java   | 424 +++++++++++++++++++++
 .../apache/flume/sink/tubemq/TestTubemqSink.java   | 110 ++++++
 .../src/test/resources/log4j.properties            |  20 +
 9 files changed, 787 insertions(+), 29 deletions(-)

diff --git a/tubemq-connectors/pom.xml b/tubemq-connectors/pom.xml
index d3e8cd9..56a4c0e 100644
--- a/tubemq-connectors/pom.xml
+++ b/tubemq-connectors/pom.xml
@@ -30,6 +30,7 @@
     <packaging>pom</packaging>
     <modules>
         <module>tubemq-connector-flink</module>
+        <module>tubemq-connector-flume</module>
     </modules>
 
     <dependencies>
diff --git a/tubemq-connectors/tubemq-connector-flume/README.md 
b/tubemq-connectors/tubemq-connector-flume/README.md
new file mode 100644
index 0000000..f7780f5
--- /dev/null
+++ b/tubemq-connectors/tubemq-connector-flume/README.md
@@ -0,0 +1,21 @@
+### TubeMQ Flume Connector
+#### Prerequisites
+
+Copy the following files to flume library path.
+
+```
+tubemq-connector-flume-[TUBEMQ-VERSION].jar
+tubemq-client-[TUBEMQ-VERSION].jar
+tubemq-core-[TUBEMQ-VERSION].jar
+```
+
+#### Flume Sink Configuration Template 
+
+```
+agent.sinks = tubemq
+agent.sinks.tubemq.type = org.apache.flume.sink.tubemq.TubemqSink
+// master host addresses, it could separate with ",".
+agent.sinks.tubemq.master-host-port-list = 127.0.0.1:8000
+// the default topic name, it could be override by topic in envent header.
+agent.sinks.tubemq.topic = demo
+```
\ No newline at end of file
diff --git a/tubemq-connectors/pom.xml 
b/tubemq-connectors/tubemq-connector-flume/pom.xml
similarity index 54%
copy from tubemq-connectors/pom.xml
copy to tubemq-connectors/tubemq-connector-flume/pom.xml
index d3e8cd9..f719501 100644
--- a/tubemq-connectors/pom.xml
+++ b/tubemq-connectors/tubemq-connector-flume/pom.xml
@@ -20,44 +20,44 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <parent>
-        <artifactId>tubemq</artifactId>
+        <artifactId>tubemq-connectors</artifactId>
         <groupId>org.apache.tubemq</groupId>
         <version>0.5.0-incubating-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <name>Apache TubeMQ - Connectors</name>
-    <artifactId>tubemq-connectors</artifactId>
-    <packaging>pom</packaging>
-    <modules>
-        <module>tubemq-connector-flink</module>
-    </modules>
+    <name>Apache TubeMQ - Connectors-flume</name>
+    <artifactId>tubemq-connector-flume</artifactId>
+
+    <properties>
+        <flume.version>1.9.0</flume.version>
+        <mockito.version>1.9.0</mockito.version>
+        <junit.version>4.10</junit.version>
+        <awaitility.version>4.0.3</awaitility.version>
+    </properties>
 
     <dependencies>
         <dependency>
-            <groupId>org.apache.tubemq</groupId>
-            <artifactId>tubemq-client</artifactId>
+            <groupId>org.apache.flume</groupId>
+            <artifactId>flume-ng-core</artifactId>
+            <version>${flume.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.tubemq</groupId>
-            <artifactId>tubemq-core</artifactId>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>${awaitility.version}</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>3.1</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                    <encoding>UTF-8</encoding>
-                    <showDeprecation>true</showDeprecation>
-                    <showWarnings>true</showWarnings>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
 </project>
\ No newline at end of file
diff --git 
a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/ConfigOptions.java
 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/ConfigOptions.java
new file mode 100644
index 0000000..7b3fcf4
--- /dev/null
+++ 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/ConfigOptions.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.tubemq;
+
+/**
+ * Flume configuration options for tubemq sink
+ */
+public class ConfigOptions {
+
+    // master host of tube mq cluster
+    public static final String MASTER_HOST_PORT_LIST = "master-host-port-list";
+
+    // topic name
+    public static final String TOPIC = "topic";
+
+    public static final String HEARTBEAT_PERIOD = "heartbeat-period"; // in 
milliseconds
+    public static final long DEFAULT_HEARTBEAT_PERIOD = 15000L;
+
+    public static final String RPC_TIMEOUT = "rpc-timeout";
+    public static final long DEFAULT_RPC_TIMEOUT = 20000L;
+
+    public static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 
"link-max-allowed-delayed-msg-count";
+    public static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 
80000L;
+
+    public static final String SESSION_WARN_DELAYED_MSG_COUNT = 
"session-warn-delayed-msg-count";
+    public static final long DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT = 2000000L;
+
+    public static final String SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 
"session-max-allowed-delayed-msg-count";
+    public static final long DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT = 
4000000L;
+
+    public static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 
"netty-write-buffer-high-water-mark";
+    public static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 
1024 * 1024L;
+
+    public static final String SINK_THREAD_NUM = "thread-num";
+    public static final int DEFAULT_SINK_THREAD_NUM = 4;
+
+    public static final String RETRY_QUEUE_CAPACITY = "retry-queue-capacity";
+    public static final int DEFAULT_RETRY_QUEUE_CAPACITY = 10000;
+
+    public static final String EVENT_QUEUE_CAPACITY = "retry-queue-capacity";
+    public static final int DEFAULT_EVENT_QUEUE_CAPACITY = 1000;
+
+    public static final String EVENT_MAX_RETRY_TIME = "event-max-retry-time";
+    public static final int DEFAULT_EVENT_MAX_RETRY_TIME = 5;
+
+    public static final String EVENT_OFFER_TIMEOUT = "event-offer-timeout";
+    public static final long DEFAULT_EVENT_OFFER_TIMEOUT = 3 * 1000; // in 
milliseconds
+
+}
diff --git 
a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/EventStat.java
 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/EventStat.java
new file mode 100644
index 0000000..5c6f0fd
--- /dev/null
+++ 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/EventStat.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.tubemq;
+import static org.apache.flume.sink.tubemq.ConfigOptions.TOPIC;
+
+import java.util.Map;
+import org.apache.flume.Event;
+
+
+
+/**
+ * Event with retry time
+ */
+public class EventStat {
+    private final Event event;
+    private int retryCnt;
+    private String topic;
+
+    public EventStat(Event event) {
+        this.event = event;
+        this.retryCnt = 0;
+        Map<String, String> headers = event.getHeaders();
+        if (headers != null && headers.containsKey(TOPIC)) {
+            this.topic = headers.get(TOPIC);
+        }
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String defaultTopic) {
+        this.topic = defaultTopic;
+    }
+
+    public Event getEvent() {
+        return event;
+    }
+
+    public int getRetryCnt() {
+        return retryCnt;
+    }
+
+    public void incRetryCnt() {
+        this.retryCnt++;
+    }
+}
diff --git 
a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubeSinkCounter.java
 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubeSinkCounter.java
new file mode 100644
index 0000000..a4704f9
--- /dev/null
+++ 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubeSinkCounter.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.tubemq;
+
+import org.apache.flume.instrumentation.SinkCounter;
+
+/**
+ * Counters for tube sink
+ */
+public class TubeSinkCounter extends SinkCounter {
+    private static final String COUNT_ROLLBACK =
+            "sink.rollback.count";
+
+    private static final String COUNT_SEND =
+            "sink.send.count";
+
+    private static final String[] ATTRIBUTES = { COUNT_ROLLBACK, COUNT_SEND };
+
+    public TubeSinkCounter(String name) {
+        super(name, ATTRIBUTES);
+    }
+
+    public long incrementRollbackCount() {
+        return increment(COUNT_ROLLBACK);
+    }
+
+    public long incrementSendCount() {
+        return increment(COUNT_SEND);
+    }
+
+    public long getTubeRollbackCount() {
+        return get(COUNT_ROLLBACK);
+    }
+
+    public long getTubeSendCount() {
+        return get(COUNT_SEND);
+    }
+}
diff --git 
a/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubemqSink.java
 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubemqSink.java
new file mode 100644
index 0000000..be6cbc7
--- /dev/null
+++ 
b/tubemq-connectors/tubemq-connector-flume/src/main/java/org/apache/flume/sink/tubemq/TubemqSink.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.tubemq;
+
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_EVENT_MAX_RETRY_TIME;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_EVENT_OFFER_TIMEOUT;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_EVENT_QUEUE_CAPACITY;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_HEARTBEAT_PERIOD;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_RETRY_QUEUE_CAPACITY;
+import static org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_RPC_TIMEOUT;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.DEFAULT_SINK_THREAD_NUM;
+import static org.apache.flume.sink.tubemq.ConfigOptions.EVENT_MAX_RETRY_TIME;
+import static org.apache.flume.sink.tubemq.ConfigOptions.EVENT_OFFER_TIMEOUT;
+import static org.apache.flume.sink.tubemq.ConfigOptions.EVENT_QUEUE_CAPACITY;
+import static org.apache.flume.sink.tubemq.ConfigOptions.HEARTBEAT_PERIOD;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.LINK_MAX_ALLOWED_DELAYED_MSG_COUNT;
+import static org.apache.flume.sink.tubemq.ConfigOptions.MASTER_HOST_PORT_LIST;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.NETTY_WRITE_BUFFER_HIGH_WATER_MARK;
+import static org.apache.flume.sink.tubemq.ConfigOptions.RETRY_QUEUE_CAPACITY;
+import static org.apache.flume.sink.tubemq.ConfigOptions.RPC_TIMEOUT;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT;
+import static 
org.apache.flume.sink.tubemq.ConfigOptions.SESSION_WARN_DELAYED_MSG_COUNT;
+import static org.apache.flume.sink.tubemq.ConfigOptions.SINK_THREAD_NUM;
+import static org.apache.flume.sink.tubemq.ConfigOptions.TOPIC;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.tubemq.client.config.TubeClientConfig;
+import org.apache.tubemq.client.exception.TubeClientException;
+import org.apache.tubemq.client.factory.TubeMultiSessionFactory;
+import org.apache.tubemq.client.producer.MessageProducer;
+import org.apache.tubemq.client.producer.MessageSentCallback;
+import org.apache.tubemq.client.producer.MessageSentResult;
+import org.apache.tubemq.corebase.Message;
+import org.apache.tubemq.corerpc.exception.OverflowException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Make tubemq as one of flume sinks
+ */
+public class TubemqSink extends AbstractSink implements Configurable {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(TubemqSink.class);
+
+    public TubeMultiSessionFactory sessionFactory;
+    public ConcurrentHashMap<String, MessageProducer> producerMap;
+
+    private String masterHostAndPortList;
+    private String defaultTopic;
+    private long heartbeatPeriod;
+    private long rpcTimeout;
+
+    private long linkMaxAllowedDelayedMsgCount;
+    private long sessionWarnDelayedMsgCount;
+    private long sessionMaxAllowedDelayedMsgCount;
+    private long nettyWriteBufferHighWaterMark;
+
+    private ExecutorService sinkThreadPool;
+    private final List<Future<?>> threadFutures = new ArrayList<>();
+    private int threadNum;
+
+    private boolean started = false;
+    // check if overflow
+    private boolean overflow = false;
+
+    private LinkedBlockingQueue<EventStat> resendQueue;
+
+    private LinkedBlockingQueue<Event> eventQueue;
+
+    private int maxRetryTime;
+    private long eventOfferTimeout;
+
+    private TubeClientConfig clientConfig;
+
+    private TubeSinkCounter counter;
+
+    /**
+     * init tube config
+     *
+     * @return tube config
+     */
+    private TubeClientConfig initTubeConfig() {
+        final TubeClientConfig tubeClientConfig = new 
TubeClientConfig(this.masterHostAndPortList);
+        
tubeClientConfig.setLinkMaxAllowedDelayedMsgCount(linkMaxAllowedDelayedMsgCount);
+        
tubeClientConfig.setSessionWarnDelayedMsgCount(sessionWarnDelayedMsgCount);
+        
tubeClientConfig.setSessionMaxAllowedDelayedMsgCount(sessionMaxAllowedDelayedMsgCount);
+        
tubeClientConfig.setNettyWriteBufferHighWaterMark(nettyWriteBufferHighWaterMark);
+        tubeClientConfig.setHeartbeatPeriodMs(heartbeatPeriod);
+        tubeClientConfig.setRpcTimeoutMs(rpcTimeout);
+
+        return tubeClientConfig;
+    }
+
+    @VisibleForTesting
+    TubeClientConfig getClientConfig() {
+        return clientConfig;
+    }
+
+    @VisibleForTesting
+    TubeSinkCounter getCounter() {
+        return counter;
+    }
+
+    /**
+     * Create producer
+     *
+     * @throws FlumeException
+     */
+    private void createConnection() throws FlumeException {
+        if (sessionFactory != null) {
+            return;
+        }
+        try {
+            sessionFactory = new TubeMultiSessionFactory(clientConfig);
+        } catch (TubeClientException e) {
+            LOGGER.error("create connection error in tubemqSink, "
+                    + "maybe tubemq master set error, please re-check. ex1 
{}", e.getMessage());
+            throw new FlumeException("connect to tubemq error1, please 
re-check", e);
+        }
+
+        if (producerMap == null) {
+            producerMap = new ConcurrentHashMap<>();
+        }
+
+    }
+
+    /**
+     * Destroy all producers and clear up caches.
+     */
+    private void destroyConnection() {
+        for (String topic : producerMap.keySet()) {
+            MessageProducer producer = producerMap.get(topic);
+            try {
+                producer.shutdown();
+            } catch (Throwable e) {
+                LOGGER.error("destroy producer error in tubemqSink, ex", e);
+            }
+        }
+        producerMap.clear();
+
+        if (sessionFactory != null) {
+            try {
+                sessionFactory.shutdown();
+            } catch (Exception e) {
+                LOGGER.error("destroy sessionFactory error in tubemqSink, 
MetaClientException", e);
+            }
+        }
+        sessionFactory = null;
+        LOGGER.debug("closed meta producer");
+    }
+
+    @Override
+    public void start() {
+        LOGGER.info("tubemq sink starting...");
+
+        // create connection
+        try {
+            createConnection();
+        } catch (FlumeException e) {
+            // close connection
+            destroyConnection();
+            LOGGER.error("Unable to create tubemq client" + ". Exception 
follows.", e);
+        }
+        started = true;
+        // submit worker threads
+        for (int i = 0; i < threadNum; i++) {
+            threadFutures.add(sinkThreadPool.submit(new SinkTask()));
+        }
+        super.start();
+    }
+
+    @Override
+    public void stop() {
+        LOGGER.info("tubemq sink stopping");
+        started = false;
+        if (sinkThreadPool != null) {
+            sinkThreadPool.shutdown();
+        }
+        for (Future<?> future : threadFutures) {
+            // interrupt threads
+            future.cancel(true);
+        }
+        threadFutures.clear();
+        destroyConnection();
+        super.stop();
+    }
+
+    @Override
+    public Status process() {
+        if (!started) {
+            return Status.BACKOFF;
+        }
+        Channel channel = getChannel();
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+        Status status = Status.READY;
+        try {
+            Event event = channel.take();
+            if (event != null) {
+                if (!eventQueue.offer(event, eventOfferTimeout, 
TimeUnit.MILLISECONDS)) {
+                    LOGGER.info("[{}] Channel --> Queue(has no enough 
space,current code point) --> tubemq, Check " +
+                            "if tubemq server or network is ok.(if this 
situation last long time it will cause" +
+                            " memoryChannel full and fileChannel write.)", 
getName());
+                    counter.incrementRollbackCount();
+                    tx.rollback();
+                } else {
+                    tx.commit();
+                }
+            } else {
+                // if event is null, that means queue is empty, backoff it.
+                status = Status.BACKOFF;
+                tx.commit();
+            }
+        } catch (Throwable t) {
+            LOGGER.error("Process event failed!" + this.getName(), t);
+            try {
+                counter.incrementRollbackCount();
+                tx.rollback();
+            } catch (Throwable e) {
+                LOGGER.error("tubemq sink transaction rollback exception", e);
+            }
+        } finally {
+            tx.close();
+        }
+        return status;
+    }
+
+    @Override
+    public void configure(Context context) {
+        LOGGER.info(context.toString());
+        masterHostAndPortList = context.getString(MASTER_HOST_PORT_LIST);
+        defaultTopic = context.getString(TOPIC);
+        heartbeatPeriod = context.getLong(HEARTBEAT_PERIOD, 
DEFAULT_HEARTBEAT_PERIOD);
+        rpcTimeout = context.getLong(RPC_TIMEOUT, DEFAULT_RPC_TIMEOUT);
+
+        linkMaxAllowedDelayedMsgCount = context.getLong(
+                LINK_MAX_ALLOWED_DELAYED_MSG_COUNT,
+                DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT);
+        sessionWarnDelayedMsgCount = context.getLong(
+                SESSION_WARN_DELAYED_MSG_COUNT,
+                DEFAULT_SESSION_WARN_DELAYED_MSG_COUNT);
+        sessionMaxAllowedDelayedMsgCount = context.getLong(
+                SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT,
+                DEFAULT_SESSION_MAX_ALLOWED_DELAYED_MSG_COUNT);
+        nettyWriteBufferHighWaterMark = context.getLong(
+                NETTY_WRITE_BUFFER_HIGH_WATER_MARK,
+                DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK);
+
+        producerMap = new ConcurrentHashMap<>();
+
+        threadNum = context.getInteger(SINK_THREAD_NUM, 
DEFAULT_SINK_THREAD_NUM);
+        sinkThreadPool = Executors.newFixedThreadPool(threadNum);
+
+        int retryQueueCapacity = context.getInteger(RETRY_QUEUE_CAPACITY, 
DEFAULT_RETRY_QUEUE_CAPACITY);
+        resendQueue = new LinkedBlockingQueue<>(retryQueueCapacity);
+
+        int eventQueueCapacity = context.getInteger(EVENT_QUEUE_CAPACITY, 
DEFAULT_EVENT_QUEUE_CAPACITY);
+        eventQueue = new LinkedBlockingQueue<>(eventQueueCapacity);
+
+        maxRetryTime = context.getInteger(EVENT_MAX_RETRY_TIME, 
DEFAULT_EVENT_MAX_RETRY_TIME);
+        eventOfferTimeout = context.getLong(EVENT_OFFER_TIMEOUT, 
DEFAULT_EVENT_OFFER_TIMEOUT);
+
+        counter = new TubeSinkCounter(this.getName());
+
+        clientConfig = initTubeConfig();
+    }
+
+    /**
+     * Get producer from cache, create it if not exists.
+     *
+     * @param topic - topic name
+     * @return MessageProducer
+     * @throws TubeClientException
+     */
+    private MessageProducer getProducer(String topic) throws 
TubeClientException {
+        if (!producerMap.containsKey(topic)) {
+            MessageProducer producer = sessionFactory.createProducer();
+            // publish topic
+            producer.publish(topic);
+            producerMap.putIfAbsent(topic, producer);
+        }
+        return producerMap.get(topic);
+    }
+
+
+    class SinkTask implements Runnable {
+
+        private void sleepIfOverflow() throws Exception {
+            if (overflow) {
+                overflow = false;
+                Thread.sleep(50);
+            }
+        }
+
+        /**
+         * fetch message, wait if queue is empty
+         *
+         * @return EventStat
+         * @throws Exception
+         */
+        private EventStat fetchEventStat() throws Exception {
+            EventStat es = null;
+            if (!resendQueue.isEmpty()) {
+                es = resendQueue.poll();
+            } else {
+                // wait if eventQueue is empty
+                Event event = eventQueue.take();
+                es = new EventStat(event);
+            }
+            return es;
+        }
+
+        private void sendEvent(MessageProducer producer, EventStat es) throws 
Exception {
+            // send message with callback
+            Message message = new Message(es.getTopic(), 
es.getEvent().getBody());
+            producer.sendMessage(message, new MessageSentCallback() {
+                @Override
+                public void onMessageSent(MessageSentResult result) {
+                    if (!result.isSuccess()) {
+                        resendEvent(es);
+                    }
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    LOGGER.error("exception caught", e);
+                    if (e instanceof OverflowException) {
+                        overflow = true;
+                    }
+                    resendEvent(es);
+                }
+            });
+        }
+
+        /**
+         * Resent event
+         *
+         * @param es EventStat
+         */
+        private void resendEvent(EventStat es) {
+            if (es == null || es.getEvent() == null) {
+                return;
+            }
+            es.incRetryCnt();
+            if (es.getRetryCnt() > maxRetryTime) {
+                LOGGER.error("event max retry reached, ignore it");
+                return;
+            }
+
+            // if resend queue is full, send back to channel
+            if (!resendQueue.offer(es)) {
+                getChannel().put(es.getEvent());
+                LOGGER.warn("resend queue is full, size: {}, send back to 
channel", resendQueue.size());
+            }
+        }
+
+        @Override
+        public void run() {
+            LOGGER.info("Sink task {} started.", 
Thread.currentThread().getName());
+            while (started) {
+                boolean decrementFlag = false;
+                EventStat es = null;
+                try {
+                    sleepIfOverflow();
+                    // fetch event, wait if necessary
+                    es = fetchEventStat();
+                    if (es.getTopic() == null || es.getTopic().equals("")) {
+                        LOGGER.debug("no topic specified in event header, use 
default topic instead");
+                        es.setTopic(defaultTopic);
+                    }
+                    counter.incrementSendCount();
+                    MessageProducer producer;
+                    try {
+                        producer = getProducer(es.getTopic());
+                        sendEvent(producer, es);
+                    } catch (Exception e) {
+                        LOGGER.error("Get producer failed!", e);
+                    }
+                } catch (InterruptedException e) {
+                    LOGGER.info("Thread {} has been interrupted!", 
Thread.currentThread().getName());
+                    return;
+                } catch (Throwable t) {
+                    LOGGER.error("error while sending event", t);
+                    resendEvent(es);
+                }
+            }
+        }
+    }
+}
diff --git 
a/tubemq-connectors/tubemq-connector-flume/src/test/java/org/apache/flume/sink/tubemq/TestTubemqSink.java
 
b/tubemq-connectors/tubemq-connector-flume/src/test/java/org/apache/flume/sink/tubemq/TestTubemqSink.java
new file mode 100644
index 0000000..4de1203
--- /dev/null
+++ 
b/tubemq-connectors/tubemq-connector-flume/src/test/java/org/apache/flume/sink/tubemq/TestTubemqSink.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.sink.tubemq;
+
+import static org.apache.flume.sink.tubemq.ConfigOptions.MASTER_HOST_PORT_LIST;
+import static org.apache.flume.sink.tubemq.ConfigOptions.TOPIC;
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.tubemq.client.config.TubeClientConfig;
+import org.junit.Test;
+
+
+public class TestTubemqSink {
+
+    private Context prepareDefaultContext() {
+        // Prepares a default context with Kafka Server Properties
+        Context context = new Context();
+        context.put(MASTER_HOST_PORT_LIST, "localhost:9092");
+        return context;
+    }
+
+    @Test
+    public void testTubeProperties() {
+
+        TubemqSink tubemqSink = new TubemqSink();
+        Context context = new Context();
+        context.put(MASTER_HOST_PORT_LIST, "ip1:9092,ip2:9092");
+        Configurables.configure(tubemqSink, context);
+
+        TubeClientConfig config = tubemqSink.getClientConfig();
+
+        //check that we have defaults set
+        for (String host : config.getMasterInfo().getNodeHostPortList()) {
+            if (host.startsWith("ip1")) {
+                assertEquals("ip1:9092", host);
+            } else if (host.startsWith("ip2")) {
+                assertEquals("ip2:9092", host);
+            } else {
+                fail("config should contains host list");
+            }
+        }
+    }
+
+    @Test
+    public void testTubeSink() throws Exception {
+        TubemqSink tubeSink = new TubemqSink();
+        Context context = prepareDefaultContext();
+        Configurables.configure(tubeSink, context);
+        Channel memoryChannel = new MemoryChannel();
+        Configurables.configure(memoryChannel, context);
+        tubeSink.setChannel(memoryChannel);
+        tubeSink.start();
+
+
+        String msg = "default-topic-test";
+        Transaction tx = memoryChannel.getTransaction();
+        tx.begin();
+        Event event = EventBuilder.withBody(msg.getBytes());
+        Map<String, String> eventHeader = new HashMap<>();
+        eventHeader.put(TOPIC, msg);
+        event.setHeaders(eventHeader);
+        memoryChannel.put(event);
+        tx.commit();
+        tx.close();
+
+        try {
+            Sink.Status status = tubeSink.process();
+            if (status == Sink.Status.BACKOFF) {
+                fail("Error Occurred");
+            }
+        } catch (Exception ex) {
+            // ignore
+        }
+
+        await().atMost(20, TimeUnit.SECONDS).until(() -> 
tubeSink.getCounter().getTubeSendCount() == 1);
+        assertEquals(1, tubeSink.getCounter().getTubeSendCount());
+
+        tubeSink.stop();
+    }
+}
diff --git 
a/tubemq-connectors/tubemq-connector-flume/src/test/resources/log4j.properties 
b/tubemq-connectors/tubemq-connector-flume/src/test/resources/log4j.properties
new file mode 100644
index 0000000..81e44d9
--- /dev/null
+++ 
b/tubemq-connectors/tubemq-connector-flume/src/test/resources/log4j.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger = INFO, out
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
\ No newline at end of file

Reply via email to