This is an automated email from the ASF dual-hosted git repository.

karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new cc9d48b  polish(serial) add the generic type, Map<String, String>, into SimpleKeyValueDeserializationSchema
     new 45590d1  Merge pull request #86 from ni-ze/main
cc9d48b is described below

commit cc9d48b5beeb0c98a34cd45f81c3a74739573cf6
Author: 维章 <[email protected]>
AuthorDate: Thu Apr 20 09:55:17 2023 +0800

    polish(serial) add the generic type, Map<String, String>, into SimpleKeyValueDeserializationSchema
---
 .../serialization/SimpleKeyValueDeserializationSchema.java    | 11 ++++++-----
 .../org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java  |  9 +++++----
 .../serialization/SimpleKeyValueSerializationSchemaTest.java  |  2 +-
 3 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 3c9ae95..1177f76 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -21,8 +21,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
 
-public class SimpleKeyValueDeserializationSchema implements 
KeyValueDeserializationSchema<Map> {
+public class SimpleKeyValueDeserializationSchema implements 
KeyValueDeserializationSchema<Map<String, String>> {
     public static final String DEFAULT_KEY_FIELD = "key";
     public static final String DEFAULT_VALUE_FIELD = "value";
 
@@ -45,8 +46,8 @@ public class SimpleKeyValueDeserializationSchema implements 
KeyValueDeserializat
     }
 
     @Override
-    public Map deserializeKeyAndValue(byte[] key, byte[] value) {
-        HashMap map = new HashMap(2);
+    public Map<String, String> deserializeKeyAndValue(byte[] key, byte[] 
value) {
+        HashMap<String, String> map = new HashMap<>(2);
         if (keyField != null) {
             String k = key != null ? new String(key, StandardCharsets.UTF_8) : 
null;
             map.put(keyField, k);
@@ -59,7 +60,7 @@ public class SimpleKeyValueDeserializationSchema implements 
KeyValueDeserializat
     }
 
     @Override
-    public TypeInformation<Map> getProducedType() {
-        return TypeInformation.of(Map.class);
+    public TypeInformation<Map<String, String>> getProducedType() {
+        return new MapTypeInfo<>(String.class, String.class);
     }
 }
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index 9c5042c..9e78190 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.rocketmq.flink.legacy;
 
+import java.util.Map;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -50,16 +51,16 @@ import static org.mockito.Mockito.when;
 @Ignore
 public class RocketMQSourceTest {
 
-    private RocketMQSourceFunction rocketMQSource;
+    private RocketMQSourceFunction<Map<String, String>> rocketMQSource;
     private DefaultLitePullConsumer consumer;
-    private KeyValueDeserializationSchema deserializationSchema;
+    private KeyValueDeserializationSchema<Map<String, String>> 
deserializationSchema;
     private String topic = "tpc";
 
     @Before
     public void setUp() throws Exception {
         deserializationSchema = new SimpleKeyValueDeserializationSchema();
         Properties props = new Properties();
-        rocketMQSource = new RocketMQSourceFunction(deserializationSchema, 
props);
+        rocketMQSource = new RocketMQSourceFunction<>(deserializationSchema, 
props);
 
         setFieldValue(rocketMQSource, "topic", topic);
         setFieldValue(rocketMQSource, "runningChecker", new 
SingleRunningCheck());
@@ -89,7 +90,7 @@ public class RocketMQSourceTest {
         rocketMQSource.run(context);
 
         // schedule the pull task
-        Set<MessageQueue> set = new HashSet();
+        Set<MessageQueue> set = new HashSet<>();
         set.add(new MessageQueue(topic, "brk", 1));
 
         MessageExt msg = pullResult.getMsgFoundList().get(0);
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
index 7e2e0d9..e27fdf5 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -33,7 +33,7 @@ public class SimpleKeyValueSerializationSchemaTest {
         SimpleKeyValueDeserializationSchema deserializationSchema =
                 new SimpleKeyValueDeserializationSchema("id", "name");
 
-        Map tuple = new HashMap();
+        Map<String, String> tuple = new HashMap<>();
         tuple.put("id", "x001");
         tuple.put("name", "vesense");
 

Reply via email to