This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 6ca400a1e37743bf0a701e49bd5b3b28efeba9b7 Author: 李平 <[email protected]> AuthorDate: Mon Aug 5 15:02:53 2019 +0800 add junit test and modify some code --- .../apache/connect/mongo/replicator/Constants.java | 33 +++++ .../replicator/event/DocumentConvertEvent.java | 35 +++++ .../mongo/replicator/event/OperationType.java | 28 ++++ .../mongo/replicator/event/ReplicationEvent.java | 153 +++++++++++++++++++++ 4 files changed, 249 insertions(+) diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java new file mode 100644 index 0000000..668fd91 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java @@ -0,0 +1,33 @@ +package org.apache.connect.mongo.replicator; + +/** Holder for the string constants used by the Mongo replicator: the application name, the "local" database and "oplog.rs" collection names, document field keys, and position keys. NOTE(review): every member is static but the class has no private constructor, so it can still be instantiated. */ public class Constants { + + + public static final String APPLICATION_NAME = "java-mongo-replicator"; + + + public static final String MONGO_LOCAL_DATABASE = "local"; + public static final String MONGO_OPLOG_RS = "oplog.rs"; + + + + /* The seven keys from OPERATIONTYPE to OBJECTID are the fields that DocumentConvertEvent.convert reads from the source document. */ public static final String OPERATIONTYPE = "op"; + public static final String TIMESTAMP = "ts"; + public static final String VERSION = "v"; + public static final String HASH = "h"; + public static final String NAMESPACE = "ns"; + public static final String OPERATION = "o"; + public static final String OBJECTID = "o2"; + + + public static final String CREATED = "created"; + public static final String PATCH = "patch"; + + public static final String INITIAL = "initial"; + + + /* NOTE(review): presumably the keys under which the replication position is saved; nothing in this patch reads them, so confirm against the caller. */ public static final String POSITION_TIMESTAMP = "timeStamp"; + public static final String POSITION_INC = "inc"; + public static final String INITSYNC = "initSync"; + +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java new file mode 100644 index 0000000..6b902e7 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java @@ 
-0,0 +1,35 @@ +package org.apache.connect.mongo.replicator.event; + +import org.bson.BsonTimestamp; +import org.bson.Document; + +import java.util.Optional; + +import static org.apache.connect.mongo.replicator.Constants.*; + + +/** Static helper that turns a BSON Document into a ReplicationEvent. */ public class DocumentConvertEvent { + + /** Builds a ReplicationEvent from the op, ts, h, v, ns, o and o2 fields of the given document. The o and o2 lookups are passed through Optional.ofNullable, so a null lookup result becomes an empty Optional. Any exception raised while reading the fields is printed to standard output and the method then returns null. NOTE(review): callers have to null-check the result; consider logging and rethrowing instead of swallowing. @param document source document to read from @return the converted event, or null if conversion threw */ public static ReplicationEvent convert(Document document) { + ReplicationEvent event = null; + try { + + OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE)); + BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP); +// Long t = document.getLong("t"); + Long h = document.getLong(HASH); + Integer v = document.getInteger(VERSION); + String nameSpace = document.getString(NAMESPACE); +// String uuid = document.getString("uuid"); +// Date wall = document.getDate("wall"); + Document operation = document.get(OPERATION, Document.class); + Document objectID = document.get(OBJECTID, Document.class); + event = new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document); + } catch (Exception e) { + System.out.println(e); + } + + return event; + } + +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java new file mode 100644 index 0000000..54df394 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java @@ -0,0 +1,28 @@ +package org.apache.connect.mongo.replicator.event; + +/** Operation kinds known to the replicator; each constant carries the string code that getOperationType matches against. */ public enum OperationType { + + INSERT("i"), + UPDATE("u"), + DELETE("d"), + NOOP("n"), + DBCOMMAND("c"), + CREATED("created"), + UNKNOWN("unknown"); + + private final String operationStr; + + OperationType(String operationStr) { + this.operationStr = operationStr; + } + + /** Returns the constant whose code equals operationStr, or UNKNOWN when no constant matches (a null argument therefore also yields UNKNOWN). */ public static OperationType getOperationType(String operationStr) { + for (OperationType operationType : OperationType.values()) { + if (operationType.operationStr.equals(operationStr)) { + return 
operationType; + } + } + return UNKNOWN; + } + +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java new file mode 100644 index 0000000..7719b3e --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java @@ -0,0 +1,153 @@ +package org.apache.connect.mongo.replicator.event; + +import io.openmessaging.connector.api.data.EntryType; +import org.apache.commons.lang3.StringUtils; +import org.bson.BsonTimestamp; +import org.bson.Document; + +import java.util.Optional; + +/** Mutable bean describing one replicated event: operation type, timestamp, the v and h values, the namespace with its derived database and collection names, the optional eventData and objectId documents, and the source document itself. */ public class ReplicationEvent { + + private Document document; + private OperationType operationType; + private Integer v; + private Long h; + private BsonTimestamp timestamp; + private String databaseName; + private String collectionName; + private String namespace; + private Optional<Document> eventData; + private Optional<Document> objectId; + + + public ReplicationEvent() { + + } + + + /** Assigns every field. databaseName and collectionName come from StringUtils.split(namespace, ".", 2); when that call returns null or anything other than exactly two parts, both are set to the empty string. */ public ReplicationEvent(OperationType operationType, BsonTimestamp timestamp, Integer v, Long h, String namespace, Optional<Document> eventData, Optional<Document> objectId, Document document) { + this.operationType = operationType; + this.v = v; + this.h = h; + this.timestamp = timestamp; + this.namespace = namespace; + this.eventData = eventData; + this.objectId = objectId; + String[] split = StringUtils.split(namespace, ".", 2); + this.databaseName = split != null && split.length == 2 ? split[0] : ""; + this.collectionName = split != null && split.length == 2 ? 
split[1] : ""; + this.document = document; + } + + + public OperationType getOperationType() { + return operationType; + } + + public Integer getV() { + return v; + } + + public Long getH() { + return h; + } + + public BsonTimestamp getTimestamp() { + return timestamp; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getCollectionName() { + return collectionName; + } + + public String getNamespace() { + return namespace; + } + + public Optional<Document> getEventData() { + return eventData; + } + + public Optional<Document> getObjectId() { + return objectId; + } + + /** Maps the operation type to a connector EntryType: UPDATE and DELETE map to their namesakes, while INSERT and every other type map to CREATE. NOTE(review): a null operationType, which is what the no-arg constructor leaves behind, makes the switch throw NullPointerException. */ public EntryType getEntryType() { + switch (operationType) { + case UPDATE: + return EntryType.UPDATE; + case DELETE: + return EntryType.DELETE; + case INSERT: + return EntryType.CREATE; + default: + return EntryType.CREATE; + } + } + + + public void setOperationType(OperationType operationType) { + this.operationType = operationType; + } + + + public Document getDocument() { + return document; + } + + /** Replaces the namespace only. NOTE(review): databaseName and collectionName are not re-derived here, so they can drift from the namespace. */ public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public void setDocument(Document document) { + this.document = document; + } + + public void setV(Integer v) { + this.v = v; + } + + public void setH(Long h) { + this.h = h; + } + + public void setTimestamp(BsonTimestamp timestamp) { + this.timestamp = timestamp; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public void setEventData(Optional<Document> eventData) { + this.eventData = eventData; + } + + public void setObjectId(Optional<Document> objectId) { + this.objectId = objectId; + } + + /** Renders all fields except the source document. NOTE(review): eventData.toString() and objectId.toString() are called directly, so this throws NullPointerException while either field is null, as it is after the no-arg constructor. */ @Override + public String toString() { + return "ReplicationEvent{" + + "operationType=" + operationType + + ", v=" + v + + ", h=" + h + + ", timestamp=" + timestamp + + ", databaseName='" + databaseName + '\'' + + ", collectionName='" + 
collectionName + '\'' + + ", namespace='" + namespace + '\'' + + ", eventData=" + eventData.toString() + + ", objectId=" + objectId.toString() + + '}'; + } +}
