joeCarf commented on code in PR #514:
URL: https://github.com/apache/rocketmq-connect/pull/514#discussion_r1262120393


##########
connectors/rocketmq-connect-hologres/.gitignore:
##########


Review Comment:
   gitignore already exists, is this necessary?



##########
connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java:
##########
@@ -0,0 +1,181 @@
+package org.apache.rocketmq.connect.hologres.connector;
+
+import com.alibaba.hologres.client.BinlogShardGroupReader;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.HoloConfig;
+import com.alibaba.hologres.client.Subscribe;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.alibaba.hologres.client.model.Column;
+import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
+import com.alibaba.hologres.client.model.binlog.BinlogRecord;
+import com.alibaba.hologres.com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.apache.rocketmq.connect.hologres.config.HologresSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INFO_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INDEX_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.HOLOGRES_POSITION;
+
+
+public class HologresSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(HologresSourceTask.class);
+
+    private KeyValue keyValue;
+    private HologresSourceConfig sourceConfig;
+    private HoloConfig holoClientConfig;
+    private HoloClient holoClient;
+    private BinlogShardGroupReader reader;
+    private long count = 0;
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<ConnectRecord> records = new ArrayList<>();
+        try {
+            BinlogRecord record = reader.getBinlogRecord();
+
+            if (record instanceof BinlogHeartBeatRecord) {
+                return null;
+            }
+
+            if (++count % 1000 == 0) {

Review Comment:
   或许count需要reset一下,避免溢出



##########
connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md:
##########
@@ -0,0 +1,89 @@
+**rocketmq-connect-hologres**
+
+在启动runtime之后,通过发送http消息到runtime,携带connector和task的参数,启动connector
+
+**参数说明**
+
+- **connector-class**: connector的类名
+- **tasks.num**: 启动的task数目
+
+##### parameter configuration

Review Comment:
   感觉可以标注一下是source还是sink。比如使用三级标题



##########
connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java:
##########
@@ -0,0 +1,181 @@
+package org.apache.rocketmq.connect.hologres.connector;
+
+import com.alibaba.hologres.client.BinlogShardGroupReader;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.HoloConfig;
+import com.alibaba.hologres.client.Subscribe;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.alibaba.hologres.client.model.Column;
+import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
+import com.alibaba.hologres.client.model.binlog.BinlogRecord;
+import com.alibaba.hologres.com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.apache.rocketmq.connect.hologres.config.HologresSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INFO_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INDEX_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.HOLOGRES_POSITION;
+
+
+public class HologresSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(HologresSourceTask.class);
+
+    private KeyValue keyValue;
+    private HologresSourceConfig sourceConfig;
+    private HoloConfig holoClientConfig;
+    private HoloClient holoClient;
+    private BinlogShardGroupReader reader;
+    private long count = 0;
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<ConnectRecord> records = new ArrayList<>();
+        try {
+            BinlogRecord record = reader.getBinlogRecord();
+
+            if (record instanceof BinlogHeartBeatRecord) {
+                return null;
+            }
+
+            if (++count % 1000 == 0) {
+                reader.commit(sourceConfig.getBinlogCommitTimeIntervalMs());
+            }
+
+            records.add(hologresRecord2ConnectRecord(record));
+        } catch (Exception e) {
+            log.error("Error while polling data from Hologres", e);
+        }
+        return records;

Review Comment:
   每次poll只拉一个record下来吗



##########
connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java:
##########
@@ -0,0 +1,181 @@
+package org.apache.rocketmq.connect.hologres.connector;
+
+import com.alibaba.hologres.client.BinlogShardGroupReader;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.HoloConfig;
+import com.alibaba.hologres.client.Subscribe;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.alibaba.hologres.client.model.Column;
+import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
+import com.alibaba.hologres.client.model.binlog.BinlogRecord;
+import com.alibaba.hologres.com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.apache.rocketmq.connect.hologres.config.HologresSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INFO_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INDEX_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.HOLOGRES_POSITION;
+
+
+public class HologresSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(HologresSourceTask.class);
+
+    private KeyValue keyValue;
+    private HologresSourceConfig sourceConfig;
+    private HoloConfig holoClientConfig;
+    private HoloClient holoClient;
+    private BinlogShardGroupReader reader;
+    private long count = 0;
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<ConnectRecord> records = new ArrayList<>();
+        try {
+            BinlogRecord record = reader.getBinlogRecord();
+
+            if (record instanceof BinlogHeartBeatRecord) {
+                return null;
+            }
+
+            if (++count % 1000 == 0) {

Review Comment:
   或许count需要reset一下,避免溢出



##########
connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/connector/HologresSourceTask.java:
##########
@@ -0,0 +1,181 @@
+package org.apache.rocketmq.connect.hologres.connector;
+
+import com.alibaba.hologres.client.BinlogShardGroupReader;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.HoloConfig;
+import com.alibaba.hologres.client.Subscribe;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.alibaba.hologres.client.model.Column;
+import com.alibaba.hologres.client.model.binlog.BinlogHeartBeatRecord;
+import com.alibaba.hologres.client.model.binlog.BinlogRecord;
+import com.alibaba.hologres.com.google.common.base.Strings;
+import io.openmessaging.KeyValue;
+import io.openmessaging.connector.api.component.task.source.SourceTask;
+import io.openmessaging.connector.api.data.ConnectRecord;
+import io.openmessaging.connector.api.data.Field;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
+import io.openmessaging.connector.api.data.Schema;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
+import org.apache.rocketmq.connect.hologres.config.HologresSourceConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INFO_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.PARTITION_INDEX_KEY;
+import static 
org.apache.rocketmq.connect.hologres.config.HologresConstant.HOLOGRES_POSITION;
+
+
+public class HologresSourceTask extends SourceTask {
+    private static final Logger log = 
LoggerFactory.getLogger(HologresSourceTask.class);
+
+    private KeyValue keyValue;
+    private HologresSourceConfig sourceConfig;
+    private HoloConfig holoClientConfig;
+    private HoloClient holoClient;
+    private BinlogShardGroupReader reader;
+    private long count = 0;
+
+    @Override
+    public List<ConnectRecord> poll() throws InterruptedException {
+        List<ConnectRecord> records = new ArrayList<>();
+        try {
+            BinlogRecord record = reader.getBinlogRecord();
+
+            if (record instanceof BinlogHeartBeatRecord) {
+                return null;
+            }
+
+            if (++count % 1000 == 0) {
+                reader.commit(sourceConfig.getBinlogCommitTimeIntervalMs());
+            }
+
+            records.add(hologresRecord2ConnectRecord(record));
+        } catch (Exception e) {
+            log.error("Error while polling data from Hologres", e);
+        }
+        return records;

Review Comment:
   每次poll只拉一个record下来吗



##########
connectors/rocketmq-connect-hologres/src/main/java/org/apache/rocketmq/connect/hologres/README.md:
##########
@@ -0,0 +1,89 @@
+**rocketmq-connect-hologres**
+
+在启动runtime之后,通过发送http消息到runtime,携带connector和task的参数,启动connector
+
+**参数说明**
+
+- **connector-class**: connector的类名
+- **tasks.num**: 启动的task数目
+
+##### parameter configuration

Review Comment:
   感觉可以标注一下是source还是sink。比如使用三级标题



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to