This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 6777f2317f083112cadd652270fd141290122f1b Author: 李平 <[email protected]> AuthorDate: Thu Aug 8 19:53:39 2019 +0800 fix bug --- .../org/apache/connect/mongo/MongoReplicatorConfig.java | 6 +++--- .../apache/connect/mongo/connector/MongoSourceTask.java | 11 ++++++----- .../apache/connect/mongo/replicator/MongoReplicator.java | 15 ++++++++++----- .../apache/connect/mongo/replicator/ReplicatorTask.java | 2 +- src/test/java/org/apache/connect/mongo/MongoTest.java | 10 +++++----- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java index b7044ec..9097640 100644 --- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java +++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java @@ -17,7 +17,7 @@ public class MongoReplicatorConfig { private String interestDbAndCollection; private int positionTimeStamp; private int positionInc; - private String dataSync; + private boolean dataSync; private int copyThread = Runtime.getRuntime().availableProcessors(); @@ -85,11 +85,11 @@ public class MongoReplicatorConfig { } - public String getDataSync() { + public boolean getDataSync() { return dataSync; } - public void setDataSync(String dataSync) { + public void setDataSync(boolean dataSync) { this.dataSync = dataSync; } diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java index 9176ab7..7272878 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java @@ -4,11 +4,11 @@ import com.alibaba.fastjson.JSONObject; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.data.*; import io.openmessaging.connector.api.source.SourceTask; -import org.apache.connect.mongo.replicator.Constants; import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.MongoReplicator; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; -import org.apache.connect.mongo.replicator.MongoReplicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +44,7 @@ public class MongoSourceTask extends SourceTask { buildFieleds(schema); DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); dataEntryBuilder.timestamp(System.currentTimeMillis()) - .queue(event.getNamespace().replace(".", "-")) + .queue(event.getNamespace().replace(".", "-").replace("$", "")) .entryType(event.getEntryType()); if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) { @@ -78,10 +78,11 @@ public class MongoSourceTask extends SourceTask { if (position != null && position.array().length > 0) { String positionJson = new String(position.array(), StandardCharsets.UTF_8); JSONObject jsonObject = JSONObject.parseObject(positionJson); - replicatorConfig.setPositionTimeStamp(jsonObject.getLongValue("timeStamp")); + replicatorConfig.setPositionTimeStamp(jsonObject.getIntValue("timeStamp")); replicatorConfig.setPositionInc(jsonObject.getIntValue("inc")); + replicatorConfig.setDataSync(jsonObject.getBooleanValue(Constants.INITSYNC)); } else { - replicatorConfig.setDataSync(Constants.INITIAL); + replicatorConfig.setDataSync(true); } mongoReplicator.start(); }catch (Throwable throwable) { diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java index 0f8520b..60b8d3d 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java +++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java @@ -1,16 +1,21 @@ package org.apache.connect.mongo.replicator; -import com.mongodb.*; -import com.mongodb.client.*; +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.Validate; import org.apache.connect.mongo.MongoReplicatorConfig; import org.apache.connect.mongo.replicator.event.ReplicationEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.connect.mongo.replicator.Constants.*; @@ -81,7 +86,7 @@ public class MongoReplicator { } sb.append(mongoReplicatorConfig.getMongoAddr()); sb.append("/"); - if (StringUtils.isBlank(mongoReplicatorConfig.getReplicaSet())) { + if (StringUtils.isNotBlank(mongoReplicatorConfig.getReplicaSet())) { sb.append("?"); sb.append("replicaSet="); sb.append(mongoReplicatorConfig.getReplicaSet()); diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java index 32bb25b..766225f 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java @@ -37,7 +37,7 @@ public class ReplicatorTask implements Runnable { @Override public void run() { - if (Constants.INITIAL.equals(mongoReplicatorConfig.getDataSync())) { + if (mongoReplicatorConfig.getDataSync()) { InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator); initSync.start(); } diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java index 849c00c..cc83fbe 100644 --- a/src/test/java/org/apache/connect/mongo/MongoTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoTest.java @@ -2,10 +2,10 @@ package org.apache.connect.mongo; import com.alibaba.fastjson.JSONObject; import com.mongodb.ConnectionString; -import com.mongodb.CursorType; import com.mongodb.MongoClientSettings; -import com.mongodb.client.*; -import com.mongodb.client.model.Filters; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; import io.openmessaging.connector.api.data.EntryType; import org.apache.connect.mongo.initsync.InitSync; import org.apache.connect.mongo.replicator.Constants; @@ -36,7 +36,7 @@ public class MongoTest { @Before public void before() { MongoClientSettings.Builder builder = MongoClientSettings.builder(); - builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077")); + builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27018")); mongoClient = MongoClients.create(builder.build()); } @@ -68,7 +68,7 @@ public class MongoTest { public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException { MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person"); collection.deleteMany(new Document()); - int count = 100; + int count = 1; List<Document> documents = new ArrayList<>(count); for (int i = 0; i < count; i++) { Document document = new Document();
