This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 90b00be update README.md (#66)
90b00be is described below
commit 90b00be02dd019ffed48cbc35cd050b07c4a36ab
Author: 高思伟 <[email protected]>
AuthorDate: Tue Oct 18 15:01:46 2022 +0800
update README.md (#66)
Co-authored-by: 高思伟 <[email protected]>
---
README.md | 26 ++++++++++++++++++++------
1 file changed, 20 insertions(+), 6 deletions(-)
diff --git a/README.md b/README.md
index be889eb..67d04de 100644
--- a/README.md
+++ b/README.md
@@ -78,23 +78,37 @@ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironm
// use group offsets.
// If there is no committed offset,consumer would start from the
latest offset.
source.setStartFromGroupOffsets(OffsetResetStrategy.LATEST);
- env.addSource()
+ env.addSource(source)
.name("rocketmq-source")
.setParallelism(2)
- .process(new ProcessFunction<Map, Map>() {
+ .process(new ProcessFunction<Map<Object, Object>, Map<Object,
Object>>() {
@Override
- public void processElement(Map in, Context ctx, Collector<Map>
out) throws Exception {
+ public void processElement(
+ Map<Object, Object> in,
+ Context ctx,
+ Collector<Map<Object, Object>> out) {
HashMap result = new HashMap();
result.put("id", in.get("id"));
String[] arr = in.get("address").toString().split("\\s+");
- result.put("province", arr[arr.length-1]);
+ result.put("province", arr[arr.length - 1]);
out.collect(result);
}
})
.name("upper-processor")
.setParallelism(2)
- .addSink(new RocketMQSink(new
SimpleKeyValueSerializationSchema("id", "province"),
- new DefaultTopicSelector("flink-sink2"),
producerProps).withBatchFlushOnCheckpoint(true))
+ .process(new ProcessFunction<Map<Object, Object>, Message>() {
+ @Override
+ public void processElement(Map<Object, Object> value, Context
ctx, Collector<Message> out) {
+ String jsonString = JSONObject.toJSONString(value);
+ Message message =
+ new Message(
+ "flink-sink2",
+ "",
+
jsonString.getBytes(StandardCharsets.UTF_8));
+ out.collect(message);
+ }
+ })
+ .addSink(new
RocketMQSink(producerProps).withBatchFlushOnCheckpoint(true))
.name("rocketmq-sink")
.setParallelism(2);