This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new 90b00be  update README.md (#66)
90b00be is described below

commit 90b00be02dd019ffed48cbc35cd050b07c4a36ab
Author: 高思伟 <[email protected]>
AuthorDate: Tue Oct 18 15:01:46 2022 +0800

    update README.md (#66)
    
    Co-authored-by: 高思伟 <[email protected]>
---
 README.md | 26 ++++++++++++++++++++------
 1 file changed, 20 insertions(+), 6 deletions(-)

diff --git a/README.md b/README.md
index be889eb..67d04de 100644
--- a/README.md
+++ b/README.md
@@ -78,23 +78,37 @@ StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironm
         // use group offsets.
         // If there is no committed offset,consumer would start from the 
latest offset.
         source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);
-        env.addSource()
+        env.addSource(source)
             .name("rocketmq-source")
             .setParallelism(2)
-            .process(new ProcessFunction<Map, Map>() {
+            .process(new ProcessFunction<Map<Object, Object>, Map<Object, 
Object>>() {
                 @Override
-                public void processElement(Map in, Context ctx, Collector<Map> 
out) throws Exception {
+                public void processElement(
+                        Map<Object, Object> in,
+                        Context ctx,
+                        Collector<Map<Object, Object>> out) {
                     HashMap result = new HashMap();
                     result.put("id", in.get("id"));
                     String[] arr = in.get("address").toString().split("\\s+");
-                    result.put("province", arr[arr.length-1]);
+                    result.put("province", arr[arr.length - 1]);
                     out.collect(result);
                 }
             })
             .name("upper-processor")
             .setParallelism(2)
-            .addSink(new RocketMQSink(new 
SimpleKeyValueSerializationSchema("id", "province"),
-                new DefaultTopicSelector("flink-sink2"), 
producerProps).withBatchFlushOnCheckpoint(true))
+            .process(new ProcessFunction<Map<Object, Object>, Message>() {
+                @Override
+                public void processElement(Map<Object, Object> value, Context 
ctx, Collector<Message> out) {
+                    String jsonString = JSONObject.toJSONString(value);
+                    Message message =
+                            new Message(
+                                    "flink-sink2",
+                                    "",
+                                    
jsonString.getBytes(StandardCharsets.UTF_8));
+                    out.collect(message);
+                }
+            })
+            .addSink(new 
RocketMQSink(producerProps).withBatchFlushOnCheckpoint(true))
             .name("rocketmq-sink")
             .setParallelism(2);
 

Reply via email to