This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new af45610  [ISSUE #205]Rocketmq replicator running null pointer (#206)
af45610 is described below

commit af4561057e39be4580d90fe95235b485306c86d1
Author: xiaoyi <[email protected]>
AuthorDate: Thu Jul 21 14:00:51 2022 +0800

    [ISSUE #205]Rocketmq replicator running null pointer (#206)
    
    * Fix debezium demecial type conversion problem #190
    
    * Upgrade rocketmq-replicator API to v0.1.3 #189
    
    * Encountered change event for table databasename.tablename whose schema 
isn`t known to this connector #191
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * remove local config
    
    * Debezium mysql source connector delete event causes null pointer #196
    
    * Rocketmq replicator running null pointer #205
---
 .../java/org/apache/rocketmq/replicator/RmqSourceTask.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 2a5be6d..78d2457 100644
--- 
a/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ 
b/connectors/rocketmq-replicator/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -149,12 +149,14 @@ public class RmqSourceTask extends SourceTask {
                         case FOUND: {
                             this.mqOffsetMap.put(taskTopicConfig, 
pullResult.getNextBeginOffset());
                             List<MessageExt> msgs = 
pullResult.getMsgFoundList();
-                            List<Field> fields = new ArrayList<>();
-                            Schema schema = new 
Schema(SchemaEnum.MESSAGE.name(), FieldType.STRING, fields);
-                            schema.getFields().add(new Field(0, 
FieldName.COMMON_MESSAGE.getKey(), SchemaBuilder.string().build()));
                             for (MessageExt msg : msgs) {
-                                ConnectRecord connectRecord = new 
ConnectRecord(Utils.offsetKey(taskTopicConfig),
-                                    
Utils.offsetValue(pullResult.getNextBeginOffset()), System.currentTimeMillis(), 
schema, new String(msg.getBody(), StandardCharsets.UTF_8));
+                                ConnectRecord connectRecord = new 
ConnectRecord(
+                                        Utils.offsetKey(taskTopicConfig),
+                                        
Utils.offsetValue(pullResult.getNextBeginOffset()),
+                                        System.currentTimeMillis(),
+                                        SchemaBuilder.string().name( 
FieldName.COMMON_MESSAGE.getKey()).build(),
+                                        new String(msg.getBody(), 
StandardCharsets.UTF_8)
+                                );
                                 final Map<String, String> properties = 
msg.getProperties();
                                 final Set<String> keys = properties.keySet();
                                 keys.forEach(key -> 
connectRecord.addExtension(key, properties.get(key)));

Reply via email to