[ 
https://issues.apache.org/jira/browse/ROCKETMQ-82?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406419#comment-16406419
 ] 

ASF GitHub Bot commented on ROCKETMQ-82:
----------------------------------------

vesense commented on a change in pull request #45: ROCKETMQ-82: RocketMQ-Flink 
Integration
URL: https://github.com/apache/rocketmq-externals/pull/45#discussion_r175789252
 
 

 ##########
 File path: rocketmq-flink/README.md
 ##########
 @@ -0,0 +1,147 @@
+# RocketMQ-Flink
+
+RocketMQ integration for [Apache Flink](https://flink.apache.org/). This module includes the RocketMQ source and sink, which allow a Flink job to read messages from topics or write messages to a topic.
+
+
+## RocketMQSource
+To use the `RocketMQSource`, construct an instance by specifying a `KeyValueDeserializationSchema` instance and a `Properties` instance that contains the RocketMQ configuration.
+`RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props)`
+The `RocketMQSource` is based on the RocketMQ pull consumer mode and provides exactly-once reliability guarantees when checkpoints are enabled.
+Otherwise, the source doesn't provide any reliability guarantees.
+
+### KeyValueDeserializationSchema
+The main API for deserializing message keys and values is the `org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema` interface.
+`rocketmq-flink` includes a general-purpose `KeyValueDeserializationSchema` implementation called `SimpleKeyValueDeserializationSchema`.
+
+```java
+public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
+    T deserializeKeyAndValue(byte[] key, byte[] value);
+}
+```
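As an illustration only (not part of the connector), a custom schema could look like the sketch below. The interface is redeclared locally without Flink's `ResultTypeQueryable<T>` so that the snippet compiles on its own, and `Utf8MapDeserializationSchema` is a hypothetical name. Decoding both parts as UTF-8 is an assumption about the message format.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Local stand-in for the connector interface. Assumption: the real interface
// also extends Flink's ResultTypeQueryable<T>, which is left out here.
interface KeyValueDeserializationSchema<T> extends Serializable {
    T deserializeKeyAndValue(byte[] key, byte[] value);
}

// Hypothetical implementation: decodes key and value as UTF-8 strings
// and stores them in a map under the configured field names.
class Utf8MapDeserializationSchema implements KeyValueDeserializationSchema<Map<String, String>> {
    private static final long serialVersionUID = 1L;

    private final String keyField;
    private final String valueField;

    Utf8MapDeserializationSchema(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }

    @Override
    public Map<String, String> deserializeKeyAndValue(byte[] key, byte[] value) {
        Map<String, String> result = new HashMap<>();
        // A message may arrive without a key or without a body; skip missing parts.
        if (key != null) {
            result.put(keyField, new String(key, StandardCharsets.UTF_8));
        }
        if (value != null) {
            result.put(valueField, new String(value, StandardCharsets.UTF_8));
        }
        return result;
    }
}
```

To plug such a class into the real source, it would additionally have to implement the `ResultTypeQueryable<T>` part of the connector interface.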
+
+## RocketMQSink
+To use the `RocketMQSink`, construct an instance by specifying `KeyValueSerializationSchema` and `TopicSelector` instances and a `Properties` instance that contains the RocketMQ configuration.
+`RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props)`
+The `RocketMQSink` provides at-least-once reliability guarantees when checkpoints are enabled and `withBatchFlushOnCheckpoint(true)` is set.
+Otherwise, the sink's reliability guarantees depend on the RocketMQ producer's retry policy. In this case, messages are sent synchronously by default,
+but you can change this by invoking `withAsync(true)`.
+
+### KeyValueSerializationSchema
+The main API for serializing message keys and values is the `org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema` interface.
+`rocketmq-flink` includes a general-purpose `KeyValueSerializationSchema` implementation called `SimpleKeyValueSerializationSchema`.
+
+```java
+public interface KeyValueSerializationSchema<T> extends Serializable {
+
+    byte[] serializeKey(T tuple);
+
+    byte[] serializeValue(T tuple);
+}
+```
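A minimal sketch of a custom implementation, for illustration only. The interface is copied from the snippet above so that the example is self-contained. `Utf8MapSerializationSchema` is a hypothetical name, and encoding fields via `toString()` as UTF-8 is an assumption.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Interface copied from the README so the sketch compiles on its own.
interface KeyValueSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T tuple);
    byte[] serializeValue(T tuple);
}

// Hypothetical implementation: picks two map entries and encodes them as UTF-8.
class Utf8MapSerializationSchema implements KeyValueSerializationSchema<Map<String, Object>> {
    private static final long serialVersionUID = 1L;

    private final String keyField;
    private final String valueField;

    Utf8MapSerializationSchema(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }

    @Override
    public byte[] serializeKey(Map<String, Object> tuple) {
        return encode(tuple.get(keyField));
    }

    @Override
    public byte[] serializeValue(Map<String, Object> tuple) {
        return encode(tuple.get(valueField));
    }

    // Returns null when the field is absent so the caller can decide how to handle it.
    private static byte[] encode(Object field) {
        return field == null ? null : field.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```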
+
+### TopicSelector
+The main API for selecting the topic and tag is the `org.apache.rocketmq.flink.common.selector.TopicSelector` interface.
+`rocketmq-flink` includes general-purpose `TopicSelector` implementations called `DefaultTopicSelector` and `SimpleTopicSelector`.
+
+```java
+public interface TopicSelector<T> extends Serializable {
+
+    String getTopic(T tuple);
+
+    String getTag(T tuple);
+}
+```
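For illustration, a selector that routes each record by the contents of the record itself might look like the sketch below. The interface is copied from the snippet above, `FieldTopicSelector` is a hypothetical name, and this does not describe how the bundled selectors behave.

```java
import java.io.Serializable;
import java.util.Map;

// Interface copied from the README so the sketch compiles on its own.
interface TopicSelector<T> extends Serializable {
    String getTopic(T tuple);
    String getTag(T tuple);
}

// Hypothetical selector: reads topic and tag from fields of the record and
// falls back to fixed defaults when a field is missing.
class FieldTopicSelector implements TopicSelector<Map<String, String>> {
    private static final long serialVersionUID = 1L;

    private final String topicField;
    private final String tagField;
    private final String defaultTopic;
    private final String defaultTag;

    FieldTopicSelector(String topicField, String tagField, String defaultTopic, String defaultTag) {
        this.topicField = topicField;
        this.tagField = tagField;
        this.defaultTopic = defaultTopic;
        this.defaultTag = defaultTag;
    }

    @Override
    public String getTopic(Map<String, String> tuple) {
        String topic = tuple.get(topicField);
        return topic != null ? topic : defaultTopic;
    }

    @Override
    public String getTag(Map<String, String> tuple) {
        String tag = tuple.get(tagField);
        return tag != null ? tag : defaultTag;
    }
}
```

Falling back to a default keeps a record without routing fields from being sent with a null topic.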
+
+## Examples
+The following example receives messages from RocketMQ brokers, processes them, and sends the results back to a broker.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// enable checkpointing with a 3000 ms interval
+env.enableCheckpointing(3000);
+
+Properties consumerProps = new Properties();
+consumerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876");
+consumerProps.setProperty(RocketMqConfig.CONSUMER_GROUP, "c002");
+consumerProps.setProperty(RocketMqConfig.CONSUMER_TOPIC, "flink-source2");
+
+Properties producerProps = new Properties();
+producerProps.setProperty(RocketMqConfig.NAME_SERVER_ADDR, "localhost:9876");
+
+env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "address"), consumerProps))
+    .name("rocketmq-source")
+    .setParallelism(2)
+    .process(new ProcessFunction<Map, Map>() {
+        @Override
+        public void processElement(Map in, Context ctx, Collector<Map> out) throws Exception {
+            // keep the id and use the last whitespace-separated token of the address as the province
+            HashMap result = new HashMap();
+            result.put("id", in.get("id"));
+            String[] arr = in.get("address").toString().split("\\s+");
+            result.put("province", arr[arr.length - 1]);
+            out.collect(result);
+        }
+    })
+    .name("upper-processor")
+    .setParallelism(2)
+    .addSink(new RocketMQSink(new SimpleKeyValueSerializationSchema("id", "province"),
+        new DefaultTopicSelector("flink-sink2"), producerProps).withBatchFlushOnCheckpoint(true))
+    .name("rocketmq-sink")
+    .setParallelism(2);
+
+try {
+    env.execute("rocketmq-flink-example");
+} catch (Exception e) {
+    e.printStackTrace();
+}
+```
+
+## Configurations
+The following configurations are all from the class 
`org.apache.rocketmq.flink.RocketMqConfig`.
+
+### Producer Configurations
+| NAME        | DESCRIPTION           | DEFAULT  |
+| ------------- |:-------------:|:------:|
+| nameserver.address      | name server address *Required* | null |
+| nameserver.poll.interval      | name server poll topic info interval     |   30000 |
+| brokerserver.heartbeat.interval | broker server heartbeat interval      |    30000 |
+| producer.group | producer group      |    $UUID |
+| producer.retry.times | producer send messages retry times      |    3 |
+| producer.timeout | producer send messages timeout      |    3000 |
+
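A minimal sketch of producer settings built from the table above. It assumes that the NAME column holds the literal property keys behind the `RocketMqConfig` constants, which the README does not state explicitly. `ProducerPropsSketch` is a hypothetical helper.

```java
import java.util.Properties;

// Hypothetical helper, not part of the connector.
class ProducerPropsSketch {
    // Builds producer settings. Assumption: the keys below are the literal
    // names listed in the NAME column of the producer configuration table.
    static Properties producerProps(String nameServerAddr) {
        if (nameServerAddr == null || nameServerAddr.isEmpty()) {
            // The table marks nameserver.address as required.
            throw new IllegalArgumentException("nameserver.address is required");
        }
        Properties props = new Properties();
        props.setProperty("nameserver.address", nameServerAddr);
        props.setProperty("producer.retry.times", "3");  // default listed in the table
        props.setProperty("producer.timeout", "3000");   // default listed in the table
        return props;
    }
}
```

Calling `ProducerPropsSketch.producerProps("localhost:9876")` yields a `Properties` object of the kind passed to the sink constructor in the example.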
+
+### Consumer Configurations
+| NAME        | DESCRIPTION           | DEFAULT  |
+| ------------- |:-------------:|:------:|
+| nameserver.address      | name server address *Required* | null |
+| nameserver.poll.interval      | name server poll topic info interval     |   30000 |
+| brokerserver.heartbeat.interval | broker server heartbeat interval      |    30000 |
+| consumer.group | consumer group *Required*     |    null |
+| consumer.topic | consumer topic *Required*       |    null |
+| consumer.tag | consumer topic tag      |    * |
+| consumer.offset.reset.to | what to do when there is no initial offset on the server      |   latest/earliest/timestamp |
+| consumer.offset.from.timestamp | the timestamp to consume from when `consumer.offset.reset.to=timestamp` is set   |   $TIMESTAMP |
 
 Review comment:
   Good catch. Will fix.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Add the RocketMQ plugin for the Apache Flink
> --------------------------------------------
>
>                 Key: ROCKETMQ-82
>                 URL: https://issues.apache.org/jira/browse/ROCKETMQ-82
>             Project: Apache RocketMQ
>          Issue Type: Wish
>          Components: rocketmq-externals
>            Reporter: vongosling
>            Assignee: Xin Wang
>            Priority: Minor
>
> A few basic data sources and sinks are built into Flink and are always 
> available. The [predefined data 
> sources|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#data-sources]
>  include reading from files, directories, and sockets, and ingesting data 
> from collections and iterators. The [predefined data 
> sinks|https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#data-sinks]
>  support writing to files, to stdout and stderr, and to sockets.
> The connector is the integration plugin for RocketMQ and Flink. Some details are here:
> 1. 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/connectors/index.html]
> 2. [https://github.com/apache/rocketmq-externals/tree/master/rocketmq-flink]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
