odbozhou commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r965525651


##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/connector/RedisSourceTask.java:
##########
@@ -94,11 +89,7 @@ public Config getConfig() {
         this.config.load(keyValue);
         LOGGER.info("task config msg: {}", this.config.toString());
 
-        // get position info
-        ByteBuffer byteBuffer = 
this.context.positionStorageReader().getPosition(
-            this.config.getPositionPartitionKey()
-        );
-        Long position = RedisPositionConverter.jsonToLong(byteBuffer);
+        final Long position = 
this.sourceTaskContext.configs().getLong("offset");

Review Comment:
   To get the position, you can refer to FileSourceTask



##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/converter/RedisEntryConverter.java:
##########
@@ -17,53 +17,89 @@
 
 package org.apache.rocketmq.connect.redis.converter;
 
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
 import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.connect.redis.common.Options;
-import org.apache.rocketmq.connect.redis.converter.KVEntryConverter;
 import org.apache.rocketmq.connect.redis.pojo.KVEntry;
 
 public class RedisEntryConverter implements KVEntryConverter {
     private final int maxValueSize = 500;
 
-    @Override public List<SourceDataEntry> kVEntryToDataEntries(KVEntry 
kvEntry) {
-        Schema schema = getRedisSchema(kvEntry.getValueType());
-
+    @Override public List<ConnectRecord> kVEntryToDataEntries(KVEntry kvEntry) 
{
         String partition = kvEntry.getPartition();
         if (partition == null) {
             throw new IllegalStateException("partition info error.");
         }
-        List<SourceDataEntry> res = new ArrayList<>();
+        List<ConnectRecord> res = new ArrayList<>();
         List<Object> values = splitValue(kvEntry.getValueType(), 
kvEntry.getValue(), this.maxValueSize);
         for (int i = 0; i < values.size(); i++) {
-            DataEntryBuilder builder = newDataEntryBuilderWithoutValue(schema, 
kvEntry);
-
-            builder.putFiled(Options.REDIS_VALUE.name(), values.get(i));
-            builder.timestamp(System.currentTimeMillis());
+            Schema schema = 
SchemaBuilder.struct().name(Options.REDIS_QEUEUE.name()).build();
+            final List<Field> fields = buildFields();
+            schema.setFields(fields);
 
-            SourceDataEntry entry = builder.buildSourceDataEntry(
-                ByteBuffer.wrap(kvEntry.getPartition().getBytes()),
-                
ByteBuffer.wrap(RedisPositionConverter.longToJson(kvEntry.getOffset()).toJSONString().getBytes())
-            );
-            res.add(entry);
+            final Object data = values.get(i);
+            if (data == null  || data.toString().equals("")) {
+                continue;
+            }
+            res.add(new ConnectRecord(buildRecordPartition(partition),
+                buildRecordOffset(kvEntry.getOffset()),
+                System.currentTimeMillis(),
+                schema,
+                buildPayLoad(fields, kvEntry, schema)));
         }
         return res;
     }
 
+    private RecordOffset buildRecordOffset(Long offset)  {
+        Map<String, Long> offsetMap = new HashMap<>();
+        offsetMap.put("queueOffset", offset);
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
+    }
+
+    private RecordPartition buildRecordPartition(String partition) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put("partition", partition);
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
+    }
+
+    private List<Field> buildFields() {
+        final Schema stringSchema = SchemaBuilder.string().build();
+        List<Field> fields = new ArrayList<>();
+        fields.add(new Field(0, Options.REDIS_COMMAND.name(), stringSchema));
+        fields.add(new Field(1, Options.REDIS_KEY.name(), stringSchema));

Review Comment:
   There is already a key attribute in ConnectRecord. Should the redis key be 
placed in the key attribute in ConnectRecord, and the value should be assigned 
to the data in ConnectRecord?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to