This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 70caed3673b4d5b783447622410ddf3dcf9bdfee Author: 李平 <[email protected]> AuthorDate: Thu Aug 8 16:25:52 2019 +0800 fix some bug --- .../org/apache/connect/mongo/MongoReplicatorConfig.java | 13 ++++++++++--- .../org/apache/connect/mongo/replicator/ReplicatorTask.java | 4 ++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java index 9f17aab..b7044ec 100644 --- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java +++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java @@ -2,6 +2,7 @@ package org.apache.connect.mongo; import io.openmessaging.KeyValue; import org.apache.commons.lang3.StringUtils; +import org.bson.BsonTimestamp; import java.lang.reflect.Method; import java.util.HashSet; @@ -14,7 +15,7 @@ public class MongoReplicatorConfig { private String mongoUserName; private String mongoPassWord; private String interestDbAndCollection; - private long positionTimeStamp; + private int positionTimeStamp; private int positionInc; private String dataSync; private int copyThread = Runtime.getRuntime().availableProcessors(); @@ -42,11 +43,11 @@ public class MongoReplicatorConfig { this.copyThread = copyThread; } - public long getPositionTimeStamp() { + public int getPositionTimeStamp() { return positionTimeStamp; } - public void setPositionTimeStamp(long positionTimeStamp) { + public void setPositionTimeStamp(int positionTimeStamp) { this.positionTimeStamp = positionTimeStamp; } @@ -153,4 +154,10 @@ public class MongoReplicatorConfig { } return replicaSet + ":" + mongoAddr; } + + + public BsonTimestamp getPosition() { + return new BsonTimestamp(positionTimeStamp, positionInc); + } + } diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java index 9e4c67b..32bb25b 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java @@ -7,9 +7,9 @@ import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.initsync.InitSync; import org.apache.connect.mongo.replicator.event.DocumentConvertEvent; import org.apache.connect.mongo.replicator.event.ReplicationEvent; -import org.apache.connect.mongo.initsync.InitSync; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +46,7 @@ public class ReplicatorTask implements Runnable { FindIterable<Document> iterable; if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) { iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find( - Filters.gt("ts", mongoReplicatorConfig.getPositionTimeStamp())); + Filters.gt("ts", mongoReplicatorConfig.getPosition())); } else { iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); }
