This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 6ca400a1e37743bf0a701e49bd5b3b28efeba9b7
Author: 李平 <[email protected]>
AuthorDate: Mon Aug 5 15:02:53 2019 +0800

    add junit test and modify some code
---
 .../apache/connect/mongo/replicator/Constants.java |  33 +++++
 .../replicator/event/DocumentConvertEvent.java     |  35 +++++
 .../mongo/replicator/event/OperationType.java      |  28 ++++
 .../mongo/replicator/event/ReplicationEvent.java   | 153 +++++++++++++++++++++
 4 files changed, 249 insertions(+)

diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
new file mode 100644
index 0000000..668fd91
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java
@@ -0,0 +1,33 @@
+package org.apache.connect.mongo.replicator;
+
+/** String constants shared by the Mongo replicator classes. */
+public final class Constants {
+
+    // Application name string for this replicator.
+    public static final String APPLICATION_NAME = "java-mongo-replicator";
+
+    // MongoDB database and collection names (presumably where the oplog is read; confirm at usage).
+    public static final String MONGO_LOCAL_DATABASE = "local";
+    public static final String MONGO_OPLOG_RS = "oplog.rs";
+
+    // Field keys that DocumentConvertEvent.convert reads from a source document.
+    public static final String OPERATIONTYPE = "op";
+    public static final String TIMESTAMP = "ts";
+    public static final String VERSION = "v";
+    public static final String HASH = "h";
+    public static final String NAMESPACE = "ns";
+    public static final String OPERATION = "o";
+    public static final String OBJECTID = "o2";
+
+    public static final String CREATED = "created";
+    public static final String PATCH = "patch";
+    public static final String INITIAL = "initial";
+
+    // Keys for a saved position (presumably timestamp, increment and init-sync flag; confirm).
+    public static final String POSITION_TIMESTAMP = "timeStamp";
+    public static final String POSITION_INC = "inc";
+    public static final String INITSYNC = "initSync";
+
+    /** Constants holder; not meant to be instantiated. */
+    private Constants() { }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
new file mode 100644
index 0000000..6b902e7
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java
@@ -0,0 +1,35 @@
+package org.apache.connect.mongo.replicator.event;
+
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+
+import java.util.Optional;
+
+import static org.apache.connect.mongo.replicator.Constants.*;
+
+
+/** Converts a source {@link Document} into a {@link ReplicationEvent}. */
+public class DocumentConvertEvent {
+
+    /**
+     * Builds an event from the document's op, ts, h, v, ns, o and o2 fields.
+     *
+     * @return the event, or {@code null} if any exception is thrown during conversion
+     */
+    public static ReplicationEvent convert(Document document) {
+        try {
+            OperationType type = OperationType.getOperationType(document.getString(OPERATIONTYPE));
+            BsonTimestamp ts = (BsonTimestamp) document.get(TIMESTAMP);
+            Long hash = document.getLong(HASH);
+            Integer version = document.getInteger(VERSION);
+            String ns = document.getString(NAMESPACE);
+            Optional<Document> data = Optional.ofNullable(document.get(OPERATION, Document.class));
+            Optional<Document> id = Optional.ofNullable(document.get(OBJECTID, Document.class));
+            return new ReplicationEvent(type, ts, version, hash, ns, data, id, document);
+        } catch (Exception e) {
+            // NOTE(review): the failure is only printed to stdout; callers must expect null.
+            System.out.println(e);
+            return null;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
new file mode 100644
index 0000000..54df394
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java
@@ -0,0 +1,28 @@
+package org.apache.connect.mongo.replicator.event;
+
+/** Operation kinds of a replication event, each identified by a code string. */
+public enum OperationType {
+
+    INSERT("i"),
+    UPDATE("u"),
+    DELETE("d"),
+    NOOP("n"),
+    DBCOMMAND("c"),
+    CREATED("created"),
+    UNKNOWN("unknown");
+
+    /** Code string compared against the input of {@link #getOperationType(String)}. */
+    private final String operationStr;
+
+    OperationType(String operationStr) {
+        this.operationStr = operationStr;
+    }
+
+    /** Returns the first constant whose code equals {@code operationStr}, else {@link #UNKNOWN}. */
+    public static OperationType getOperationType(String operationStr) {
+        return java.util.Arrays.stream(values())
+                .filter(candidate -> candidate.operationStr.equals(operationStr))
+                .findFirst()
+                .orElse(UNKNOWN);
+    }
+}
diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
new file mode 100644
index 0000000..7719b3e
--- /dev/null
+++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java
@@ -0,0 +1,153 @@
+package org.apache.connect.mongo.replicator.event;
+
+import io.openmessaging.connector.api.data.EntryType;
+import org.apache.commons.lang3.StringUtils;
+import org.bson.BsonTimestamp;
+import org.bson.Document;
+
+import java.util.Optional;
+
+public class ReplicationEvent {
+
+    private Document document;
+    private OperationType operationType;
+    private Integer v;
+    private Long h;
+    private BsonTimestamp timestamp;
+    private String databaseName;
+    private String collectionName;
+    private String namespace;
+    private Optional<Document> eventData;
+    private Optional<Document> objectId;
+
+
+    public ReplicationEvent() {
+
+    }
+
+
+    public ReplicationEvent(OperationType operationType, BsonTimestamp 
timestamp, Integer v, Long h, String namespace, Optional<Document> eventData, 
Optional<Document> objectId, Document document) {
+        this.operationType = operationType;
+        this.v = v;
+        this.h = h;
+        this.timestamp = timestamp;
+        this.namespace = namespace;
+        this.eventData = eventData;
+        this.objectId = objectId;
+        String[] split = StringUtils.split(namespace, ".", 2);
+        this.databaseName = split != null && split.length == 2 ? split[0] : "";
+        this.collectionName = split != null && split.length == 2 ? split[1] : 
"";
+        this.document = document;
+    }
+
+
+    public OperationType getOperationType() {
+        return operationType;
+    }
+
+    public Integer getV() {
+        return v;
+    }
+
+    public Long getH() {
+        return h;
+    }
+
+    public BsonTimestamp getTimestamp() {
+        return timestamp;
+    }
+
+    public String getDatabaseName() {
+        return databaseName;
+    }
+
+    public String getCollectionName() {
+        return collectionName;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public Optional<Document> getEventData() {
+        return eventData;
+    }
+
+    public Optional<Document> getObjectId() {
+        return objectId;
+    }
+
+    public EntryType getEntryType() {
+        switch (operationType) {
+            case UPDATE:
+                return EntryType.UPDATE;
+            case DELETE:
+                return EntryType.DELETE;
+            case INSERT:
+                return EntryType.CREATE;
+            default:
+                return EntryType.CREATE;
+        }
+    }
+
+
+    public void setOperationType(OperationType operationType) {
+        this.operationType = operationType;
+    }
+
+
+    public Document getDocument() {
+        return document;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public void setDocument(Document document) {
+        this.document = document;
+    }
+
+    public void setV(Integer v) {
+        this.v = v;
+    }
+
+    public void setH(Long h) {
+        this.h = h;
+    }
+
+    public void setTimestamp(BsonTimestamp timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public void setDatabaseName(String databaseName) {
+        this.databaseName = databaseName;
+    }
+
+    public void setCollectionName(String collectionName) {
+        this.collectionName = collectionName;
+    }
+
+    public void setEventData(Optional<Document> eventData) {
+        this.eventData = eventData;
+    }
+
+    public void setObjectId(Optional<Document> objectId) {
+        this.objectId = objectId;
+    }
+
+    @Override
+    public String toString() {
+        return "ReplicationEvent{" +
+                "operationType=" + operationType +
+                ", v=" + v +
+                ", h=" + h +
+                ", timestamp=" + timestamp +
+                ", databaseName='" + databaseName + '\'' +
+                ", collectionName='" + collectionName + '\'' +
+                ", namespace='" + namespace + '\'' +
+                ", eventData=" + eventData.toString() +
+                ", objectId=" + objectId.toString() +
+                '}';
+    }
+}

Reply via email to