This is an automated email from the ASF dual-hosted git repository.
karp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new cc9d48b polish(serial) add the generic type, Map<String, String>,
into SimpleKeyValueDeserializationSchema
new 45590d1 Merge pull request #86 from ni-ze/main
cc9d48b is described below
commit cc9d48b5beeb0c98a34cd45f81c3a74739573cf6
Author: 维章 <[email protected]>
AuthorDate: Thu Apr 20 09:55:17 2023 +0800
polish(serial) add the generic type, Map<String, String>, into
SimpleKeyValueDeserializationSchema
---
.../serialization/SimpleKeyValueDeserializationSchema.java | 11 ++++++-----
.../org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java | 9 +++++----
.../serialization/SimpleKeyValueSerializationSchemaTest.java | 2 +-
3 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 3c9ae95..1177f76 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -21,8 +21,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
-public class SimpleKeyValueDeserializationSchema implements
KeyValueDeserializationSchema<Map> {
+public class SimpleKeyValueDeserializationSchema implements
KeyValueDeserializationSchema<Map<String, String>> {
public static final String DEFAULT_KEY_FIELD = "key";
public static final String DEFAULT_VALUE_FIELD = "value";
@@ -45,8 +46,8 @@ public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializat
}
@Override
- public Map deserializeKeyAndValue(byte[] key, byte[] value) {
- HashMap map = new HashMap(2);
+ public Map<String, String> deserializeKeyAndValue(byte[] key, byte[]
value) {
+ HashMap<String, String> map = new HashMap<>(2);
if (keyField != null) {
String k = key != null ? new String(key, StandardCharsets.UTF_8) :
null;
map.put(keyField, k);
@@ -59,7 +60,7 @@ public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializat
}
@Override
- public TypeInformation<Map> getProducedType() {
- return TypeInformation.of(Map.class);
+ public TypeInformation<Map<String, String>> getProducedType() {
+ return new MapTypeInfo<>(String.class, String.class);
}
}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index 9c5042c..9e78190 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.flink.legacy;
+import java.util.Map;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -50,16 +51,16 @@ import static org.mockito.Mockito.when;
@Ignore
public class RocketMQSourceTest {
- private RocketMQSourceFunction rocketMQSource;
+ private RocketMQSourceFunction<Map<String, String>> rocketMQSource;
private DefaultLitePullConsumer consumer;
- private KeyValueDeserializationSchema deserializationSchema;
+ private KeyValueDeserializationSchema<Map<String, String>>
deserializationSchema;
private String topic = "tpc";
@Before
public void setUp() throws Exception {
deserializationSchema = new SimpleKeyValueDeserializationSchema();
Properties props = new Properties();
- rocketMQSource = new RocketMQSourceFunction(deserializationSchema,
props);
+ rocketMQSource = new RocketMQSourceFunction<>(deserializationSchema,
props);
setFieldValue(rocketMQSource, "topic", topic);
setFieldValue(rocketMQSource, "runningChecker", new
SingleRunningCheck());
@@ -89,7 +90,7 @@ public class RocketMQSourceTest {
rocketMQSource.run(context);
// schedule the pull task
- Set<MessageQueue> set = new HashSet();
+ Set<MessageQueue> set = new HashSet<>();
set.add(new MessageQueue(topic, "brk", 1));
MessageExt msg = pullResult.getMsgFoundList().get(0);
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
index 7e2e0d9..e27fdf5 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -33,7 +33,7 @@ public class SimpleKeyValueSerializationSchemaTest {
SimpleKeyValueDeserializationSchema deserializationSchema =
new SimpleKeyValueDeserializationSchema("id", "name");
- Map tuple = new HashMap();
+ Map<String, String> tuple = new HashMap<>();
tuple.put("id", "x001");
tuple.put("name", "vesense");