This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git

commit 12f0d0c710ea91b0767b84f95245f3eb44c29ce8
Author: Xin Wang <[email protected]>
AuthorDate: Fri Mar 23 21:47:33 2018 +0800

    [ROCKETMQ-82] RocketMQ-Flink Integration (#45)
---
 README.md                                          | 147 +++++++++
 pom.xml                                            | 174 +++++++++++
 .../org/apache/rocketmq/flink/RocketMQConfig.java  | 134 +++++++++
 .../org/apache/rocketmq/flink/RocketMQSink.java    | 187 ++++++++++++
 .../org/apache/rocketmq/flink/RocketMQSource.java  | 331 +++++++++++++++++++++
 .../org/apache/rocketmq/flink/RocketMQUtils.java   |  36 +++
 .../org/apache/rocketmq/flink/RunningChecker.java  |  33 ++
 .../common/selector/DefaultTopicSelector.java      |  43 +++
 .../flink/common/selector/SimpleTopicSelector.java |  73 +++++
 .../flink/common/selector/TopicSelector.java       |  28 ++
 .../KeyValueDeserializationSchema.java             |  27 ++
 .../serialization/KeyValueSerializationSchema.java |  28 ++
 .../SimpleKeyValueDeserializationSchema.java       |  66 ++++
 .../SimpleKeyValueSerializationSchema.java         |  63 ++++
 .../apache/rocketmq/flink/RocketMQSinkTest.java    |  75 +++++
 .../apache/rocketmq/flink/RocketMQSourceTest.java  | 121 ++++++++
 .../java/org/apache/rocketmq/flink/TestUtils.java  |  33 ++
 .../common/selector/DefaultTopicSelectorTest.java  |  37 +++
 .../common/selector/SimpleTopicSelectorTest.java   |  49 +++
 .../SimpleKeyValueSerializationSchemaTest.java     |  42 +++
 .../rocketmq/flink/example/ConsumerTest.java       |  54 ++++
 .../rocketmq/flink/example/ProducerTest.java       |  57 ++++
 .../flink/example/RocketMQFlinkExample.java        |  76 +++++
 style/copyright/Apache.xml                         |  24 ++
 style/copyright/profiles_settings.xml              |  64 ++++
 style/rmq_checkstyle.xml                           | 135 +++++++++
 style/rmq_codeStyle.xml                            | 157 ++++++++++
 27 files changed, 2294 insertions(+)

diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ab1d456
--- /dev/null
+++ b/README.md
@@ -0,0 +1,147 @@
+# RocketMQ-Flink
+
+RocketMQ integration for [Apache Flink](https://flink.apache.org/). This 
module includes the RocketMQ source and sink that allows a flink job to either 
write messages into a topic or read from topics in a flink job.
+
+
+## RocketMQSource
+To use the `RocketMQSource`,  you construct an instance of it by specifying a 
KeyValueDeserializationSchema instance and a Properties instance which 
including rocketmq configs.
+`RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props)`
+The RocketMQSource is based on RocketMQ pull consumer mode, and provides 
exactly once reliability guarantees when checkpoints are enabled.
+Otherwise, the source doesn't provide any reliability guarantees.
+
+### KeyValueDeserializationSchema
+The main API for deserializing topic and tags is the 
`org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema` 
interface.
+`rocketmq-flink` includes general purpose `KeyValueDeserializationSchema` 
implementations called `SimpleKeyValueDeserializationSchema`.
+
+```java
+public interface KeyValueDeserializationSchema<T> extends 
ResultTypeQueryable<T>, Serializable {
+    T deserializeKeyAndValue(byte[] key, byte[] value);
+}
+```
+
+## RocketMQSink
+To use the `RocketMQSink`,  you construct an instance of it by specifying 
KeyValueSerializationSchema & TopicSelector instances and a Properties instance 
which including rocketmq configs.
+`RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> 
topicSelector, Properties props)`
+The RocketMQSink provides at-least-once reliability guarantees when 
checkpoints are enabled and `withBatchFlushOnCheckpoint(true)` is set.
+Otherwise, the sink reliability guarantees depends on rocketmq producer's 
retry policy, for this case, the messages sending way is sync by default,
+but you can change it by invoking `withAsync(true)`. 
+
+### KeyValueSerializationSchema
+The main API for serializing topic and tags is the 
`org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema` 
interface.
+`rocketmq-flink` includes general purpose `KeyValueSerializationSchema` 
implementations called `SimpleKeyValueSerializationSchema`.
+
+```java
+public interface KeyValueSerializationSchema<T> extends Serializable {
+
+    byte[] serializeKey(T tuple);
+
+    byte[] serializeValue(T tuple);
+}
+```
+
+### TopicSelector
+The main API for selecting topic and tags is the 
`org.apache.rocketmq.flink.common.selector.TopicSelector` interface.
+`rocketmq-flink` includes general purpose `TopicSelector` implementations 
called `DefaultTopicSelector` and `SimpleTopicSelector`.
+
+```java
+public interface TopicSelector<T> extends Serializable {
+
+    String getTopic(T tuple);
+
+    String getTag(T tuple);
+}
+```
+
+## Examples
+The following is an example which receive messages from RocketMQ brokers and 
send messages to broker after processing.
+
+ ```java
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // enable checkpoint
+        env.enableCheckpointing(3000);
+
+        Properties consumerProps = new Properties();
+        consumerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, 
"localhost:9876");
+        consumerProps.setProperty(RocketMqConfig.CONSUMER_GROUP, "c002");
+        consumerProps.setProperty(RocketMqConfig.CONSUMER_TOPIC, 
"flink-source2");
+
+        Properties producerProps = new Properties();
+        producerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, 
"localhost:9876");
+
+        env.addSource(new RocketMQSource(new 
SimpleKeyValueDeserializationSchema("id", "address"), consumerProps))
+            .name("rocketmq-source")
+            .setParallelism(2)
+            .process(new ProcessFunction<Map, Map>() {
+                @Override
+                public void processElement(Map in, Context ctx, Collector<Map> 
out) throws Exception {
+                    HashMap result = new HashMap();
+                    result.put("id", in.get("id"));
+                    String[] arr = in.get("address").toString().split("\\s+");
+                    result.put("province", arr[arr.length-1]);
+                    out.collect(result);
+                }
+            })
+            .name("upper-processor")
+            .setParallelism(2)
+            .addSink(new RocketMQSink(new 
SimpleKeyValueSerializationSchema("id", "province"),
+                new DefaultTopicSelector("flink-sink2"), 
producerProps).withBatchFlushOnCheckpoint(true))
+            .name("rocketmq-sink")
+            .setParallelism(2);
+
+        try {
+            env.execute("rocketmq-flink-example");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+ ```
+
+## Configurations
+The following configurations are all from the class 
`org.apache.rocketmq.flink.RocketMQConfig`.
+
+### Producer Configurations
+| NAME        | DESCRIPTION           | DEFAULT  |
+| ------------- |:-------------:|:------:|
+| nameserver.address      | name server address *Required* | null |
+| nameserver.poll.interval      | name server poll topic info interval     |   
30000 |
+| brokerserver.heartbeat.interval | broker server heartbeat interval      |    
30000 |
+| producer.group | producer group      |    `UUID.randomUUID().toString()` |
+| producer.retry.times | producer send messages retry times      |    3 |
+| producer.timeout | producer send messages timeout      |    3000 |
+
+
+### Consumer Configurations
+| NAME        | DESCRIPTION           | DEFAULT  |
+| ------------- |:-------------:|:------:|
+| nameserver.address      | name server address *Required* | null |
+| nameserver.poll.interval      | name server poll topic info interval     |   
30000 |
+| brokerserver.heartbeat.interval | broker server heartbeat interval      |    
30000 |
+| consumer.group | consumer group *Required*     |    null |
+| consumer.topic | consumer topic *Required*       |    null |
+| consumer.tag | consumer topic tag      |    * |
+| consumer.offset.reset.to | what to do when there is no initial offset on the 
server      |   latest/earliest/timestamp |
+| consumer.offset.from.timestamp | the timestamp when 
`consumer.offset.reset.to=timestamp` was set   |   `System.currentTimeMillis()` 
|
+| consumer.offset.persist.interval | auto commit offset interval      |    
5000 |
+| consumer.pull.thread.pool.size | consumer pull thread pool size      |    20 
|
+| consumer.batch.size | consumer messages batch size      |    32 |
+| consumer.delay.when.message.not.found | the delay time when messages were 
not found      |    10 |
+
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..ecb3436
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,174 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-flink</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!--maven properties -->
+        <maven.test.skip>false</maven.test.skip>
+        <maven.javadoc.skip>false</maven.javadoc.skip>
+        <!-- compiler settings properties -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <rocketmq.version>4.2.0</rocketmq.version>
+        <flink.version>1.4.0</flink.version>
+        <commons-lang.version>2.5</commons-lang.version>
+        <scala.binary.version>2.11</scala.binary.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-common</artifactId>
+            <version>${rocketmq.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>io.netty</groupId>
+                    <artifactId>netty-tcnative</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>${commons-lang.version}</version>
+        </dependency>
+
+        <!--test -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <version>4.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.5.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.5.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+            <version>${rocketmq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+            <version>${rocketmq.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <encoding>UTF-8</encoding>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.12.4</version>
+                <configuration>
+                    <skipTests>${maven.test.skip}</skipTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <version>0.12</version>
+                <configuration>
+                    <excludes>
+                        <exclude>README.md</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            
<configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                            
<includeTestSourceDirectory>false</includeTestSourceDirectory>
+                            <includeTestResources>false</includeTestResources>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
new file mode 100644
index 0000000..8ec760b
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQConfig.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+
+import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQConfig for Consumer/Producer.
+ */
+public class RocketMQConfig {
+    // common
+    public static final String NAME_SERVER_ADDR = "nameserver.address"; // 
Required
+
+    public static final String NAME_SERVER_POLL_INTERVAL = 
"nameserver.poll.interval";
+    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 
seconds
+
+    public static final String BROKER_HEART_BEAT_INTERVAL = 
"brokerserver.heartbeat.interval";
+    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 
seconds
+
+
+    // producer
+    public static final String PRODUCER_GROUP = "producer.group";
+
+    public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
+    public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;
+
+    public static final String PRODUCER_TIMEOUT = "producer.timeout";
+    public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
+
+
+    // consumer
+    public static final String CONSUMER_GROUP = "consumer.group"; // Required
+
+    public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
+
+    public static final String CONSUMER_TAG = "consumer.tag";
+    public static final String DEFAULT_CONSUMER_TAG = "*";
+
+    public static final String CONSUMER_OFFSET_RESET_TO = 
"consumer.offset.reset.to";
+    public static final String CONSUMER_OFFSET_LATEST = "latest";
+    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
+    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
+    public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = 
"consumer.offset.from.timestamp";
+
+    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = 
"consumer.offset.persist.interval";
+    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; 
// 5 seconds
+
+    public static final String CONSUMER_PULL_POOL_SIZE = 
"consumer.pull.thread.pool.size";
+    public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20;
+
+    public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
+    public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
+
+    public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 
"consumer.delay.when.message.not.found";
+    public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;
+
+    /**
+     * Build Producer Configs.
+     * @param props Properties
+     * @param producer DefaultMQProducer
+     */
+    public static void buildProducerConfigs(Properties props, 
DefaultMQProducer producer) {
+        buildCommonConfigs(props, producer);
+
+        String group = props.getProperty(PRODUCER_GROUP);
+        if (StringUtils.isEmpty(group)) {
+            group = UUID.randomUUID().toString();
+        }
+        producer.setProducerGroup(props.getProperty(PRODUCER_GROUP, group));
+
+        producer.setRetryTimesWhenSendFailed(getInteger(props,
+            PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
+            PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
+        producer.setSendMsgTimeout(getInteger(props,
+            PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
+    }
+
+    /**
+     * Build Consumer Configs.
+     * @param props Properties
+     * @param consumer DefaultMQPushConsumer
+     */
+    public static void buildConsumerConfigs(Properties props, 
DefaultMQPullConsumer consumer) {
+        buildCommonConfigs(props, consumer);
+
+        consumer.setMessageModel(MessageModel.CLUSTERING);
+
+        consumer.setPersistConsumerOffsetInterval(getInteger(props,
+            CONSUMER_OFFSET_PERSIST_INTERVAL, 
DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
+    }
+
+    /**
+     * Build Common Configs.
+     * @param props Properties
+     * @param client ClientConfig
+     */
+    public static void buildCommonConfigs(Properties props, ClientConfig 
client) {
+        String nameServers = props.getProperty(NAME_SERVER_ADDR);
+        Validate.notEmpty(nameServers);
+        client.setNamesrvAddr(nameServers);
+
+        client.setPollNameServerInterval(getInteger(props,
+            NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
+        client.setHeartbeatBrokerInterval(getInteger(props,
+            BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
new file mode 100644
index 0000000..e79d1b4
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSink.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.nio.charset.StandardCharsets;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.Validate;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.flink.common.selector.TopicSelector;
+import 
org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The RocketMQSink provides at-least-once reliability guarantees when
+ * checkpoints are enabled and batchFlushOnCheckpoint(true) is set.
+ * Otherwise, the sink reliability guarantees depends on rocketmq producer's 
retry policy.
+ */
+public class RocketMQSink<IN> extends RichSinkFunction<IN> implements 
CheckpointedFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQSink.class);
+
+    private transient DefaultMQProducer producer;
+    private boolean async; // false by default
+
+    private Properties props;
+    private TopicSelector<IN> topicSelector;
+    private KeyValueSerializationSchema<IN> serializationSchema;
+
+    private boolean batchFlushOnCheckpoint; // false by default
+    private List<Message> batchList;
+
+    public RocketMQSink(KeyValueSerializationSchema<IN> schema, 
TopicSelector<IN> topicSelector, Properties props) {
+        this.serializationSchema = schema;
+        this.topicSelector = topicSelector;
+        this.props = props;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        Validate.notEmpty(props, "Producer properties can not be empty");
+        Validate.notNull(topicSelector, "TopicSelector can not be null");
+        Validate.notNull(serializationSchema, "KeyValueSerializationSchema can 
not be null");
+
+        producer = new DefaultMQProducer();
+        
producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
+        RocketMQConfig.buildProducerConfigs(props, producer);
+
+        batchList = new LinkedList<>();
+
+        if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) 
getRuntimeContext()).isCheckpointingEnabled()) {
+            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is 
not enabled. Disabling flushing.");
+            batchFlushOnCheckpoint = false;
+        }
+
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void invoke(IN input, Context context) throws Exception {
+        Message msg = prepareMessage(input);
+
+        if (batchFlushOnCheckpoint) {
+            batchList.add(msg);
+            return;
+        }
+
+        if (async) {
+            // async sending
+            try {
+                producer.send(msg, new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        LOG.debug("Async send message success! result: {}", 
sendResult);
+                    }
+
+                    @Override
+                    public void onException(Throwable throwable) {
+                        if (throwable != null) {
+                            LOG.error("Async send message failure!", 
throwable);
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                LOG.error("Async send message failure!", e);
+            }
+        } else {
+            // sync sending, will return a SendResult
+            try {
+                SendResult result = producer.send(msg);
+                LOG.debug("Sync send message result: {}", result);
+            } catch (Exception e) {
+                LOG.error("Sync send message failure!", e);
+            }
+        }
+    }
+
+    // Mapping: from storm tuple -> rocketmq Message
+    private Message prepareMessage(IN input) {
+        String topic = topicSelector.getTopic(input);
+        String tag = topicSelector.getTag(input) != null ? 
topicSelector.getTag(input) : "";
+
+        byte[] k = serializationSchema.serializeKey(input);
+        String key = k != null ? new String(k, StandardCharsets.UTF_8) : "";
+        byte[] value = serializationSchema.serializeValue(input);
+
+        Validate.notNull(topic, "the message topic is null");
+        Validate.notNull(value, "the message body is null");
+
+        Message msg = new Message(topic, tag, key, value);
+        return msg;
+    }
+
+    public RocketMQSink<IN> withAsync(boolean async) {
+        this.async = async;
+        return this;
+    }
+
+    public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean 
batchFlushOnCheckpoint) {
+        this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
+        return this;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (producer != null) {
+            flushSync();
+            producer.shutdown();
+        }
+    }
+
+    private void flushSync() throws Exception {
+        if (batchFlushOnCheckpoint) {
+            synchronized (batchList) {
+                if (batchList.size() > 0) {
+                    producer.send(batchList);
+                    batchList.clear();
+                }
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        flushSync();
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        // nothing to do
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
new file mode 100644
index 0000000..2dc8fd5
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQSource.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.Validate;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullTaskCallback;
+import org.apache.rocketmq.client.consumer.PullTaskContext;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import 
org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_EARLIEST;
+import static org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_LATEST;
+import static 
org.apache.rocketmq.flink.RocketMQConfig.CONSUMER_OFFSET_TIMESTAMP;
+import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
+import static org.apache.rocketmq.flink.RocketMQUtils.getLong;
+
+/**
+ * The RocketMQSource is based on RocketMQ pull consumer mode,
+ * and provides exactly once reliability guarantees when checkpoints are 
enabled.
+ * Otherwise, the source doesn't provide any reliability guarantees.
+ */
+public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
+    implements CheckpointedFunction, ResultTypeQueryable<OUT> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(RocketMQSource.class);
+
+    private transient MQPullConsumerScheduleService 
pullConsumerScheduleService;
+    private DefaultMQPullConsumer consumer;
+
+    private KeyValueDeserializationSchema<OUT> schema;
+
+    private RunningChecker runningChecker;
+
+    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
+    private Map<MessageQueue, Long> offsetTable;
+    private Map<MessageQueue, Long> restoredOffsets;
+
+    private Properties props;
+    private String topic;
+    private String group;
+
+    private static final String OFFSETS_STATE_NAME = 
"topic-partition-offset-states";
+
+    private transient volatile boolean restored;
+
+    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, 
Properties props) {
+        this.schema = schema;
+        this.props = props;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        LOG.debug("source open....");
+        Validate.notEmpty(props, "Consumer properties can not be empty");
+        Validate.notNull(schema, "KeyValueDeserializationSchema can not be 
null");
+
+        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
+        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
+
+        Validate.notEmpty(topic, "Consumer topic can not be empty");
+        Validate.notEmpty(group, "Consumer group can not be empty");
+
+        if (offsetTable == null) {
+            offsetTable = new ConcurrentHashMap<>();
+        }
+        if (restoredOffsets == null) {
+            restoredOffsets = new ConcurrentHashMap<>();
+        }
+
+        runningChecker = new RunningChecker();
+
+        pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
+        consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
+
+        
consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
+        RocketMQConfig.buildConsumerConfigs(props, consumer);
+    }
+
+    @Override
+    public void run(SourceContext context) throws Exception {
+        LOG.debug("source run....");
+        // The lock that guarantees that record emission and state updates are 
atomic,
+        // from the view of taking a checkpoint.
+        final Object lock = context.getCheckpointLock();
+
+        int delayWhenMessageNotFound = getInteger(props, 
RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
+            RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
+
+        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, 
RocketMQConfig.DEFAULT_CONSUMER_TAG);
+
+        int pullPoolSize = getInteger(props, 
RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
+            RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
+
+        int pullBatchSize = getInteger(props, 
RocketMQConfig.CONSUMER_BATCH_SIZE,
+            RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
+
+        pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
+        pullConsumerScheduleService.registerPullTaskCallback(topic, new 
PullTaskCallback() {
+
+            @Override
+            public void doPullTask(MessageQueue mq, PullTaskContext 
pullTaskContext) {
+                try {
+                    long offset = getMessageQueueOffset(mq);
+                    if (offset < 0) {
+                        return;
+                    }
+
+                    PullResult pullResult = consumer.pull(mq, tag, offset, 
pullBatchSize);
+                    boolean found = false;
+                    switch (pullResult.getPullStatus()) {
+                        case FOUND:
+                            List<MessageExt> messages = 
pullResult.getMsgFoundList();
+                            for (MessageExt msg : messages) {
+                                byte[] key = msg.getKeys() != null ? 
msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
+                                byte[] value = msg.getBody();
+                                OUT data = schema.deserializeKeyAndValue(key, 
value);
+
+                                // output and state update are atomic
+                                synchronized (lock) {
+                                    context.collectWithTimestamp(data, 
msg.getBornTimestamp());
+                                }
+                            }
+                            found = true;
+                            break;
+                        case NO_MATCHED_MSG:
+                            LOG.debug("No matched message after offset {} for 
queue {}", offset, mq);
+                            break;
+                        case NO_NEW_MSG:
+                            break;
+                        case OFFSET_ILLEGAL:
+                            LOG.warn("Offset {} is illegal for queue {}", 
offset, mq);
+                            break;
+                        default:
+                            break;
+                    }
+
+                    synchronized (lock) {
+                        putMessageQueueOffset(mq, 
pullResult.getNextBeginOffset());
+                    }
+
+                    if (found) {
+                        pullTaskContext.setPullNextDelayTimeMillis(0); // no 
delay when messages were found
+                    } else {
+                        
pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+
+        try {
+            pullConsumerScheduleService.start();
+        } catch (MQClientException e) {
+            throw new RuntimeException(e);
+        }
+
+        runningChecker.setRunning(true);
+
+        awaitTermination();
+
+    }
+
+    private void awaitTermination() throws InterruptedException {
+        while (runningChecker.isRunning()) {
+            Thread.sleep(50);
+        }
+    }
+
+    private long getMessageQueueOffset(MessageQueue mq) throws 
MQClientException {
+        Long offset = offsetTable.get(mq);
+        // restoredOffsets(unionOffsetStates) is the restored global union 
state;
+        // should only snapshot mqs that actually belong to us
+        if (restored && offset == null) {
+            offset = restoredOffsets.get(mq);
+        }
+        if (offset == null) {
+            offset = consumer.fetchConsumeOffset(mq, false);
+            if (offset < 0) {
+                String initialOffset = 
props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, 
CONSUMER_OFFSET_LATEST);
+                switch (initialOffset) {
+                    case CONSUMER_OFFSET_EARLIEST:
+                        offset = consumer.minOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_LATEST:
+                        offset = consumer.maxOffset(mq);
+                        break;
+                    case CONSUMER_OFFSET_TIMESTAMP:
+                        offset = consumer.searchOffset(mq, getLong(props,
+                            RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, 
System.currentTimeMillis()));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unknown value for 
CONSUMER_OFFSET_RESET_TO.");
+                }
+            }
+            offsetTable.put(mq, offset);
+        }
+
+        return offsetTable.get(mq);
+    }
+
+    private void putMessageQueueOffset(MessageQueue mq, long offset) throws 
MQClientException {
+        offsetTable.put(mq, offset);
+        consumer.updateConsumeOffset(mq, offset);
+    }
+
+    @Override
+    public void cancel() {
+        LOG.debug("cancel ...");
+        runningChecker.setRunning(false);
+
+        if (pullConsumerScheduleService != null) {
+            pullConsumerScheduleService.shutdown();
+        }
+
+        offsetTable.clear();
+        restoredOffsets.clear();
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.debug("close ...");
+        // pretty much the same logic as cancelling
+        try {
+            cancel();
+        } finally {
+            super.close();
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+        // called when a snapshot for a checkpoint is requested
+
+        if (!runningChecker.isRunning()) {
+            LOG.debug("snapshotState() called on closed source; returning 
null.");
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
+        }
+
+        unionOffsetStates.clear();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Snapshotted state, last processed offsets: {}, 
checkpoint id: {}, timestamp: {}",
+                offsetTable, context.getCheckpointId(), 
context.getCheckpointTimestamp());
+        }
+
+        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
+            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+        // called every time the user-defined function is initialized,
+        // be that when the function is first initialized or be that
+        // when the function is actually recovering from an earlier checkpoint.
+        // Given this, initializeState() is not only the place where different 
types of state are initialized,
+        // but also where state recovery logic is included.
+        LOG.debug("initialize State ...");
+
+        this.unionOffsetStates = 
context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
+            OFFSETS_STATE_NAME, TypeInformation.of(new 
TypeHint<Tuple2<MessageQueue, Long>>() { })));
+
+        this.restored = context.isRestored();
+
+        if (restored) {
+            if (restoredOffsets == null) {
+                restoredOffsets = new ConcurrentHashMap<>();
+            }
+            for (Tuple2<MessageQueue, Long> mqOffsets : 
unionOffsetStates.get()) {
+                // unionOffsetStates is the restored global union state;
+                // should only snapshot mqs that actually belong to us
+                restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
+            }
+            LOG.info("Setting restore state in the consumer. Using the 
following offsets: {}", restoredOffsets);
+        } else {
+            LOG.info("No restore state for the consumer.");
+        }
+    }
+
+    @Override
+    public TypeInformation<OUT> getProducedType() {
+        return schema.getProducedType();
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java 
b/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java
new file mode 100644
index 0000000..9ca1de2
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/RocketMQUtils.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.util.Properties;
+
+public final class RocketMQUtils {
+
+    public static int getInteger(Properties props, String key, int 
defaultValue) {
+        return Integer.parseInt(props.getProperty(key, 
String.valueOf(defaultValue)));
+    }
+
+    public static long getLong(Properties props, String key, long 
defaultValue) {
+        return Long.parseLong(props.getProperty(key, 
String.valueOf(defaultValue)));
+    }
+
+    public static boolean getBoolean(Properties props, String key, boolean 
defaultValue) {
+        return Boolean.parseBoolean(props.getProperty(key, 
String.valueOf(defaultValue)));
+    }
+}
diff --git a/src/main/java/org/apache/rocketmq/flink/RunningChecker.java 
b/src/main/java/org/apache/rocketmq/flink/RunningChecker.java
new file mode 100644
index 0000000..b7bc2b9
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/RunningChecker.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.io.Serializable;
+
+public class RunningChecker implements Serializable {
+    private volatile boolean isRunning = false;
+
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    public void setRunning(boolean running) {
+        isRunning = running;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java
 
b/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..264d211
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelector.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.selector;
+
+public class DefaultTopicSelector<T> implements TopicSelector<T> {
+    private final String topicName;
+    private final String tagName;
+
+    public DefaultTopicSelector(final String topicName, final String tagName) {
+        this.topicName = topicName;
+        this.tagName = tagName;
+    }
+
+    public DefaultTopicSelector(final String topicName) {
+        this(topicName, "");
+    }
+
+    @Override
+    public String getTopic(T tuple) {
+        return topicName;
+    }
+
+    @Override
+    public String getTag(T tuple) {
+        return tagName;
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java
 
b/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java
new file mode 100644
index 0000000..3ad8a03
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelector.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.selector;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses field name to select topic and tag name from tuple.
+ */
+public class SimpleTopicSelector implements TopicSelector<Map> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(SimpleTopicSelector.class);
+
+    private final String topicFieldName;
+    private final String defaultTopicName;
+
+    private final String tagFieldName;
+    private final String defaultTagName;
+
+    /**
+     * SimpleTopicSelector Constructor.
+     * @param topicFieldName field name used for selecting topic
+     * @param defaultTopicName default field name used for selecting topic
+     * @param tagFieldName field name used for selecting tag
+     * @param defaultTagName default field name used for selecting tag
+     */
+    public SimpleTopicSelector(String topicFieldName, String defaultTopicName, 
String tagFieldName, String defaultTagName) {
+        this.topicFieldName = topicFieldName;
+        this.defaultTopicName = defaultTopicName;
+        this.tagFieldName = tagFieldName;
+        this.defaultTagName = defaultTagName;
+    }
+
+    @Override
+    public String getTopic(Map tuple) {
+        if (tuple.containsKey(topicFieldName)) {
+            Object topic =  tuple.get(topicFieldName);
+            return topic != null ? topic.toString() : defaultTopicName;
+        } else {
+            LOG.warn("Field {} Not Found. Returning default topic {}", 
topicFieldName, defaultTopicName);
+            return defaultTopicName;
+        }
+    }
+
+    @Override
+    public String getTag(Map tuple) {
+        if (tuple.containsKey(tagFieldName)) {
+            Object tag = tuple.get(tagFieldName);
+            return tag != null ? tag.toString() : defaultTagName;
+        } else {
+            LOG.warn("Field {} Not Found. Returning default tag {}", 
tagFieldName, defaultTagName);
+            return defaultTagName;
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java 
b/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java
new file mode 100644
index 0000000..2a347db
--- /dev/null
+++ b/src/main/java/org/apache/rocketmq/flink/common/selector/TopicSelector.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.selector;
+
+import java.io.Serializable;
+
+public interface TopicSelector<T> extends Serializable {
+
+    String getTopic(T tuple);
+
+    String getTag(T tuple);
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java
 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java
new file mode 100644
index 0000000..d8759f9
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueDeserializationSchema.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.serialization;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+public interface KeyValueDeserializationSchema<T> extends 
ResultTypeQueryable<T>, Serializable {
+    T deserializeKeyAndValue(byte[] key, byte[] value);
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java
 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java
new file mode 100644
index 0000000..d847e8a
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/KeyValueSerializationSchema.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.serialization;
+
+import java.io.Serializable;
+
+public interface KeyValueSerializationSchema<T> extends Serializable {
+
+    byte[] serializeKey(T tuple);
+
+    byte[] serializeValue(T tuple);
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java
 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java
new file mode 100644
index 0000000..df6390b
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.serialization;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+public class SimpleKeyValueDeserializationSchema implements 
KeyValueDeserializationSchema<Map> {
+    public static final String DEFAULT_KEY_FIELD = "key";
+    public static final String DEFAULT_VALUE_FIELD = "value";
+
+    public String keyField;
+    public String valueField;
+
+    public SimpleKeyValueDeserializationSchema() {
+        this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
+    }
+
+    /**
+     * SimpleKeyValueDeserializationSchema Constructor.
+     * @param keyField tuple field for selecting the key
+     * @param valueField  tuple field for selecting the value
+     */
+    public SimpleKeyValueDeserializationSchema(String keyField, String 
valueField) {
+        this.keyField = keyField;
+        this.valueField = valueField;
+    }
+
+    @Override
+    public Map deserializeKeyAndValue(byte[] key, byte[] value) {
+        HashMap map = new HashMap(2);
+        if (keyField != null) {
+            String k = key != null ? new String(key, StandardCharsets.UTF_8) : 
null;
+            map.put(keyField, k);
+        }
+        if (valueField != null) {
+            String v = value != null ? new String(value, 
StandardCharsets.UTF_8) : null;
+            map.put(valueField, v);
+        }
+        return map;
+    }
+
+    @Override
+    public TypeInformation<Map> getProducedType() {
+        return TypeInformation.of(Map.class);
+    }
+}
diff --git 
a/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java
 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java
new file mode 100644
index 0000000..bbd6da3
--- /dev/null
+++ 
b/src/main/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchema.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.serialization;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+public class SimpleKeyValueSerializationSchema implements 
KeyValueSerializationSchema<Map> {
+    public static final String DEFAULT_KEY_FIELD = "key";
+    public static final String DEFAULT_VALUE_FIELD = "value";
+
+    public String keyField;
+    public String valueField;
+
+    public SimpleKeyValueSerializationSchema() {
+        this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
+    }
+
+    /**
+     * SimpleKeyValueSerializationSchema Constructor.
+     * @param keyField tuple field for selecting the key
+     * @param valueField  tuple field for selecting the value
+     */
+    public SimpleKeyValueSerializationSchema(String keyField, String 
valueField) {
+        this.keyField = keyField;
+        this.valueField = valueField;
+    }
+
+    @Override
+    public byte[] serializeKey(Map tuple) {
+        if (tuple == null || keyField == null) {
+            return null;
+        }
+        Object key = tuple.get(keyField);
+        return key != null ? key.toString().getBytes(StandardCharsets.UTF_8) : 
null;
+    }
+
+    @Override
+    public byte[] serializeValue(Map tuple) {
+        if (tuple == null || valueField == null) {
+            return null;
+        }
+        Object value = tuple.get(valueField);
+        return value != null ? 
value.toString().getBytes(StandardCharsets.UTF_8) : null;
+    }
+
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java 
b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
new file mode 100644
index 0000000..ec844f2
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/RocketMQSinkTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector;
+import org.apache.rocketmq.flink.common.selector.TopicSelector;
+import 
org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
+import 
org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.rocketmq.flink.TestUtils.setFieldValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class RocketMQSinkTest {
+
+    private RocketMQSink rocketMQSink;
+    private DefaultMQProducer producer;
+
+    @Before
+    public void setUp() throws Exception {
+        KeyValueSerializationSchema serializationSchema = new 
SimpleKeyValueSerializationSchema("id", "name");
+        TopicSelector topicSelector = new DefaultTopicSelector("tpc");
+        Properties props = new Properties();
+        rocketMQSink = new RocketMQSink(serializationSchema, topicSelector, 
props);
+
+        producer = mock(DefaultMQProducer.class);
+        setFieldValue(rocketMQSink, "producer", producer);
+    }
+
+    @Test
+    public void testSink() throws Exception {
+        Map tuple = new HashMap();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+        tuple.put("tpc", "tpc1");
+
+        rocketMQSink.invoke(tuple, null);
+
+        verify(producer).send(any(Message.class));
+
+    }
+
+    @Test
+    public void close() throws Exception {
+        rocketMQSink.close();
+
+        verify(producer).shutdown();
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java 
b/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java
new file mode 100644
index 0000000..b7aaee0
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/RocketMQSourceTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import 
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import 
org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
+import 
org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.rocketmq.flink.TestUtils.setFieldValue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RocketMQSourceTest {
+
+    private RocketMQSource rocketMQSource;
+    private MQPullConsumerScheduleService pullConsumerScheduleService;
+    private DefaultMQPullConsumer consumer;
+    private KeyValueDeserializationSchema deserializationSchema;
+    private String topic = "tpc";
+
+    @Before
+    public void setUp() throws Exception {
+        deserializationSchema = new SimpleKeyValueDeserializationSchema();
+        Properties props = new Properties();
+        rocketMQSource = new RocketMQSource(deserializationSchema, props);
+
+        setFieldValue(rocketMQSource, "topic", topic);
+        setFieldValue(rocketMQSource, "runningChecker", new 
SingleRunningCheck());
+        setFieldValue(rocketMQSource, "offsetTable", new 
ConcurrentHashMap<>());
+        setFieldValue(rocketMQSource, "restoredOffsets", new 
ConcurrentHashMap<>());
+
+        pullConsumerScheduleService = new MQPullConsumerScheduleService("g");
+
+        consumer = mock(DefaultMQPullConsumer.class);
+        pullConsumerScheduleService.setDefaultMQPullConsumer(consumer);
+        setFieldValue(rocketMQSource, "consumer", consumer);
+        setFieldValue(rocketMQSource, "pullConsumerScheduleService", 
pullConsumerScheduleService);
+    }
+
+    @Test
+    public void testSource() throws Exception {
+        List<MessageExt> msgFoundList = new ArrayList<>();
+        MessageExt messageExt = new MessageExt();
+        messageExt.setKeys("keys");
+        messageExt.setBody("body data".getBytes());
+        messageExt.setBornTimestamp(System.currentTimeMillis());
+        msgFoundList.add(messageExt);
+        PullResult pullResult = new PullResult(PullStatus.FOUND, 3, 1, 5, 
msgFoundList);
+
+        when(consumer.fetchConsumeOffset(any(MessageQueue.class), 
anyBoolean())).thenReturn(2L);
+        when(consumer.pull(any(MessageQueue.class), anyString(), anyLong(), 
anyInt())).thenReturn(pullResult);
+
+        SourceContext context = mock(SourceContext.class);
+        when(context.getCheckpointLock()).thenReturn(new Object());
+
+        rocketMQSource.run(context);
+
+        // schedule the pull task
+        Set<MessageQueue> set = new HashSet();
+        set.add(new MessageQueue(topic, "brk", 1));
+        pullConsumerScheduleService.putTask(topic, set);
+
+        MessageExt msg = pullResult.getMsgFoundList().get(0);
+
+        // atLeastOnce: re-pulling immediately when messages found before
+        verify(context, 
atLeastOnce()).collectWithTimestamp(deserializationSchema.deserializeKeyAndValue(msg.getKeys().getBytes(),
+            msg.getBody()), msg.getBornTimestamp());
+    }
+
+    @Test
+    public void close() throws Exception {
+        rocketMQSource.close();
+
+        verify(consumer).shutdown();
+    }
+
+    class SingleRunningCheck extends RunningChecker {
+        @Override
+        public boolean isRunning() {
+            return false;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/TestUtils.java 
b/src/test/java/org/apache/rocketmq/flink/TestUtils.java
new file mode 100644
index 0000000..d0a9450
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/TestUtils.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink;
+
+import java.lang.reflect.Field;
+
+public class TestUtils {
+    public static void setFieldValue(Object obj, String fieldName, Object 
value) {
+        try {
+            Field field = obj.getClass().getDeclaredField(fieldName);
+            field.setAccessible(true);
+            field.set(obj, value);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git 
a/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java
 
b/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java
new file mode 100644
index 0000000..2f4685c
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/flink/common/selector/DefaultTopicSelectorTest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.selector;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class DefaultTopicSelectorTest {
+    @Test
+    public void getTopic() throws Exception {
+        DefaultTopicSelector selector = new DefaultTopicSelector("rocket");
+        assertEquals("rocket", selector.getTopic(null));
+        assertEquals("", selector.getTag(null));
+
+        selector = new DefaultTopicSelector("rocket", "tg");
+        assertEquals("rocket", selector.getTopic(null));
+        assertEquals("tg", selector.getTag(null));
+    }
+
+}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java
 
b/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java
new file mode 100644
index 0000000..6ac1a57
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/flink/common/selector/SimpleTopicSelectorTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.selector;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class SimpleTopicSelectorTest {
+    @Test
+    public void getTopic() throws Exception {
+        SimpleTopicSelector selector = new SimpleTopicSelector("tpc", "dtpc", 
"tg", "dtg");
+        Map tuple = new HashMap();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+        tuple.put("tpc", "tpc1");
+        tuple.put("tg", "tg1");
+
+        assertEquals("tpc1", selector.getTopic(tuple));
+        assertEquals("tg1", selector.getTag(tuple));
+
+        tuple = new HashMap();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+
+        assertEquals("dtpc", selector.getTopic(tuple));
+        assertEquals("dtg", selector.getTag(tuple));
+    }
+
+}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java
 
b/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java
new file mode 100644
index 0000000..98aa793
--- /dev/null
+++ 
b/src/test/java/org/apache/rocketmq/flink/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.common.serialization;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class SimpleKeyValueSerializationSchemaTest {
+    @Test
+    public void serializeKeyAndValue() throws Exception {
+        SimpleKeyValueSerializationSchema serializationSchema = new 
SimpleKeyValueSerializationSchema("id", "name");
+        SimpleKeyValueDeserializationSchema deserializationSchema = new 
SimpleKeyValueDeserializationSchema("id", "name");
+
+        Map tuple = new HashMap();
+        tuple.put("id", "x001");
+        tuple.put("name", "vesense");
+
+        assertEquals(tuple, 
deserializationSchema.deserializeKeyAndValue(serializationSchema.serializeKey(tuple),
+            serializationSchema.serializeValue(tuple)));
+    }
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java 
b/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java
new file mode 100644
index 0000000..1b07b8d
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/example/ConsumerTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.example;
+
+import java.util.List;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+
+public class ConsumerTest {
+    public static void main(String[] args) {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g00003");
+        consumer.setNamesrvAddr("localhost:9876");
+        try {
+            consumer.subscribe("flink-sink2", "*");
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs, ConsumeConcurrentlyContext context) {
+                for (MessageExt msg : msgs) {
+                    System.out.println(msg.getKeys() + ":" + new 
String(msg.getBody()));
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        try {
+            consumer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java 
b/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java
new file mode 100644
index 0000000..c04ca74
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/example/ProducerTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.example;
+
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ProducerTest {
+    public static void main(String[] args) {
+        DefaultMQProducer producer = new DefaultMQProducer("p001");
+        producer.setNamesrvAddr("localhost:9876");
+        try {
+            producer.start();
+        } catch (MQClientException e) {
+            e.printStackTrace();
+        }
+        for (int i = 0; i < 10000; i++) {
+            Message msg = new Message("flink-source2" , "", "id_"+i, 
("country_X province_" + i).getBytes());
+            try {
+                producer.send(msg);
+            } catch (MQClientException e) {
+                e.printStackTrace();
+            } catch (RemotingException e) {
+                e.printStackTrace();
+            } catch (MQBrokerException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            System.out.println("send " + i);
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}
diff --git 
a/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java 
b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
new file mode 100644
index 0000000..b2a4034
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/flink/example/RocketMQFlinkExample.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.flink.example;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.rocketmq.flink.RocketMQConfig;
+import org.apache.rocketmq.flink.RocketMQSink;
+import org.apache.rocketmq.flink.RocketMQSource;
+import org.apache.rocketmq.flink.common.selector.DefaultTopicSelector;
+import 
org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
+import 
org.apache.rocketmq.flink.common.serialization.SimpleKeyValueSerializationSchema;
+
+public class RocketMQFlinkExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // enable checkpoint
+        env.enableCheckpointing(3000);
+
+        Properties consumerProps = new Properties();
+        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, 
"localhost:9876");
+        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "c002");
+        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, 
"flink-source2");
+
+        Properties producerProps = new Properties();
+        producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, 
"localhost:9876");
+
+        env.addSource(new RocketMQSource(new 
SimpleKeyValueDeserializationSchema("id", "address"), consumerProps))
+            .name("rocketmq-source")
+            .setParallelism(2)
+            .process(new ProcessFunction<Map, Map>() {
+                @Override
+                public void processElement(Map in, Context ctx, Collector<Map> 
out) throws Exception {
+                    HashMap result = new HashMap();
+                    result.put("id", in.get("id"));
+                    String[] arr = in.get("address").toString().split("\\s+");
+                    result.put("province", arr[arr.length-1]);
+                    out.collect(result);
+                }
+            })
+            .name("upper-processor")
+            .setParallelism(2)
+            .addSink(new RocketMQSink(new 
SimpleKeyValueSerializationSchema("id", "province"),
+                new DefaultTopicSelector("flink-sink2"), 
producerProps).withBatchFlushOnCheckpoint(true))
+            .name("rocketmq-sink")
+            .setParallelism(2);
+
+        try {
+            env.execute("rocketmq-flink-example");
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/style/copyright/Apache.xml b/style/copyright/Apache.xml
new file mode 100644
index 0000000..2db86d0
--- /dev/null
+++ b/style/copyright/Apache.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache"/>
+        <option name="notice"
+                value="Licensed to the Apache Software Foundation (ASF) under 
one or more&#10;contributor license agreements.  See the NOTICE file 
distributed with&#10;this work for additional information regarding copyright 
ownership.&#10;The ASF licenses this file to You under the Apache License, 
Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in 
compliance with&#10;the License.  You may obtain a copy of the License 
at&#10;&#10;    http://www.apache.org/lice [...]
+    </copyright>
+</component>
\ No newline at end of file
diff --git a/style/copyright/profiles_settings.xml 
b/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..4c0e521
--- /dev/null
+++ b/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="addBlankAfter" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file
diff --git a/style/rmq_checkstyle.xml b/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..e3155cc
--- /dev/null
+++ b/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd";>
+<!--Refer 
http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding
 -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software 
Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println 
in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in 
javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  
value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override 
hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds 
code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example 
the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup 
switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" 
value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to 
producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch 
parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * 
notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>
diff --git a/style/rmq_codeStyle.xml b/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..cd95ee6
--- /dev/null
+++ b/style/rmq_codeStyle.xml
@@ -0,0 +1,157 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq">
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS">
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="ELSE_ON_NEW_LINE" value="true"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="CATCH_ON_NEW_LINE" value="true"/>
+    <option name="FINALLY_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON">
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON">
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala">
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML">
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file

Reply via email to