Oliverwqcwrw commented on code in PR #313:
URL: https://github.com/apache/rocketmq-connect/pull/313#discussion_r979233946
##########
connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java:
##########
@@ -17,118 +17,83 @@
package org.apache.connect.mongo.connector.builder;
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
import io.openmessaging.connector.api.data.Field;
-import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.connect.mongo.replicator.Constants;
import org.apache.connect.mongo.replicator.Position;
import org.apache.connect.mongo.replicator.ReplicaSetConfig;
import org.apache.connect.mongo.replicator.event.OperationType;
import org.apache.connect.mongo.replicator.event.ReplicationEvent;
import org.bson.BsonTimestamp;
-import static org.apache.connect.mongo.replicator.Constants.CREATED;
-import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
-import static org.apache.connect.mongo.replicator.Constants.PATCH;
-import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
-import static org.apache.connect.mongo.replicator.Constants.VERSION;
-
public class MongoDataEntry {
- private static String SCHEMA_CREATED_NAME = "mongo_created";
- private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
-
- public static SourceDataEntry createSouceDataEntry(ReplicationEvent event,
ReplicaSetConfig replicaSetConfig) {
-
- DataEntryBuilder dataEntryBuilder;
-
+ public static ConnectRecord createSourceDataEntry(ReplicationEvent event,
ReplicaSetConfig replicaSetConfig) {
+ final Position position = replicaSetConfig.getPosition();
+ final int oldTimestamp = position.getTimeStamp();
+ final BsonTimestamp timestamp = event.getTimestamp();
+ if (oldTimestamp != 0 && timestamp != null && timestamp.getTime() <=
oldTimestamp) {
+ return null;
+ }
+ Schema schema;
if (event.getOperationType().equals(OperationType.CREATED)) {
- Schema schema =
createdSchema(replicaSetConfig.getReplicaSetName());
- dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$",
"-"))
- .entryType(event.getEntryType());
-
- dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
- dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-
+ schema =
SchemaBuilder.struct().name(Constants.SCHEMA_CREATED_NAME).build();
} else {
- Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
- dataEntryBuilder = new DataEntryBuilder(schema);
- dataEntryBuilder.timestamp(System.currentTimeMillis())
- .queue(event.getNamespace().replace(".", "-").replace("$",
"-"))
- .entryType(event.getEntryType());
- dataEntryBuilder.putFiled(OPERATION_TYPE,
event.getOperationType().name());
- dataEntryBuilder.putFiled(TIMESTAMP,
event.getTimestamp().getValue());
- dataEntryBuilder.putFiled(VERSION, event.getV());
- dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
- dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent()
? JSONObject.toJSONString(event.getEventData().get()) : "");
- dataEntryBuilder.putFiled(OBJECT_ID,
event.getObjectId().isPresent() ?
JSONObject.toJSONString(event.getObjectId().get()) : "");
+ schema =
SchemaBuilder.struct().name(Constants.SCHEMA_OPLOG_NAME).build();
}
-
- String position = createPosition(event, replicaSetConfig);
- SourceDataEntry sourceDataEntry =
dataEntryBuilder.buildSourceDataEntry(
-
ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
- ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
- return sourceDataEntry;
- }
-
- private static String createPosition(ReplicationEvent event,
ReplicaSetConfig replicaSetConfig) {
- Position position = new Position();
- BsonTimestamp timestamp = event.getTimestamp();
- position.setInc(timestamp != null ? timestamp.getInc() : 0);
- position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
-
position.setInitSync(event.getOperationType().equals(OperationType.CREATED) ?
true : false);
- return JSONObject.toJSONString(position);
-
+ final List<Field> fields = buildFields();
+ schema.setFields(fields);
+ return new ConnectRecord(buildRecordPartition(replicaSetConfig),
+ buildRecordOffset(event, replicaSetConfig),
+ System.currentTimeMillis(),
+ schema,
+ buildPayLoad(fields, event, schema));
}
- private static Schema createdSchema(String dataSourceName) {
- Schema schema = new Schema();
- schema.setDataSource(dataSourceName);
- schema.setName(SCHEMA_CREATED_NAME);
- schema.setFields(new ArrayList<>());
- createdField(schema);
- return schema;
+ private static RecordPartition buildRecordPartition(ReplicaSetConfig
replicaSetConfig) {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put(Constants.REPLICA_SET_NAME,
replicaSetConfig.getReplicaSetName());
+ RecordPartition recordPartition = new RecordPartition(partitionMap);
+ return recordPartition;
}
- private static Schema oplogSchema(String dataSourceName) {
- Schema schema = new Schema();
- schema.setDataSource(dataSourceName);
- schema.setName(SCHEMA_OPLOG_NAME);
- oplogField(schema);
- return schema;
+ private static RecordOffset buildRecordOffset(ReplicationEvent event,
ReplicaSetConfig config) {
+ Map<String, Integer> offsetMap = new HashMap<>();
+ final Position position = config.getPosition();
+ offsetMap.put(Constants.TIMESTAMP, event.getTimestamp() != null ?
event.getTimestamp().getTime() : position.getTimeStamp());
+ RecordOffset recordOffset = new RecordOffset(offsetMap);
+ return recordOffset;
}
- private static void createdField(Schema schema) {
- Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
- schema.getFields().add(namespace);
- Field operation = new Field(1, Constants.CREATED, FieldType.STRING);
- schema.getFields().add(operation);
+ private static List<Field> buildFields() {
+ final Schema stringSchema = SchemaBuilder.string().build();
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]