This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new c5d9788 Adapt hudi to openmessaging-connector 0.1.4 (#267)
c5d9788 is described below
commit c5d97886f73dc321c8cf98d9b14c7370a7815174
Author: lizhiboo <[email protected]>
AuthorDate: Wed Aug 24 17:27:49 2022 +0800
Adapt hudi to openmessaging-connector 0.1.4 (#267)
* adapt to new api
* complete the demo doc for running hudi-connector locally
* adapt to 0.1.4
* reset connect-standalone.conf
* add schema file
---
.../connect/hudi/connector/HudiSinkTask.java | 12 ++++++++++--
.../apache/rocketmq/connect/hudi/sink/Updater.java | 22 ++++++++++++++++++++--
.../src/test/java/Producer.java | 2 +-
.../rocketmq-connect-hudi/src/test/java/user.avsc | 9 +++++++++
4 files changed, 40 insertions(+), 5 deletions(-)
diff --git
a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
index 915c95b..112f530 100644
---
a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
+++
b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/connector/HudiSinkTask.java
@@ -20,6 +20,8 @@ package org.apache.rocketmq.connect.hudi.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.component.task.sink.SinkTask;
import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.hudi.config.HudiConnectConfig;
import org.apache.rocketmq.connect.hudi.config.ConfigUtil;
@@ -28,6 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+
import java.util.List;
import java.util.Map;
@@ -95,9 +98,14 @@ public class HudiSinkTask extends SinkTask {
}
}
-
@Override
- public void validate(KeyValue config) {
+ public void flush(Map<RecordPartition, RecordOffset> currentOffsets)
throws ConnectException {
+ }
+ @Override
+ public Map<RecordPartition, RecordOffset> preCommit(Map<RecordPartition,
RecordOffset> currentOffsets) {
+ this.flush(currentOffsets);
+ return currentOffsets;
}
+
}
diff --git
a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
index e0ea311..4258767 100644
---
a/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
+++
b/connectors/rocketmq-connect-hudi/src/main/java/org/apache/rocketmq/connect/hudi/sink/Updater.java
@@ -140,8 +140,12 @@ public class Updater {
}
}
- private GenericRecord sinkDataEntry2GenericRecord(ConnectRecord record) {
- byte[] recordBytes = new byte[0];
+
+ private GenericRecord sinkDataEntry2GenericRecord(ConnectRecord record)
throws UnsupportedEncodingException {
+ byte[] recordBytes = ((String) record.getData()).getBytes("UTF8");
+ GenericRecord genericRecord = new
GenericData.Record(this.hudiConnectConfig.schema);
+ DatumReader<GenericRecord> userDatumReader = new
SpecificDatumReader<GenericRecord>(this.hudiConnectConfig.schema);
+ BinaryDecoder decoder =
DecoderFactory.get().binaryDecoder(recordBytes, null);
try {
recordBytes = ((String) record.getData()).getBytes("UTF8");
} catch (UnsupportedEncodingException e) {
@@ -210,6 +214,20 @@ public class Updater {
commitList = inflightList;
inflightList = new ArrayList<>();
}
+
+ List<HoodieRecord> hoodieRecordsList = new ArrayList<>();
+ for (ConnectRecord record : commitList) {
+ GenericRecord genericRecord = null;
+ try {
+ genericRecord = sinkDataEntry2GenericRecord(record);
+ } catch (UnsupportedEncodingException e) {
+ log.error("parse record error, ", e);
+ continue;
+ }
+ HoodieRecord<HoodieAvroPayload> hoodieRecord = new
HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "shardingKey-" +
record.getPosition().getPartition()), new
HoodieAvroPayload(Option.of(genericRecord)));
+ hoodieRecordsList.add(hoodieRecord);
+ }
+
try {
log.info("Before commit.");
List<HoodieRecord> hoodieRecordsList = new ArrayList<>();
diff --git a/connectors/rocketmq-connect-hudi/src/test/java/Producer.java
b/connectors/rocketmq-connect-hudi/src/test/java/Producer.java
index 448dfe6..16d51da 100644
--- a/connectors/rocketmq-connect-hudi/src/test/java/Producer.java
+++ b/connectors/rocketmq-connect-hudi/src/test/java/Producer.java
@@ -36,7 +36,7 @@ public class Producer {
DefaultMQProducer producer = new
DefaultMQProducer("ProducerGroupName");
producer.start();
- File s = new File("/Users/osgoo/Downloads/user.avsc");
+ File s = new File("user.avsc");
Schema schema = new Schema.Parser().parse(s);
GenericRecord user1 = new GenericData.Record(schema);
diff --git a/connectors/rocketmq-connect-hudi/src/test/java/user.avsc
b/connectors/rocketmq-connect-hudi/src/test/java/user.avsc
new file mode 100644
index 0000000..117ea70
--- /dev/null
+++ b/connectors/rocketmq-connect-hudi/src/test/java/user.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}