odbozhou commented on code in PR #313:
URL: https://github.com/apache/rocketmq-connect/pull/313#discussion_r977234058


##########
connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java:
##########
@@ -17,118 +17,83 @@
 
 package org.apache.connect.mongo.connector.builder;
 
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
-import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.BsonTimestamp;
 
-import static org.apache.connect.mongo.replicator.Constants.CREATED;
-import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
-import static org.apache.connect.mongo.replicator.Constants.PATCH;
-import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
-import static org.apache.connect.mongo.replicator.Constants.VERSION;
-
 public class MongoDataEntry {
 
-    private static String SCHEMA_CREATED_NAME = "mongo_created";
-    private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
-
-    public static SourceDataEntry createSouceDataEntry(ReplicationEvent event, 
ReplicaSetConfig replicaSetConfig) {
-
-        DataEntryBuilder dataEntryBuilder;
-
+    public static ConnectRecord createSourceDataEntry(ReplicationEvent event, 
ReplicaSetConfig replicaSetConfig) {
+        final Position position = replicaSetConfig.getPosition();
+        final int oldTimestamp = position.getTimeStamp();
+        final BsonTimestamp timestamp = event.getTimestamp();
+        if (oldTimestamp != 0 && timestamp != null &&  timestamp.getTime() <= 
oldTimestamp) {
+            return null;
+        }
+        Schema schema;
         if (event.getOperationType().equals(OperationType.CREATED)) {
-            Schema schema = 
createdSchema(replicaSetConfig.getReplicaSetName());
-            dataEntryBuilder = new DataEntryBuilder(schema);
-            dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace().replace(".", "-").replace("$", 
"-"))
-                .entryType(event.getEntryType());
-
-            dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
-            dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-
+            schema = 
SchemaBuilder.struct().name(Constants.SCHEMA_CREATED_NAME).build();
         } else {
-            Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName());
-            dataEntryBuilder = new DataEntryBuilder(schema);
-            dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace().replace(".", "-").replace("$", 
"-"))
-                .entryType(event.getEntryType());
-            dataEntryBuilder.putFiled(OPERATION_TYPE, 
event.getOperationType().name());
-            dataEntryBuilder.putFiled(TIMESTAMP, 
event.getTimestamp().getValue());
-            dataEntryBuilder.putFiled(VERSION, event.getV());
-            dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-            dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() 
? JSONObject.toJSONString(event.getEventData().get()) : "");
-            dataEntryBuilder.putFiled(OBJECT_ID, 
event.getObjectId().isPresent() ? 
JSONObject.toJSONString(event.getObjectId().get()) : "");
+            schema = 
SchemaBuilder.struct().name(Constants.SCHEMA_OPLOG_NAME).build();
         }
-
-        String position = createPosition(event, replicaSetConfig);
-        SourceDataEntry sourceDataEntry = 
dataEntryBuilder.buildSourceDataEntry(
-            
ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)),
-            ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8)));
-        return sourceDataEntry;
-    }
-
-    private static String createPosition(ReplicationEvent event, 
ReplicaSetConfig replicaSetConfig) {
-        Position position = new Position();
-        BsonTimestamp timestamp = event.getTimestamp();
-        position.setInc(timestamp != null ? timestamp.getInc() : 0);
-        position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0);
-        
position.setInitSync(event.getOperationType().equals(OperationType.CREATED) ? 
true : false);
-        return JSONObject.toJSONString(position);
-
+        final List<Field> fields = buildFields();
+        schema.setFields(fields);
+        return new ConnectRecord(buildRecordPartition(replicaSetConfig),
+            buildRecordOffset(event, replicaSetConfig),
+            System.currentTimeMillis(),
+            schema,
+            buildPayLoad(fields, event, schema));
     }
 
-    private static Schema createdSchema(String dataSourceName) {
-        Schema schema = new Schema();
-        schema.setDataSource(dataSourceName);
-        schema.setName(SCHEMA_CREATED_NAME);
-        schema.setFields(new ArrayList<>());
-        createdField(schema);
-        return schema;
+    private static RecordPartition buildRecordPartition(ReplicaSetConfig 
replicaSetConfig) {
+        Map<String, String> partitionMap = new HashMap<>();
+        partitionMap.put(Constants.REPLICA_SET_NAME, 
replicaSetConfig.getReplicaSetName());
+        RecordPartition  recordPartition = new RecordPartition(partitionMap);
+        return recordPartition;
     }
 
-    private static Schema oplogSchema(String dataSourceName) {
-        Schema schema = new Schema();
-        schema.setDataSource(dataSourceName);
-        schema.setName(SCHEMA_OPLOG_NAME);
-        oplogField(schema);
-        return schema;
+    private static RecordOffset buildRecordOffset(ReplicationEvent event, 
ReplicaSetConfig config)  {
+        Map<String, Integer> offsetMap = new HashMap<>();
+        final Position position = config.getPosition();
+        offsetMap.put(Constants.TIMESTAMP, event.getTimestamp() != null ? 
event.getTimestamp().getTime() : position.getTimeStamp());
+        RecordOffset recordOffset = new RecordOffset(offsetMap);
+        return recordOffset;
     }
 
-    private static void createdField(Schema schema) {
-        Field namespace = new Field(0, NAMESPACE, FieldType.STRING);
-        schema.getFields().add(namespace);
-        Field operation = new Field(1, Constants.CREATED, FieldType.STRING);
-        schema.getFields().add(operation);
+    private static List<Field> buildFields() {
+        final Schema stringSchema = SchemaBuilder.string().build();

Review Comment:
   The MongoDB data types supported should not be limited to string. Should
there be a conversion from MongoDB types to Connect schema types?



##########
connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java:
##########
@@ -17,118 +17,83 @@
 
 package org.apache.connect.mongo.connector.builder;
 
-import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connector.api.data.DataEntryBuilder;
+import com.alibaba.fastjson.JSON;
+import io.openmessaging.connector.api.data.ConnectRecord;
 import io.openmessaging.connector.api.data.Field;
-import io.openmessaging.connector.api.data.FieldType;
+import io.openmessaging.connector.api.data.RecordOffset;
+import io.openmessaging.connector.api.data.RecordPartition;
 import io.openmessaging.connector.api.data.Schema;
-import io.openmessaging.connector.api.data.SourceDataEntry;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
+import io.openmessaging.connector.api.data.SchemaBuilder;
+import io.openmessaging.connector.api.data.Struct;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.connect.mongo.replicator.Constants;
 import org.apache.connect.mongo.replicator.Position;
 import org.apache.connect.mongo.replicator.ReplicaSetConfig;
 import org.apache.connect.mongo.replicator.event.OperationType;
 import org.apache.connect.mongo.replicator.event.ReplicationEvent;
 import org.bson.BsonTimestamp;
 
-import static org.apache.connect.mongo.replicator.Constants.CREATED;
-import static org.apache.connect.mongo.replicator.Constants.NAMESPACE;
-import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID;
-import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE;
-import static org.apache.connect.mongo.replicator.Constants.PATCH;
-import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP;
-import static org.apache.connect.mongo.replicator.Constants.VERSION;
-
 public class MongoDataEntry {
 
-    private static String SCHEMA_CREATED_NAME = "mongo_created";
-    private static String SCHEMA_OPLOG_NAME = "mongo_oplog";
-
-    public static SourceDataEntry createSouceDataEntry(ReplicationEvent event, 
ReplicaSetConfig replicaSetConfig) {
-
-        DataEntryBuilder dataEntryBuilder;
-
+    public static ConnectRecord createSourceDataEntry(ReplicationEvent event, 
ReplicaSetConfig replicaSetConfig) {
+        final Position position = replicaSetConfig.getPosition();
+        final int oldTimestamp = position.getTimeStamp();
+        final BsonTimestamp timestamp = event.getTimestamp();
+        if (oldTimestamp != 0 && timestamp != null &&  timestamp.getTime() <= 
oldTimestamp) {
+            return null;
+        }
+        Schema schema;
         if (event.getOperationType().equals(OperationType.CREATED)) {
-            Schema schema = 
createdSchema(replicaSetConfig.getReplicaSetName());
-            dataEntryBuilder = new DataEntryBuilder(schema);
-            dataEntryBuilder.timestamp(System.currentTimeMillis())
-                .queue(event.getNamespace().replace(".", "-").replace("$", 
"-"))
-                .entryType(event.getEntryType());
-
-            dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson());
-            dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace());
-
+            schema = 
SchemaBuilder.struct().name(Constants.SCHEMA_CREATED_NAME).build();

Review Comment:
   The schema should express the fields and types of the value, not the 
operation type of the data.



##########
connectors/rocketmq-connect-mongo/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java:
##########
@@ -32,45 +32,32 @@ public class MongoSourceConnector extends SourceConnector {
     private KeyValue keyValueConfig;
 
     @Override
-    public String verifyAndSetConfig(KeyValue config) {
+    public void start(KeyValue config) {
         for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) {
             if (!config.containsKey(requestKey)) {
-                return "Request config key: " + requestKey;
+                throw new RuntimeException("Request config key: " + 
requestKey);
             }
         }
         this.keyValueConfig = config;
-        return "";
-    }
-
-    @Override
-    public void start() {
-        logger.info("start mongo source connector:{}", keyValueConfig);
     }
 
     @Override
     public void stop() {
-
+        this.keyValueConfig = null;
     }
 
-    @Override
-    public void pause() {
 
-    }
-
-    @Override
-    public void resume() {
 
+    @Override public List<KeyValue> taskConfigs(int maxTasks) {

Review Comment:
   MongoDB supports replica sets. Could the work be divided into multiple tasks
according to the maximum number of tasks allocated and configured? That would
improve the parallelism of the Connector.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to