http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-ng/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
 
b/rocketmq-flume-ng/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
new file mode 100644
index 0000000..eafc771
--- /dev/null
+++ 
b/rocketmq-flume-ng/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.sink;
+
+/**
+ * Property names and default values used to configure the RocketMQ Flume sink.
+ *
+ * <p>Each {@code *_CONFIG} constant is a key placed in the Flume {@code Context}. The
+ * matching {@code *_DEFAULT} constant is presumably the fallback applied by the sink when
+ * the key is absent -- confirm against {@code RocketMQSink}.
+ */
+public class RocketMQSinkConstants {
+
+    /** Key for the RocketMQ name server address; no default is defined for it here. */
+    public static final String NAME_SERVER_CONFIG = "nameserver";
+
+    /** Key for the topic name. */
+    public static final String TOPIC_CONFIG = "topic";
+    /** Default topic name. */
+    public static final String TOPIC_DEFAULT = "FLUME_TOPIC";
+
+    /** Key for the message tag. */
+    public static final String TAG_CONFIG = "tag";
+    /** Default message tag. */
+    public static final String TAG_DEFAULT = "FLUME_TAG";
+
+    /** Key for the producer group name. */
+    public static final String PRODUCER_GROUP_CONFIG = "producerGroup";
+    /** Default producer group name. */
+    public static final String PRODUCER_GROUP_DEFAULT = "FLUME_PRODUCER_GROUP";
+
+    /** Key for the batch size. */
+    public static final String BATCH_SIZE_CONFIG = "batchSize";
+    /** Default batch size. */
+    public static final int BATCH_SIZE_DEFAULT = 1;
+
+    /** Key for the maximum process time. */
+    public static final String MAX_PROCESS_TIME_CONFIG = "maxProcessTime";
+    /** Default maximum process time (unit not visible here, presumably milliseconds -- TODO confirm). */
+    public static final long MAX_PROCESS_TIME_DEFAULT = 1000;
+
+    private RocketMQSinkConstants() {
+        // Constant holder only; never instantiated.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-ng/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
 
b/rocketmq-flume-ng/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
new file mode 100644
index 0000000..2497dfe
--- /dev/null
+++ 
b/rocketmq-flume-ng/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.sink;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import java.io.UnsupportedEncodingException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.Sink;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+
+import static 
org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.BATCH_SIZE_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.NAME_SERVER_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_DEFAULT;
+import static 
org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TOPIC_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ *
+ */
+public class RocketMQSinkTest {
+
+    private static final Logger log = 
org.slf4j.LoggerFactory.getLogger(RocketMQSinkTest.class);
+
+    private static String nameServer = "localhost:9876";
+
+    private static NamesrvController namesrvController;
+    private static BrokerController brokerController;
+
+    private DefaultMQPullConsumer consumer;
+    private String tag = TAG_DEFAULT + "_SINK_TEST_" + new 
Random().nextInt(99);
+    private String consumerGroup = "CONSUMER_GROUP_SINK_TEST";
+    private int batchSize = 100;
+
+    /**
+     * Starts the in-process name server, then the broker, then sleeps for two seconds
+     * before any test method runs.
+     */
+    @BeforeClass
+    public static void startMQ() throws Exception {
+        // The broker is configured with the name server address, so the name server goes first.
+        startNamesrv();
+        startBroker();
+
+        // Fixed pause after both controllers have been started.
+        Thread.sleep(2000);
+    }
+
+    /**
+     * Creates, initializes and starts a name server controller listening on port 9876.
+     *
+     * @throws IllegalStateException if the controller reports that initialization failed
+     * @throws Exception if starting the controller fails
+     */
+    private static void startNamesrv() throws Exception {
+
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(9876);
+
+        namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
+        if (!namesrvController.initialize()) {
+            namesrvController.shutdown();
+            // Say what failed; a message-less Exception makes the test report useless.
+            throw new IllegalStateException("Failed to initialize the RocketMQ name server on port 9876");
+        }
+        namesrvController.start();
+    }
+
+    /**
+     * Creates, initializes and starts a master broker listening on port 10911 and
+     * configured with the test name server address.
+     *
+     * @throws IllegalStateException if the controller reports that initialization failed
+     * @throws Exception if starting the controller fails
+     */
+    private static void startBroker() throws Exception {
+
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setNamesrvAddr(nameServer);
+        brokerConfig.setBrokerId(MixAll.MASTER_ID);
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(10911);
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+        brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig,
+            messageStoreConfig);
+        if (!brokerController.initialize()) {
+            brokerController.shutdown();
+            // Say what failed; a message-less Exception makes the test report useless.
+            throw new IllegalStateException("Failed to initialize the RocketMQ broker on port 10911");
+        }
+        brokerController.start();
+    }
+
+    /**
+     * Pushes one event through the sink and checks that a message with the same body can be
+     * pulled back from the default topic.
+     */
+    @Test
+    public void testEvent() throws MQClientException, InterruptedException, EventDeliveryException,
+        RemotingException, MQBrokerException, UnsupportedEncodingException {
+
+        /*
+        start sink
+         */
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        RocketMQSink sink = new RocketMQSink();
+        Configurables.configure(sink, context);
+        MemoryChannel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        sink.setChannel(channel);
+        sink.start();
+
+        /*
+        mock flume source
+         */
+        // "dd" = day of month and "HH" = 24-hour clock; the former "DD"/"hh" produced the
+        // day of the year and an ambiguous 12-hour value.
+        String sendMsg = "\"Hello RocketMQ\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");
+        try {
+            Transaction tx = channel.getTransaction();
+            tx.begin();
+            // Encode explicitly as UTF-8 because the body is decoded as UTF-8 below.
+            Event event = EventBuilder.withBody(sendMsg.getBytes("UTF-8"), null);
+            channel.put(event);
+            tx.commit();
+            tx.close();
+            log.info("publish message : {}", sendMsg);
+            Sink.Status status = sink.process();
+            if (status == Sink.Status.BACKOFF) {
+                fail("Error");
+            }
+        } finally {
+            // Stop the sink even when the test fails so later tests start clean.
+            sink.stop();
+        }
+
+        /*
+        consumer message
+         */
+        consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setNamesrvAddr(nameServer);
+        consumer.setMessageModel(MessageModel.BROADCASTING);
+        consumer.registerMessageQueueListener(TOPIC_DEFAULT, null);
+        consumer.start();
+
+        String receiveMsg = null;
+        try {
+            Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(TOPIC_DEFAULT);
+            for (MessageQueue queue : queues) {
+                long offset = getMessageQueueOffset(queue);
+                PullResult pullResult = consumer.pull(queue, tag, offset, 32);
+
+                if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                    for (MessageExt message : pullResult.getMsgFoundList()) {
+                        byte[] body = message.getBody();
+                        receiveMsg = new String(body, "UTF-8");
+                        log.info("receive message : {}", receiveMsg);
+                    }
+
+                    long nextBeginOffset = pullResult.getNextBeginOffset();
+                    putMessageQueueOffset(queue, nextBeginOffset);
+                }
+            }
+            /*
+            wait for processQueueTable init
+             */
+            Thread.sleep(1000);
+        } finally {
+            consumer.shutdown();
+        }
+
+        assertEquals(sendMsg, receiveMsg);
+    }
+
+    /**
+     * Pushes {@code batchSize} events through the sink in one channel transaction and checks
+     * that every one of them can be pulled back from the default topic.
+     *
+     * <p>Sent messages are tracked in a map keyed by their timestamp part; each received
+     * message removes its key, so the map must be empty at the end.
+     */
+    @Test
+    public void testBatchEvent() throws MQClientException, InterruptedException, EventDeliveryException,
+        RemotingException, MQBrokerException, UnsupportedEncodingException {
+
+        /*
+        start sink
+         */
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        context.put(BATCH_SIZE_CONFIG, String.valueOf(batchSize));
+        RocketMQSink sink = new RocketMQSink();
+        Configurables.configure(sink, context);
+        MemoryChannel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        sink.setChannel(channel);
+        sink.start();
+
+        /*
+        mock flume source
+         */
+        Map<String, String> msgs = new HashMap<>();
+
+        try {
+            Transaction tx = channel.getTransaction();
+            tx.begin();
+            int sendNum = 0;
+            for (int i = 0; i < batchSize; i++) {
+                // "dd" = day of month, "HH" = 24-hour clock, "SSS" = milliseconds.
+                String sendMsg = "\"Hello RocketMQ\"" + ","
+                    + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss:SSS");
+                // Encode explicitly as UTF-8 because the body is decoded as UTF-8 below.
+                Event event = EventBuilder.withBody(sendMsg.getBytes("UTF-8"), null);
+                channel.put(event);
+                log.info("publish message : {}", sendMsg);
+                String[] sendMsgKv = sendMsg.split(",");
+                msgs.put(sendMsgKv[1], sendMsgKv[0]);
+                sendNum++;
+                // Spaces the timestamps out so that they can serve as map keys.
+                Thread.sleep(10);
+            }
+            log.info("send message num={}", sendNum);
+
+            tx.commit();
+            tx.close();
+            Sink.Status status = sink.process();
+            if (status == Sink.Status.BACKOFF) {
+                fail("Error");
+            }
+        } finally {
+            // Stop the sink even when the test fails so later tests start clean.
+            sink.stop();
+        }
+
+        /*
+        consumer message
+         */
+        consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setNamesrvAddr(nameServer);
+        consumer.setMessageModel(MessageModel.BROADCASTING);
+        consumer.registerMessageQueueListener(TOPIC_DEFAULT, null);
+        consumer.start();
+
+        try {
+            int receiveNum = 0;
+            Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(TOPIC_DEFAULT);
+            for (MessageQueue queue : queues) {
+                long offset = getMessageQueueOffset(queue);
+                PullResult pullResult = consumer.pull(queue, tag, offset, batchSize);
+
+                if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                    for (MessageExt message : pullResult.getMsgFoundList()) {
+                        byte[] body = message.getBody();
+                        String receiveMsg = new String(body, "UTF-8");
+                        String[] receiveMsgKv = receiveMsg.split(",");
+                        msgs.remove(receiveMsgKv[1]);
+                        log.info("receive message : {}", receiveMsg);
+                        receiveNum++;
+                    }
+                    long nextBeginOffset = pullResult.getNextBeginOffset();
+                    putMessageQueueOffset(queue, nextBeginOffset);
+                }
+            }
+            log.info("receive message num={}", receiveNum);
+
+            /*
+            wait for processQueueTable init
+             */
+            Thread.sleep(1000);
+        } finally {
+            consumer.shutdown();
+        }
+
+        // JUnit expects (expected, actual); the reversed order yields misleading failure text.
+        assertEquals(0, msgs.size());
+    }
+
+    /**
+     * Runs the sink against an empty channel and expects it to report {@code BACKOFF}.
+     */
+    @Test
+    public void testNullEvent() throws MQClientException, InterruptedException, EventDeliveryException,
+        RemotingException, MQBrokerException, UnsupportedEncodingException {
+
+        /*
+        start sink
+         */
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        RocketMQSink sink = new RocketMQSink();
+        Configurables.configure(sink, context);
+        MemoryChannel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        sink.setChannel(channel);
+        sink.start();
+
+        try {
+            Sink.Status status = sink.process();
+
+            // JUnit expects (expected, actual); the reversed order yields misleading failure text.
+            assertEquals(Sink.Status.BACKOFF, status);
+        } finally {
+            // Stop the sink even when the assertion fails.
+            sink.stop();
+        }
+    }
+
+    /**
+     * Fetches the consume offset of {@code queue} from the test consumer, replacing a
+     * negative result with 0.
+     */
+    private long getMessageQueueOffset(MessageQueue queue) throws MQClientException {
+        long fetched = consumer.fetchConsumeOffset(queue, false);
+        return fetched < 0 ? 0 : fetched;
+    }
+
+    /**
+     * Passes {@code offset} for {@code queue} to the test consumer's {@code updateConsumeOffset}.
+     */
+    private void putMessageQueueOffset(MessageQueue queue, long offset) throws 
MQClientException {
+        consumer.updateConsumeOffset(queue, offset);
+    }
+
+    /**
+     * Shuts down the broker controller and then the name server controller, skipping
+     * whichever of the two was never created.
+     */
+    @AfterClass
+    public static void stop() {
+
+        if (brokerController != null) {
+            brokerController.shutdown();
+        }
+
+        if (namesrvController != null) {
+            namesrvController.shutdown();
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-ng/rocketmq-flume-sink/src/test/resources/log4j.properties 
b/rocketmq-flume-ng/rocketmq-flume-sink/src/test/resources/log4j.properties
new file mode 100644
index 0000000..dd15190
--- /dev/null
+++ b/rocketmq-flume-ng/rocketmq-flume-sink/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootLogger = DEBUG, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.rocketmq = DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-souce/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-flume-ng/rocketmq-flume-souce/pom.xml 
b/rocketmq-flume-ng/rocketmq-flume-souce/pom.xml
new file mode 100644
index 0000000..d07312b
--- /dev/null
+++ b/rocketmq-flume-ng/rocketmq-flume-souce/pom.xml
@@ -0,0 +1,27 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>rocketmq-flume</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>rocketmq-flume-souce</artifactId>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSource.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSource.java
 
b/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSource.java
new file mode 100644
index 0000000..9860c7d
--- /dev/null
+++ 
b/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSource.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.souce;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
+import org.apache.flume.source.AbstractPollableSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.BATCH_SIZE_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.BATCH_SIZE_DEFAULT;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.CONSUMER_GROUP_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.CONSUMER_GROUP_DEFAULT;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.HEADER_TAG_NAME;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.HEADER_TOPIC_NAME;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.MESSAGE_MODEL_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.MESSAGE_MODEL_DEFAULT;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.NAME_SERVER_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_DEFAULT;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TOPIC_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TOPIC_DEFAULT;
+
+/**
+ *
+ */
+public class RocketMQSource extends AbstractPollableSource implements 
Configurable {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQSource.class);
+
+    private String nameServer;
+    private String topic;
+    private String tag;
+    private String consumerGroup;
+    private String messageModel;
+    private Integer batchSize;
+
+    /** Monitoring counter. */
+    private SourceCounter sourceCounter;
+
+    DefaultMQPullConsumer consumer;
+
+    /**
+     * Reads this source's settings from the Flume context.
+     *
+     * <p>The name server address is mandatory. Topic, tag, consumer group, message model
+     * and batch size fall back to the defaults in {@link RocketMQSourceConstants}.
+     *
+     * @param context Flume configuration for this source
+     * @throws ConfigurationException if no name server address is configured
+     */
+    @Override
+    protected void doConfigure(Context context) throws FlumeException {
+
+        nameServer = context.getString(NAME_SERVER_CONFIG);
+        if (null == nameServer) {
+            throw new ConfigurationException("NameServer must not be null");
+        }
+
+        // Optional settings, each with a default.
+        batchSize = context.getInteger(BATCH_SIZE_CONFIG, BATCH_SIZE_DEFAULT);
+        messageModel = context.getString(MESSAGE_MODEL_CONFIG, MESSAGE_MODEL_DEFAULT);
+        consumerGroup = context.getString(CONSUMER_GROUP_CONFIG, CONSUMER_GROUP_DEFAULT);
+        tag = context.getString(TAG_CONFIG, TAG_DEFAULT);
+        topic = context.getString(TOPIC_CONFIG, TOPIC_DEFAULT);
+
+        // Keep the existing counter when the source is configured more than once.
+        if (sourceCounter == null) {
+            sourceCounter = new SourceCounter(getName());
+        }
+    }
+
+    /**
+     * Creates the pull consumer from the configured group, name server, message model and
+     * topic, starts it, and then starts the source counter.
+     *
+     * @throws FlumeException if the consumer fails to start
+     */
+    @Override
+    protected void doStart() throws FlumeException {
+
+        consumer = new DefaultMQPullConsumer(consumerGroup);
+        consumer.setNamesrvAddr(nameServer);
+        // valueOf requires messageModel to match a MessageModel constant name exactly.
+        consumer.setMessageModel(MessageModel.valueOf(messageModel));
+        consumer.registerMessageQueueListener(topic, null);
+
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            log.error("RocketMQ consumer start failed", e);
+            throw new FlumeException("Failed to start RocketMQ consumer", e);
+        }
+
+        sourceCounter.start();
+    }
+
+    /**
+     * Pulls one batch from every queue of the topic and hands the messages to the channel
+     * processor as Flume events.
+     *
+     * <p>Queue offsets are updated only after the channel processor has accepted the batch,
+     * so a failed delivery leaves them untouched.
+     *
+     * @return {@code READY} when at least one event was delivered; {@code BACKOFF} when
+     *     nothing was pulled or an error occurred, following the pollable-source convention
+     *     of backing off when there is no data
+     * @throws EventDeliveryException declared by the base class; failures are logged and
+     *     reported as {@code BACKOFF} instead
+     */
+    @Override
+    protected Status doProcess() throws EventDeliveryException {
+
+        List<Event> events = new ArrayList<>();
+        Map<MessageQueue, Long> offsets = new HashMap<>();
+
+        try {
+            Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(topic);
+            for (MessageQueue queue : queues) {
+                long offset = getMessageQueueOffset(queue);
+                PullResult pullResult = consumer.pull(queue, tag, offset, batchSize);
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Pull from queueId:{}, offset:{}, pullResult:{}", queue.getQueueId(), offset,
+                        pullResult);
+                }
+
+                if (pullResult.getPullStatus() == PullStatus.FOUND) {
+                    for (MessageExt msg : pullResult.getMsgFoundList()) {
+                        byte[] body = msg.getBody();
+
+                        // The headers carry the configured topic and tag expression, not
+                        // values read from the individual message.
+                        Map<String, String> headers = new HashMap<>();
+                        headers.put(HEADER_TOPIC_NAME, topic);
+                        headers.put(HEADER_TAG_NAME, tag);
+                        if (log.isDebugEnabled()) {
+                            log.debug("Processing message,body={}", new String(body, "UTF-8"));
+                        }
+
+                        events.add(EventBuilder.withBody(body, headers));
+                    }
+                    offsets.put(queue, pullResult.getNextBeginOffset());
+                }
+            }
+
+            boolean delivered = !events.isEmpty();
+            if (delivered) {
+                sourceCounter.incrementAppendBatchReceivedCount();
+                sourceCounter.addToEventReceivedCount(events.size());
+
+                getChannelProcessor().processEventBatch(events);
+
+                sourceCounter.incrementAppendBatchAcceptedCount();
+                sourceCounter.addToEventAcceptedCount(events.size());
+            }
+
+            // Reached only when the batch (if any) was accepted by the channel processor.
+            for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
+                putMessageQueueOffset(entry.getKey(), entry.getValue());
+            }
+
+            // Back off when nothing was pulled instead of always reporting READY.
+            return delivered ? Status.READY : Status.BACKOFF;
+
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Restore the interrupt flag so the caller can observe the interruption.
+                Thread.currentThread().interrupt();
+            }
+            log.error("Failed to consumer message", e);
+            return Status.BACKOFF;
+        }
+    }
+
+    /**
+     * Stops the source counter and then shuts down the pull consumer.
+     */
+    @Override
+    protected void doStop() throws FlumeException {
+        sourceCounter.stop();
+
+        consumer.shutdown();
+    }
+
+    /**
+     * Fetches the consume offset of {@code queue} from the consumer, replacing a negative
+     * result with 0.
+     */
+    private long getMessageQueueOffset(MessageQueue queue) throws MQClientException {
+        long fetched = consumer.fetchConsumeOffset(queue, false);
+        return fetched < 0 ? 0 : fetched;
+    }
+
+    /**
+     * Passes {@code offset} for {@code queue} to the consumer's {@code updateConsumeOffset}.
+     */
+    private void putMessageQueueOffset(MessageQueue queue, long offset) throws 
MQClientException {
+        consumer.updateConsumeOffset(queue, offset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceConstants.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceConstants.java
 
b/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceConstants.java
new file mode 100644
index 0000000..48a7b02
--- /dev/null
+++ 
b/rocketmq-flume-ng/rocketmq-flume-souce/src/main/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.souce;
+
+/**
+ * Property names, default values and event header names used by {@code RocketMQSource}.
+ *
+ * <p>Each {@code *_CONFIG} constant is a key read from the Flume {@code Context}; the
+ * matching {@code *_DEFAULT} constant is the value used when the key is absent.
+ */
+public class RocketMQSourceConstants {
+
+    /** Key for the RocketMQ name server address; mandatory, so it has no default. */
+    public static final String NAME_SERVER_CONFIG = "nameserver";
+
+    /** Key for the topic to pull from. */
+    public static final String TOPIC_CONFIG = "topic";
+    /** Default topic. */
+    public static final String TOPIC_DEFAULT = "FLUME_TOPIC";
+
+    /** Key for the tag expression passed to the pull call. */
+    public static final String TAG_CONFIG = "tag";
+    /** Default tag expression. */
+    public static final String TAG_DEFAULT = "FLUME_TAG";
+
+    /** Key for the consumer group name. */
+    public static final String CONSUMER_GROUP_CONFIG = "consumerGroup";
+    /** Default consumer group name. */
+    public static final String CONSUMER_GROUP_DEFAULT = "FLUME_CONSUMER_GROUP";
+
+    /** Key for the message model; the value must be the name of a {@code MessageModel} constant. */
+    public static final String MESSAGE_MODEL_CONFIG = "messageModel";
+    /** Default message model. */
+    public static final String MESSAGE_MODEL_DEFAULT = "BROADCASTING";
+
+    /** Key for the maximum number of messages requested per pull. */
+    public static final String BATCH_SIZE_CONFIG = "batchSize";
+    /** Default pull batch size. */
+    public static final int BATCH_SIZE_DEFAULT = 32;
+
+    /** Event header under which the source stores the topic. */
+    public static final String HEADER_TOPIC_NAME = "topic";
+    /** Event header under which the source stores the tag. */
+    public static final String HEADER_TAG_NAME = "tag";
+
+    private RocketMQSourceConstants() {
+        // Constant holder only; never instantiated.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-souce/src/test/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-ng/rocketmq-flume-souce/src/test/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceTest.java
 
b/rocketmq-flume-ng/rocketmq-flume-souce/src/test/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceTest.java
new file mode 100644
index 0000000..eb05c2b
--- /dev/null
+++ 
b/rocketmq-flume-ng/rocketmq-flume-souce/src/test/java/org/apache/rocketmq/flume/ng/souce/RocketMQSourceTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flume.ng.souce;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MQVersion;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import org.apache.commons.lang.time.DateFormatUtils;
+import org.apache.flume.Channel;
+import org.apache.flume.ChannelSelector;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.PollableSource;
+import org.apache.flume.Transaction;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.NAME_SERVER_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_CONFIG;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TAG_DEFAULT;
+import static 
org.apache.rocketmq.flume.ng.souce.RocketMQSourceConstants.TOPIC_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ *
+ */
+public class RocketMQSourceTest {
+
+    private static final Logger log = 
LoggerFactory.getLogger(RocketMQSourceTest.class);
+
+    private static String nameServer = "localhost:9876";
+
+    private static NamesrvController namesrvController;
+    private static BrokerController brokerController;
+
+    private String tag = TAG_DEFAULT + "_SOURCE_TEST_" + new 
Random().nextInt(99);
+    private String producerGroup = "PRODUCER_GROUP_SOURCE_TEST";
+
+    @BeforeClass
+    public static void startMQ() throws Exception {
+
+        /*
+        start nameserver
+         */
+        startNamesrv();
+
+        /*
+        start broker
+         */
+        startBroker();
+
+        Thread.sleep(2000);
+    }
+
+    private static void startNamesrv() throws Exception {
+
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(9876);
+
+        namesrvController = new NamesrvController(namesrvConfig, 
nettyServerConfig);
+        boolean initResult = namesrvController.initialize();
+        if (!initResult) {
+            namesrvController.shutdown();
+            throw new Exception();
+        }
+        namesrvController.start();
+    }
+
+    private static void startBroker() throws Exception {
+
+        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, 
Integer.toString(MQVersion.CURRENT_VERSION));
+
+        BrokerConfig brokerConfig = new BrokerConfig();
+        brokerConfig.setNamesrvAddr(nameServer);
+        brokerConfig.setBrokerId(MixAll.MASTER_ID);
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        nettyServerConfig.setListenPort(10911);
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
+        brokerController = new BrokerController(brokerConfig, 
nettyServerConfig, nettyClientConfig, messageStoreConfig);
+        boolean initResult = brokerController.initialize();
+        if (!initResult) {
+            brokerController.shutdown();
+            throw new Exception();
+        }
+        brokerController.start();
+    }
+
+    @Test
+    public void testEvent() throws EventDeliveryException, MQBrokerException, 
MQClientException, InterruptedException, UnsupportedEncodingException {
+
+        // publish test message
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.setNamesrvAddr(nameServer);
+
+        String sendMsg = "\"Hello Flume\"" + "," + DateFormatUtils.format(new 
Date(), "yyyy-MM-DD hh:mm:ss");
+
+        try {
+            producer.start();
+
+            Message msg = new Message(TOPIC_DEFAULT, tag, 
sendMsg.getBytes("UTF-8"));
+            SendResult sendResult = producer.send(msg);
+            log.info("publish message : {}, sendResult:{}", sendMsg, 
sendResult);
+        } catch (Exception e) {
+            throw new MQClientException("Failed to publish messages", e);
+        } finally {
+            producer.shutdown();
+        }
+
+        // start source
+        Context context = new Context();
+        context.put(NAME_SERVER_CONFIG, nameServer);
+        context.put(TAG_CONFIG, tag);
+        Channel channel = new MemoryChannel();
+        Configurables.configure(channel, context);
+        List<Channel> channels = new ArrayList<>();
+        channels.add(channel);
+        ChannelSelector channelSelector = new ReplicatingChannelSelector();
+        channelSelector.setChannels(channels);
+        ChannelProcessor channelProcessor = new 
ChannelProcessor(channelSelector);
+
+        RocketMQSource source = new RocketMQSource();
+        source.setChannelProcessor(channelProcessor);
+        Configurables.configure(source, context);
+        source.start();
+        PollableSource.Status status = source.process();
+        if (status == PollableSource.Status.BACKOFF) {
+            fail("Error");
+        }
+        /*
+        wait for processQueueTable init
+         */
+        Thread.sleep(1000);
+
+        source.stop();
+
+        /*
+        mock flume sink
+         */
+        Transaction transaction = channel.getTransaction();
+        transaction.begin();
+        Event event = channel.take();
+        if (event == null) {
+            transaction.commit();
+            fail("Error");
+        }
+        byte[] body = event.getBody();
+        String receiveMsg = new String(body, "UTF-8");
+        log.info("receive message : {}", receiveMsg);
+
+        assertEquals(sendMsg, receiveMsg);
+    }
+
+    @AfterClass
+    public static void stop() {
+
+        if (brokerController != null) {
+            brokerController.shutdown();
+        }
+
+        if (namesrvController != null) {
+            namesrvController.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/rocketmq-flume-souce/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-ng/rocketmq-flume-souce/src/test/resources/log4j.properties 
b/rocketmq-flume-ng/rocketmq-flume-souce/src/test/resources/log4j.properties
new file mode 100644
index 0000000..dd15190
--- /dev/null
+++ b/rocketmq-flume-ng/rocketmq-flume-souce/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+log4j.rootLogger = DEBUG, out
+
+log4j.appender.out = org.apache.log4j.ConsoleAppender
+log4j.appender.out.layout = org.apache.log4j.PatternLayout
+log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
+
+log4j.logger.org.apache.rocketmq = DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/style/copyright/Apache.xml
----------------------------------------------------------------------
diff --git a/rocketmq-flume-ng/style/copyright/Apache.xml 
b/rocketmq-flume-ng/style/copyright/Apache.xml
new file mode 100644
index 0000000..7ca71b5
--- /dev/null
+++ b/rocketmq-flume-ng/style/copyright/Apache.xml
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache"/>
+        <option name="notice"
+                value="Licensed to the Apache Software Foundation (ASF) under one or more&#10;contributor license agreements.  See the NOTICE file distributed with&#10;this work for additional information regarding copyright ownership.&#10;The ASF licenses this file to You under the Apache License, Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in compliance with&#10;the License.  You may obtain a copy of the License at&#10;&#10;    http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless required by applicable law or agreed to in writing, software&#10;distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the License for the specific language governing permissions and&#10;limitations under the License."/>
+    </copyright>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/style/copyright/profiles_settings.xml
----------------------------------------------------------------------
diff --git a/rocketmq-flume-ng/style/copyright/profiles_settings.xml 
b/rocketmq-flume-ng/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..d326b8c
--- /dev/null
+++ b/rocketmq-flume-ng/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="addBlankAfter" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/style/rmq_checkstyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-flume-ng/style/rmq_checkstyle.xml 
b/rocketmq-flume-ng/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..776b305
--- /dev/null
+++ b/rocketmq-flume-ng/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  
value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override 
hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds 
code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements, e.g. if (valid()) return false; else return true;-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in a switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" 
value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to a format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch 
parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * 
notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-ng/style/rmq_codeStyle.xml
----------------------------------------------------------------------
diff --git a/rocketmq-flume-ng/style/rmq_codeStyle.xml 
b/rocketmq-flume-ng/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..c727f67
--- /dev/null
+++ b/rocketmq-flume-ng/style/rmq_codeStyle.xml
@@ -0,0 +1,143 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq">
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS">
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="true"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON">
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON">
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala">
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML">
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-sink/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-flume-sink/pom.xml b/rocketmq-flume-sink/pom.xml
deleted file mode 100644
index 9cbe073..0000000
--- a/rocketmq-flume-sink/pom.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache</groupId>
-        <artifactId>rocketmq-flume</artifactId>
-        <version>0.0.1-SNAPSHOT</version>
-    </parent>
-    <artifactId>rocketmq-flume-sink</artifactId>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java
 
b/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java
deleted file mode 100644
index 6a04f48..0000000
--- 
a/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSink.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flume.ng.sink;
-
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.client.producer.DefaultMQProducer;
-import org.apache.rocketmq.client.producer.SendCallback;
-import org.apache.rocketmq.client.producer.SendResult;
-import org.apache.rocketmq.common.message.Message;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurable;
-import org.apache.flume.conf.ConfigurationException;
-import org.apache.flume.instrumentation.SinkCounter;
-import org.apache.flume.sink.AbstractSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.BATCH_SIZE_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.BATCH_SIZE_DEFAULT;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.MAX_PROCESS_TIME_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.MAX_PROCESS_TIME_DEFAULT;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.NAME_SERVER_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.PRODUCER_GROUP_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.PRODUCER_GROUP_DEFAULT;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_DEFAULT;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TOPIC_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TOPIC_DEFAULT;
-
-/**
- *
- */
-public class RocketMQSink extends AbstractSink implements Configurable {
-
-    private static final Logger log = LoggerFactory.getLogger(RocketMQSink.class);
-
-    private String nameServer;
-    private String topic;
-    private String tag;
-    private String producerGroup;
-    private int batchSize;
-    private long maxProcessTime;
-
-    /** Monitoring counter. */
-    private SinkCounter sinkCounter;
-
-    private DefaultMQProducer producer;
-
-    @Override
-    public void configure(Context context) {
-
-        nameServer = context.getString(NAME_SERVER_CONFIG);
-        if (nameServer == null) {
-            throw new ConfigurationException("NameServer must not be null");
-        }
-
-        topic = context.getString(TOPIC_CONFIG, TOPIC_DEFAULT);
-        tag = context.getString(TAG_CONFIG, TAG_DEFAULT);
-        producerGroup = context.getString(PRODUCER_GROUP_CONFIG, PRODUCER_GROUP_DEFAULT);
-        batchSize = context.getInteger(BATCH_SIZE_CONFIG, BATCH_SIZE_DEFAULT);
-        maxProcessTime = context.getLong(MAX_PROCESS_TIME_CONFIG, MAX_PROCESS_TIME_DEFAULT);
-
-        if (sinkCounter == null) {
-            sinkCounter = new SinkCounter(getName());
-        }
-    }
-
-    @Override
-    public synchronized void start() {
-
-        producer = new DefaultMQProducer(producerGroup);
-        producer.setNamesrvAddr(nameServer);
-        try {
-            producer.start();
-        } catch (MQClientException e) {
-            sinkCounter.incrementConnectionFailedCount();
-
-            log.error("RocketMQ producer start failed", e);
-            throw new FlumeException("Failed to start RocketMQ producer", e);
-        }
-
-        sinkCounter.incrementConnectionCreatedCount();
-        sinkCounter.start();
-
-        super.start();
-    }
-
-    @Override
-    public Status process() throws EventDeliveryException {
-
-        Channel channel = getChannel();
-        Transaction transaction = null;
-
-        try {
-            transaction = channel.getTransaction();
-            transaction.begin();
-
-            /*
-            batch take
-             */
-            List<Event> events = new ArrayList<>();
-            long beginTime = System.currentTimeMillis();
-            while (true) {
-                Event event = channel.take();
-                if (event != null) {
-                    events.add(event);
-                }
-
-                if (events.size() == batchSize
-                    || System.currentTimeMillis() - beginTime > maxProcessTime) {
-                    break;
-                }
-            }
-
-            if (events.size() == 0) {
-                sinkCounter.incrementBatchEmptyCount();
-
-                transaction.rollback();
-                return Status.BACKOFF;
-            }
-            /*
-            async send
-             */
-            CountDownLatch latch = new CountDownLatch(events.size());
-            AtomicInteger errorNum = new AtomicInteger();
-
-            for (Event event : events) {
-                byte[] body = event.getBody();
-                Message message = new Message(topic, tag, body);
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Processing event,body={}", new String(body, "UTF-8"));
-                }
-                producer.send(message, new SendCallBackHandler(message, latch, errorNum));
-            }
-            latch.await();
-
-            sinkCounter.addToEventDrainAttemptCount(events.size());
-
-            if (errorNum.get() > 0) {
-                log.error("errorNum=" + errorNum + ",transaction will rollback");
-                transaction.rollback();
-                return Status.BACKOFF;
-            } else {
-                transaction.commit();
-
-                sinkCounter.addToEventDrainSuccessCount(events.size());
-
-                return Status.READY;
-            }
-
-        } catch (Exception e) {
-            log.error("Failed to processing event", e);
-
-            if (transaction != null) {
-                try {
-                    transaction.rollback();
-                } catch (Exception ex) {
-                    log.error("Failed to rollback transaction", ex);
-                    throw new EventDeliveryException("Failed to rollback transaction", ex);
-                }
-            }
-
-            return Status.BACKOFF;
-
-        } finally {
-            if (transaction != null) {
-                transaction.close();
-            }
-        }
-    }
-
-    @Override public synchronized void stop() {
-        producer.shutdown();
-
-        sinkCounter.incrementConnectionClosedCount();
-        sinkCounter.stop();
-
-        super.stop();
-    }
-
-    public class SendCallBackHandler implements SendCallback {
-
-        private final Message message;
-        private final CountDownLatch latch;
-        private final AtomicInteger errorNum;
-
-        SendCallBackHandler(Message message, CountDownLatch latch, AtomicInteger errorNum) {
-            this.message = message;
-            this.latch = latch;
-            this.errorNum = errorNum;
-        }
-
-        @Override
-        public void onSuccess(SendResult sendResult) {
-
-            latch.countDown();
-
-            if (log.isDebugEnabled()) {
-                try {
-                    log.debug("Sent event,body={},sendResult={}", new String(message.getBody(), "UTF-8"), sendResult);
-                } catch (UnsupportedEncodingException e) {
-                    log.error("Encoding error", e);
-                }
-            }
-        }
-
-        @Override
-        public void onException(Throwable e) {
-            latch.countDown();
-            errorNum.incrementAndGet();
-
-            try {
-                log.error("Message publish failed,body=" + new String(message.getBody(), "UTF-8"), e);
-            } catch (UnsupportedEncodingException e1) {
-                log.error("Encoding error", e);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
----------------------------------------------------------------------
diff --git a/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java b/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
deleted file mode 100644
index eafc771..0000000
--- a/rocketmq-flume-sink/src/main/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkConstants.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flume.ng.sink;
-
-/**
- *
- */
-public class RocketMQSinkConstants {
-
-    public static final String NAME_SERVER_CONFIG = "nameserver";
-
-    public static final String TOPIC_CONFIG = "topic";
-    public static final String TOPIC_DEFAULT = "FLUME_TOPIC";
-
-    public static final String TAG_CONFIG = "tag";
-    public static final String TAG_DEFAULT = "FLUME_TAG";
-
-    public static final String PRODUCER_GROUP_CONFIG = "producerGroup";
-    public static final String PRODUCER_GROUP_DEFAULT = "FLUME_PRODUCER_GROUP";
-
-    public static final String BATCH_SIZE_CONFIG = "batchSize";
-    public static final int BATCH_SIZE_DEFAULT = 1;
-
-    public static final String MAX_PROCESS_TIME_CONFIG = "maxProcessTime";
-    public static final long MAX_PROCESS_TIME_DEFAULT = 1000;
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
----------------------------------------------------------------------
diff --git a/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java b/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
deleted file mode 100644
index 2497dfe..0000000
--- a/rocketmq-flume-sink/src/test/java/org/apache/rocketmq/flume/ng/sink/RocketMQSinkTest.java
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.flume.ng.sink;
-
-import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
-import org.apache.rocketmq.client.consumer.PullResult;
-import org.apache.rocketmq.client.consumer.PullStatus;
-import org.apache.rocketmq.client.exception.MQBrokerException;
-import org.apache.rocketmq.client.exception.MQClientException;
-import org.apache.rocketmq.common.BrokerConfig;
-import org.apache.rocketmq.common.MQVersion;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.namesrv.NamesrvConfig;
-import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
-import org.apache.rocketmq.namesrv.NamesrvController;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import java.io.UnsupportedEncodingException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.commons.lang.time.DateFormatUtils;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.Sink;
-import org.apache.flume.Transaction;
-import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.rocketmq.remoting.netty.NettyClientConfig;
-import org.apache.rocketmq.remoting.netty.NettyServerConfig;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.BATCH_SIZE_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.NAME_SERVER_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_CONFIG;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TAG_DEFAULT;
-import static org.apache.rocketmq.flume.ng.sink.RocketMQSinkConstants.TOPIC_DEFAULT;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-/**
- *
- */
-public class RocketMQSinkTest {
-
-    private static final Logger log = org.slf4j.LoggerFactory.getLogger(RocketMQSinkTest.class);
-
-    private static String nameServer = "localhost:9876";
-
-    private static NamesrvController namesrvController;
-    private static BrokerController brokerController;
-
-    private DefaultMQPullConsumer consumer;
-    private String tag = TAG_DEFAULT + "_SINK_TEST_" + new Random().nextInt(99);
-    private String consumerGroup = "CONSUMER_GROUP_SINK_TEST";
-    private int batchSize = 100;
-
-    @BeforeClass
-    public static void startMQ() throws Exception {
-
-        /*
-        start nameserver
-         */
-        startNamesrv();
-
-        /*
-        start broker
-         */
-        startBroker();
-
-        Thread.sleep(2000);
-    }
-
-    private static void startNamesrv() throws Exception {
-
-        NamesrvConfig namesrvConfig = new NamesrvConfig();
-        NettyServerConfig nettyServerConfig = new NettyServerConfig();
-        nettyServerConfig.setListenPort(9876);
-
-        namesrvController = new NamesrvController(namesrvConfig, nettyServerConfig);
-        boolean initResult = namesrvController.initialize();
-        if (!initResult) {
-            namesrvController.shutdown();
-            throw new Exception();
-        }
-        namesrvController.start();
-    }
-
-    private static void startBroker() throws Exception {
-
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
-
-        BrokerConfig brokerConfig = new BrokerConfig();
-        brokerConfig.setNamesrvAddr(nameServer);
-        brokerConfig.setBrokerId(MixAll.MASTER_ID);
-        NettyServerConfig nettyServerConfig = new NettyServerConfig();
-        nettyServerConfig.setListenPort(10911);
-        NettyClientConfig nettyClientConfig = new NettyClientConfig();
-        MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
-
-        brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, messageStoreConfig);
-        boolean initResult = brokerController.initialize();
-        if (!initResult) {
-            brokerController.shutdown();
-            throw new Exception();
-        }
-        brokerController.start();
-    }
-
-    @Test
-    public void testEvent() throws MQClientException, InterruptedException, EventDeliveryException, RemotingException, MQBrokerException, UnsupportedEncodingException {
-
-        /*
-        start sink
-         */
-        Context context = new Context();
-        context.put(NAME_SERVER_CONFIG, nameServer);
-        context.put(TAG_CONFIG, tag);
-        RocketMQSink sink = new RocketMQSink();
-        Configurables.configure(sink, context);
-        MemoryChannel channel = new MemoryChannel();
-        Configurables.configure(channel, context);
-        sink.setChannel(channel);
-        sink.start();
-
-        /*
-        mock flume source
-         */
-        String sendMsg = "\"Hello RocketMQ\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-DD hh:mm:ss");
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        Event event = EventBuilder.withBody(sendMsg.getBytes(), null);
-        channel.put(event);
-        tx.commit();
-        tx.close();
-        log.info("publish message : {}", sendMsg);
-        Sink.Status status = sink.process();
-        if (status == Sink.Status.BACKOFF) {
-            fail("Error");
-        }
-
-        sink.stop();
-
-        /*
-        consumer message
-         */
-        consumer = new DefaultMQPullConsumer(consumerGroup);
-        consumer.setNamesrvAddr(nameServer);
-        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
-        consumer.registerMessageQueueListener(TOPIC_DEFAULT, null);
-        consumer.start();
-
-        String receiveMsg = null;
-        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(TOPIC_DEFAULT);
-        for (MessageQueue queue : queues) {
-            long offset = getMessageQueueOffset(queue);
-            PullResult pullResult = consumer.pull(queue, tag, offset, 32);
-
-            if (pullResult.getPullStatus() == PullStatus.FOUND) {
-                for (MessageExt message : pullResult.getMsgFoundList()) {
-                    byte[] body = message.getBody();
-                    receiveMsg = new String(body, "UTF-8");
-                    log.info("receive message : {}", receiveMsg);
-                }
-
-                long nextBeginOffset = pullResult.getNextBeginOffset();
-                putMessageQueueOffset(queue, nextBeginOffset);
-            }
-        }
-        /*
-        wait for processQueueTable init
-         */
-        Thread.sleep(1000);
-
-        consumer.shutdown();
-
-        assertEquals(sendMsg, receiveMsg);
-    }
-
-    @Test
-    public void testBatchEvent() throws MQClientException, InterruptedException, EventDeliveryException, RemotingException, MQBrokerException, UnsupportedEncodingException {
-
-        /*
-        start sink
-         */
-        Context context = new Context();
-        context.put(NAME_SERVER_CONFIG, nameServer);
-        context.put(TAG_CONFIG, tag);
-        context.put(BATCH_SIZE_CONFIG, String.valueOf(batchSize));
-        RocketMQSink sink = new RocketMQSink();
-        Configurables.configure(sink, context);
-        MemoryChannel channel = new MemoryChannel();
-        Configurables.configure(channel, context);
-        sink.setChannel(channel);
-        sink.start();
-
-        /*
-        mock flume source
-         */
-        Map<String, String> msgs = new HashMap<>();
-
-        Transaction tx = channel.getTransaction();
-        tx.begin();
-        int sendNum = 0;
-        for (int i = 0; i < batchSize; i++) {
-            String sendMsg = "\"Hello RocketMQ\"" + "," + DateFormatUtils.format(new Date(), "yyyy-MM-DD hh:mm:ss:SSSS");
-            Event event = EventBuilder.withBody(sendMsg.getBytes(), null);
-            channel.put(event);
-            log.info("publish message : {}", sendMsg);
-            String[] sendMsgKv = sendMsg.split(",");
-            msgs.put(sendMsgKv[1], sendMsgKv[0]);
-            sendNum++;
-            Thread.sleep(10);
-        }
-        log.info("send message num={}", sendNum);
-
-        tx.commit();
-        tx.close();
-        Sink.Status status = sink.process();
-        if (status == Sink.Status.BACKOFF) {
-            fail("Error");
-        }
-
-        sink.stop();
-
-        /*
-        consumer message
-         */
-        consumer = new DefaultMQPullConsumer(consumerGroup);
-        consumer.setNamesrvAddr(nameServer);
-        consumer.setMessageModel(MessageModel.valueOf("BROADCASTING"));
-        consumer.registerMessageQueueListener(TOPIC_DEFAULT, null);
-        consumer.start();
-
-        int receiveNum = 0;
-        String receiveMsg = null;
-        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(TOPIC_DEFAULT);
-        for (MessageQueue queue : queues) {
-            long offset = getMessageQueueOffset(queue);
-            PullResult pullResult = consumer.pull(queue, tag, offset, batchSize);
-
-            if (pullResult.getPullStatus() == PullStatus.FOUND) {
-                for (MessageExt message : pullResult.getMsgFoundList()) {
-                    byte[] body = message.getBody();
-                    receiveMsg = new String(body, "UTF-8");
-                    String[] receiveMsgKv = receiveMsg.split(",");
-                    msgs.remove(receiveMsgKv[1]);
-                    log.info("receive message : {}", receiveMsg);
-                    receiveNum++;
-                }
-                long nextBeginOffset = pullResult.getNextBeginOffset();
-                putMessageQueueOffset(queue, nextBeginOffset);
-            }
-        }
-        log.info("receive message num={}", receiveNum);
-
-        /*
-        wait for processQueueTable init
-         */
-        Thread.sleep(1000);
-
-        consumer.shutdown();
-
-        assertEquals(msgs.size(), 0);
-    }
-
-    @Test
-    public void testNullEvent() throws MQClientException, InterruptedException, EventDeliveryException, RemotingException, MQBrokerException, UnsupportedEncodingException {
-
-        /*
-        start sink
-         */
-        Context context = new Context();
-        context.put(NAME_SERVER_CONFIG, nameServer);
-        context.put(TAG_CONFIG, tag);
-        RocketMQSink sink = new RocketMQSink();
-        Configurables.configure(sink, context);
-        MemoryChannel channel = new MemoryChannel();
-        Configurables.configure(channel, context);
-        sink.setChannel(channel);
-        sink.start();
-
-        Sink.Status status = sink.process();
-
-        assertEquals(status, Sink.Status.BACKOFF);
-
-        sink.stop();
-    }
-
-    private long getMessageQueueOffset(MessageQueue queue) throws MQClientException {
-
-        long offset = consumer.fetchConsumeOffset(queue, false);
-        if (offset < 0) {
-            offset = 0;
-        }
-
-        return offset;
-    }
-
-    private void putMessageQueueOffset(MessageQueue queue, long offset) throws MQClientException {
-        consumer.updateConsumeOffset(queue, offset);
-    }
-
-    @AfterClass
-    public static void stop() {
-
-        if (brokerController != null) {
-            brokerController.shutdown();
-        }
-
-        if (namesrvController != null) {
-            namesrvController.shutdown();
-        }
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-sink/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/rocketmq-flume-sink/src/test/resources/log4j.properties b/rocketmq-flume-sink/src/test/resources/log4j.properties
deleted file mode 100644
index dd15190..0000000
--- a/rocketmq-flume-sink/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-log4j.rootLogger = DEBUG, out
-
-log4j.appender.out = org.apache.log4j.ConsoleAppender
-log4j.appender.out.layout = org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern = %d (%t) [%p - %l] %m%n
-
-log4j.logger.org.apache.rocketmq = DEBUG

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/e2148280/rocketmq-flume-souce/pom.xml
----------------------------------------------------------------------
diff --git a/rocketmq-flume-souce/pom.xml b/rocketmq-flume-souce/pom.xml
deleted file mode 100644
index d07312b..0000000
--- a/rocketmq-flume-souce/pom.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache</groupId>
-        <artifactId>rocketmq-flume</artifactId>
-        <version>0.0.1-SNAPSHOT</version>
-    </parent>
-    <artifactId>rocketmq-flume-souce</artifactId>
-</project>


Reply via email to