This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new c5d9788  Adapt hudi to openmessaging-connector 0.1.4 (#267)
c5d9788 is described below

commit c5d97886f73dc321c8cf98d9b14c7370a7815174
Author: lizhiboo <[email protected]>
AuthorDate: Wed Aug 24 17:27:49 2022 +0800

    Adapt hudi to openmessaging-connector 0.1.4 (#267)
    
    * adapt to new api
    
    * fulfill run hudi-connector locally demo doc
    
    * adapt to 0.1.4
    
    * reset connect-standalone.conf
    
    * add schema file
---
 .../connect/hudi/connector/HudiSinkTask.java       | 12 ++++++++++--
 .../apache/rocketmq/connect/hudi/sink/Updater.java | 22 ++++++++++++++++++++--
 .../src/test/java/Producer.java                    |  2 +-
 .../rocketmq-connect-hudi/src/test/java/user.avsc  |  9 +++++++++
 4 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
index 915c95b..112f530 100644
--- a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
+++ b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.connect.hudi.connector;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.component.task.sink.SinkTask;
 import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.errors.ConnectException;
 import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
 import org.apache.rocketmq.connect.hudi.config.ConfigUtil;
@@ -28,6 +30,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+
 import java.util.List;
 import java.util.Map;
 
@@ -95,9 +98,14 @@ public class HudiSinkTask extends SinkTask {
         }
     }
 
-
     @Override
-    public void validate(KeyValue config) {
+    public void flush(Map<RecordPartition, RecordOffset> currentOffsets) throws ConnectException {
+    }
 
+    @Override
+    public Map<RecordPartition, RecordOffset> preCommit(Map<RecordPartition, RecordOffset> currentOffsets) {
+        this.flush(currentOffsets);
+        return currentOffsets;
     }
+
 }
diff --git a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
index e0ea311..4258767 100644
--- a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
+++ b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
@@ -140,8 +140,12 @@ public class Updater {
         }
     }
 
-    private GenericRecord sinkDataEntry2GenericRecord(ConnectRecord record) {
-        byte[] recordBytes = new byte[0];
+
+    private GenericRecord sinkDataEntry2GenericRecord(ConnectRecord record) throws UnsupportedEncodingException {
+        byte[] recordBytes = ((String) record.getData()).getBytes("UTF8");
+        GenericRecord genericRecord = new GenericData.Record(this.hudiConnectConfig.schema);
+        DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<GenericRecord>(this.hudiConnectConfig.schema);
+        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);
         try {
             recordBytes = ((String) record.getData()).getBytes("UTF8");
         } catch (UnsupportedEncodingException e) {
@@ -210,6 +214,20 @@ public class Updater {
             commitList = inflightList;
             inflightList = new ArrayList<>();
         }
+
+        List<HoodieRecord> hoodieRecordsList = new ArrayList<>();
+        for (ConnectRecord record : commitList) {
+            GenericRecord genericRecord = null;
+            try {
+                genericRecord = sinkDataEntry2GenericRecord(record);
+            } catch (UnsupportedEncodingException e) {
+                log.error("parse record error, ", e);
+                continue;
+            }
+            HoodieRecord<HoodieAvroPayload> hoodieRecord = new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "shardingKey-" + record.getPosition().getPartition()), new HoodieAvroPayload(Option.of(genericRecord)));
+            hoodieRecordsList.add(hoodieRecord);
+        }
+
         try {
             log.info("Before commit.");
             List<HoodieRecord> hoodieRecordsList = new ArrayList<>();
diff --git a/connectors/rocketmq-connect-hudi/src/test/java/Producer.java b/connectors/rocketmq-connect-hudi/src/test/java/Producer.java
index 448dfe6..16d51da 100644
--- a/connectors/rocketmq-connect-hudi/src/test/java/Producer.java
+++ b/connectors/rocketmq-connect-hudi/src/test/java/Producer.java
@@ -36,7 +36,7 @@ public class Producer {
         DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
         producer.start();
 
-        File s = new File("/Users/osgoo/Downloads/user.avsc");
+        File s = new File("user.avsc");
         Schema schema = new Schema.Parser().parse(s);
 
         GenericRecord user1 = new GenericData.Record(schema);
diff --git a/connectors/rocketmq-connect-hudi/src/test/java/user.avsc b/connectors/rocketmq-connect-hudi/src/test/java/user.avsc
new file mode 100644
index 0000000..117ea70
--- /dev/null
+++ b/connectors/rocketmq-connect-hudi/src/test/java/user.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}

Reply via email to