Oliverwqcwrw commented on code in PR #289:
URL: https://github.com/apache/rocketmq-connect/pull/289#discussion_r966487465
##########
connectors/rocketmq-connect-redis/src/main/java/org/apache/rocketmq/connect/redis/converter/RedisEntryConverter.java:
##########
@@ -17,53 +17,89 @@
package org.apache.rocketmq.connect.redis.converter;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.connect.redis.common.Options;
-import org.apache.rocketmq.connect.redis.converter.KVEntryConverter;
import org.apache.rocketmq.connect.redis.pojo.KVEntry;
public class RedisEntryConverter implements KVEntryConverter {
private final int maxValueSize = 500;
- @Override public List<SourceDataEntry> kVEntryToDataEntries(KVEntry
kvEntry) {
- Schema schema = getRedisSchema(kvEntry.getValueType());
-
+ @Override public List<ConnectRecord> kVEntryToDataEntries(KVEntry kvEntry)
{
String partition = kvEntry.getPartition();
if (partition == null) {
throw new IllegalStateException("partition info error.");
}
- List<SourceDataEntry> res = new ArrayList<>();
+ List<ConnectRecord> res = new ArrayList<>();
List<Object> values = splitValue(kvEntry.getValueType(),
kvEntry.getValue(), this.maxValueSize);
for (int i = 0; i < values.size(); i++) {
- DataEntryBuilder builder = newDataEntryBuilderWithoutValue(schema,
kvEntry);
-
- builder.putFiled(Options.REDIS_VALUE.name(), values.get(i));
- builder.timestamp(System.currentTimeMillis());
+ Schema schema =
SchemaBuilder.struct().name(Options.REDIS_QEUEUE.name()).build();
+ final List<Field> fields = buildFields();
+ schema.setFields(fields);
- SourceDataEntry entry = builder.buildSourceDataEntry(
- ByteBuffer.wrap(kvEntry.getPartition().getBytes()),
-
ByteBuffer.wrap(RedisPositionConverter.longToJson(kvEntry.getOffset()).toJSONString().getBytes())
- );
- res.add(entry);
+ final Object data = values.get(i);
+ if (data == null || data.toString().equals("")) {
+ continue;
+ }
+ res.add(new ConnectRecord(buildRecordPartition(partition),
+ buildRecordOffset(kvEntry.getOffset()),
+ System.currentTimeMillis(),
+ schema,
+ buildPayLoad(fields, kvEntry, schema)));
}
return res;
}
+ private RecordOffset buildRecordOffset(Long offset) {
+ Map<String, Long> offsetMap = new HashMap<>();
+ offsetMap.put("queueOffset", offset);
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ return recordOffset;
+ }
+
+ private RecordPartition buildRecordPartition(String partition) {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put("partition", partition);
+ RecordPartition recordPartition = new RecordPartition(partitionMap);
+ return recordPartition;
+ }
+
+ private List<Field> buildFields() {
+ final Schema stringSchema = SchemaBuilder.string().build();
+ List<Field> fields = new ArrayList<>();
+ fields.add(new Field(0, Options.REDIS_COMMAND.name(), stringSchema));
+ fields.add(new Field(1, Options.REDIS_KEY.name(), stringSchema));
Review Comment:
OK, I will fix it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]