odbozhou commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r971570840


##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/converter/RedisEntryConverter.java:
##########
@@ -17,53 +17,81 @@
 
 package org.apache.rocketmq.connect.redis.converter;
 
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
+import io.openmessaging.connector.api.data.SchemaBuilder;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.connect.redis.common.Options;
-import org.apache.rocketmq.connect.redis.converter.KVEntryConverter;
 import org.apache.rocketmq.connect.redis.pojo.KVEntry;
 
 public class RedisEntryConverter implements KVEntryConverter {
     private final int maxValueSize = 500;
 
-    @Override public List<SourceDataEntry> kVEntryToDataEntries(KVEntry 
kvEntry) {
-        Schema schema = getRedisSchema(kvEntry.getValueType());
-
+    @Override
+    public List<ConnectRecord> kVEntryToConnectRecord(KVEntry kvEntry) {
         String partition = kvEntry.getPartition();
         if (partition == null) {
             throw new IllegalStateException("partition info error.");
         }
-        List<SourceDataEntry> res = new ArrayList<>();
+
+        List<ConnectRecord> res = new ArrayList<>();
         List<Object> values = splitValue(kvEntry.getValueType(), 
kvEntry.getValue(), this.maxValueSize);
         for (int i = 0; i < values.size(); i++) {
-            DataEntryBuilder builder = newDataEntryBuilderWithoutValue(schema, 
kvEntry);
-
-            builder.putFiled(Options.REDIS_VALUE.name(), values.get(i));
-            builder.timestamp(System.currentTimeMillis());
-
-            SourceDataEntry entry = builder.buildSourceDataEntry(
-                ByteBuffer.wrap(kvEntry.getPartition().getBytes()),
-                
ByteBuffer.wrap(RedisPositionConverter.longToJson(kvEntry.getOffset()).toJSONString().getBytes())
-            );
-            res.add(entry);
+            Schema keySchema = 
SchemaBuilder.string().name(Options.REDIS_KEY.name()).build();
+            keySchema.setFields(buildFields());
+            final Object data = values.get(i);
+            if (data == null  || data.toString().equals("")) {
+                continue;
+            }
+            res.add(new ConnectRecord(
+                buildRecordPartition(partition),
+                buildRecordOffset(kvEntry.getOffset()),
+                System.currentTimeMillis(),
+                keySchema,
+                kvEntry.getKey(),
+                null,

Review Comment:
   Is the value schema missing?



##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java:
##########
@@ -94,11 +107,7 @@ public Config getConfig() {
         this.config.load(keyValue);
         LOGGER.info("task config msg: {}", this.config.toString());
 
-        // get position info
-        ByteBuffer byteBuffer = 
this.context.positionStorageReader().getPosition(
-            this.config.getPositionPartitionKey()
-        );
-        Long position = RedisPositionConverter.jsonToLong(byteBuffer);
+        final Long position = 
this.sourceTaskContext.configs().getLong("offset");

Review Comment:
   The acquisition and use of the position seem to be a bit problematic. The 
main purpose of the position is to record the offset up to which the connector 
has successfully processed data, so that after a restart or a rebalance it can 
resume processing from the last successfully processed offset.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to