This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new af45610 [ISSUE #205]Rocketmq replicator running null pointer (#206)
af45610 is described below
commit af4561057e39be4580d90fe95235b485306c86d1
Author: xiaoyi <[email protected]>
AuthorDate: Thu Jul 21 14:00:51 2022 +0800
[ISSUE #205]Rocketmq replicator running null pointer (#206)
* Fix debezium demecial type conversion problem #190
* Upgrade rocketmq-replicator API to v0.1.3 #189
* Encountered change event for table databasename.tablename whose schema
isn`t known to this connector #191
* Debezium mysql source connector delete event causes null pointer #196
* remove local config
* Debezium mysql source connector delete event causes null pointer #196
* Rocketmq replicator running null pointer #205
---
.../java/org/apache/rocketmq/replicator/RmqSourceTask.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 2a5be6d..78d2457 100644
---
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -149,12 +149,14 @@ public class RmqSourceTask extends SourceTask {
case FOUND: {
this.mqOffsetMap.put(taskTopicConfig,
pullResult.getNextBeginOffset());
List<MessageExt> msgs =
pullResult.getMsgFoundList();
- List<Field> fields = new ArrayList<>();
- Schema schema = new
Schema(SchemaEnum.MESSAGE.name(), FieldType.STRING, fields);
- schema.getFields().add(new Field(0,
FieldName.COMMON_MESSAGE.getKey(), SchemaBuilder.string().build()));
for (MessageExt msg : msgs) {
- ConnectRecord connectRecord = new
ConnectRecord(Utils.offsetKey(taskTopicConfig),
-
Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(),
schema, new String(msg.getBody(), StandardCharsets.UTF_8));
+ ConnectRecord connectRecord = new
ConnectRecord(
+ Utils.offsetKey(taskTopicConfig),
+
Utils.offsetValue(pullResult.getNextBeginOffset()),
+ System.currentTimeMillis(),
+ SchemaBuilder.string().name(
FieldName.COMMON_MESSAGE.getKey()).build(),
+ new String(msg.getBody(),
StandardCharsets.UTF_8)
+ );
final Map<String, String> properties =
msg.getProperties();
final Set<String> keys = properties.keySet();
keys.forEach(key ->
connectRecord.addExtension(key, properties.get(key)));