This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 32d316512cfb0e31d9255235695cbead65799891 Author: 李平 <[email protected]> AuthorDate: Thu Aug 8 16:18:41 2019 +0800 fix some bug --- .../connect/mongo/MongoReplicatorConfig.java | 45 +++++---- .../connect/mongo/connector/MongoSourceTask.java | 14 ++- .../apache/connect/mongo/initsync/InitSync.java | 38 +++----- .../apache/connect/mongo/replicator/Filter.java | 80 ++++++++-------- .../connect/mongo/replicator/MongoReplicator.java | 23 +++-- .../replicator/event/DocumentConvertEvent.java | 24 ++--- .../java/org/apache/connect/mongo/FilterTest.java | 42 +++++---- .../java/org/apache/connect/mongo/MongoTest.java | 105 +++++++++++++++++++++ .../org/apache/connect/mongo/ReplicatorTest.java | 33 ------- 9 files changed, 228 insertions(+), 176 deletions(-) diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java index 18a834f..9f17aab 100644 --- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java +++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java @@ -1,6 +1,7 @@ package org.apache.connect.mongo; import io.openmessaging.KeyValue; +import org.apache.commons.lang3.StringUtils; import java.lang.reflect.Method; import java.util.HashSet; @@ -8,12 +9,11 @@ import java.util.Set; public class MongoReplicatorConfig { + private String replicaSet; private String mongoAddr; - private int mongoPort; private String mongoUserName; private String mongoPassWord; - private String interestDB; - private String interestCollection; + private String interestDbAndCollection; private long positionTimeStamp; private int positionInc; private String dataSync; @@ -23,7 +23,6 @@ public class MongoReplicatorConfig { public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { { add("mongoAddr"); - add("mongoPort"); } }; @@ -51,20 +50,12 @@ public class MongoReplicatorConfig { this.positionTimeStamp = positionTimeStamp; } - public String getInterestDB() { - return interestDB; + public String getInterestDbAndCollection() { + return interestDbAndCollection; } - public void setInterestDB(String interestDB) { - this.interestDB = interestDB; - } - - public String getInterestCollection() { - return interestCollection; - } - - public void setInterestCollection(String interestCollection) { - this.interestCollection = interestCollection; + public void setInterestDbAndCollection(String interestDbAndCollection) { + this.interestDbAndCollection = interestDbAndCollection; } public String getMongoAddr() { @@ -75,13 +66,6 @@ public class MongoReplicatorConfig { this.mongoAddr = mongoAddr; } - public int getMongoPort() { - return mongoPort; - } - - public void setMongoPort(int mongoPort) { - this.mongoPort = mongoPort; - } public String getMongoUserName() { return mongoUserName; @@ -109,6 +93,14 @@ public class MongoReplicatorConfig { } + public String getReplicaSet() { + return replicaSet; + } + + public void setReplicaSet(String replicaSet) { + this.replicaSet = replicaSet; + } + public void load(KeyValue props) { properties2Object(props, this); @@ -154,4 +146,11 @@ public class MongoReplicatorConfig { } } } + + public String getDataSouce() { + if (StringUtils.isBlank(replicaSet)) { + return mongoAddr; + } + return replicaSet + ":" + mongoAddr; + } } diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java index 40dfdd0..9176ab7 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java @@ -44,7 +44,7 @@ public class MongoSourceTask extends SourceTask { buildFieleds(schema); DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); dataEntryBuilder.timestamp(System.currentTimeMillis()) - .queue(event.getNamespace()) + .queue(event.getNamespace().replace(".", "-")) .entryType(event.getEntryType()); if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) { @@ -71,9 +71,7 @@ public class MongoSourceTask extends SourceTask { replicatorConfig = new MongoReplicatorConfig(); replicatorConfig.load(config); mongoReplicator = new MongoReplicator(replicatorConfig); - mongoSource = new StringBuilder() - .append(replicatorConfig.getMongoAddr()) - .append(replicatorConfig.getMongoPort()).toString(); + mongoSource = replicatorConfig.getDataSouce(); ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap( mongoSource.getBytes())); @@ -121,7 +119,7 @@ public class MongoSourceTask extends SourceTask { schema.getFields().add(operation); Field patch = new Field(5, Constants.PATCH, FieldType.STRING); schema.getFields().add(patch); - Field objectId = new Field(5, Constants.OBJECTID, FieldType.STRING); + Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING); schema.getFields().add(objectId); } @@ -134,9 +132,9 @@ public class MongoSourceTask extends SourceTask { jsonObject.put(Constants.INITSYNC, true); break; default: - jsonObject.put(Constants.POSITION_TIMESTAMP, 0); - jsonObject.put(Constants.POSITION_INC, 0); - jsonObject.put(Constants.INITSYNC, true); + jsonObject.put(Constants.POSITION_TIMESTAMP, event.getTimestamp().getTime()); + jsonObject.put(Constants.POSITION_INC, event.getTimestamp().getInc()); + jsonObject.put(Constants.INITSYNC, false); break; } return jsonObject; diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java index 4b3a238..e12412b 100644 --- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java +++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java @@ -27,7 +27,6 @@ public class InitSync { private MongoClient mongoClient; private Filter filter; private int copyThreadCount; - private Set<String> interestDataBases; private Set<CollectionMeta> interestCollections; private CountDownLatch countDownLatch; private MongoReplicator mongoReplicator; @@ -53,8 +52,7 @@ public class InitSync { } private void init() { - interestDataBases = getInterestDataBase(); - interestCollections = getInterestCollection(interestDataBases); + interestCollections = getInterestCollection(); copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread()); copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() { @@ -68,15 +66,17 @@ public class InitSync { countDownLatch = new CountDownLatch(interestCollections.size()); } - private Set<CollectionMeta> getInterestCollection(Set<String> interestDataBases) { + private Set<CollectionMeta> getInterestCollection() { Set<CollectionMeta> res = new HashSet<>(); - for (String interestDataBase : interestDataBases) { - MongoIterable<String> collectionNames = mongoClient.getDatabase(interestDataBase).listCollectionNames(); - MongoCursor<String> iterator = collectionNames.iterator(); - while (iterator.hasNext()) { - String collectionName = iterator.next(); - if (filter.filterCollectionName(collectionName)) { - CollectionMeta collectionMeta = new CollectionMeta(interestDataBase, collectionName); + MongoIterable<String> databaseNames = mongoClient.listDatabaseNames(); + MongoCursor<String> dbIterator = databaseNames.iterator(); + while (dbIterator.hasNext()) { + String dataBaseName = dbIterator.next(); + MongoCursor<String> collIterator = mongoClient.getDatabase(dataBaseName).listCollectionNames().iterator(); + while (collIterator.hasNext()) { + String collectionName = collIterator.next(); + CollectionMeta collectionMeta = new CollectionMeta(dataBaseName, collectionName); + if (filter.filter(collectionMeta)) { res.add(collectionMeta); } } @@ -86,19 +86,6 @@ public class InitSync { } - private Set<String> getInterestDataBase() { - Set<String> res = new HashSet<>(); - MongoIterable<String> databaseNames = mongoClient.listDatabaseNames(); - MongoCursor<String> iterator = databaseNames.iterator(); - while (iterator.hasNext()) { - String dataBaseName = iterator.next(); - if (filter.filterDatabaseName(dataBaseName)) { - res.add(dataBaseName); - } - } - - return res; - } class CopyRunner implements Runnable { @@ -132,6 +119,9 @@ public class InitSync { event.setNamespace(collectionMeta.getNameSpace()); mongoReplicator.publishEvent(event); } + + } catch (Exception e) { + logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace()); } finally { countDownLatch.countDown(); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java index 05dec54..4a62e41 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java +++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java @@ -1,74 +1,68 @@ package org.apache.connect.mongo.replicator; -import org.apache.commons.lang3.ArrayUtils; +import com.alibaba.fastjson.JSONObject; +import com.alibaba.fastjson.TypeReference; import org.apache.commons.lang3.StringUtils; import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.initsync.CollectionMeta; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.function.Function; public class Filter { - private Function<String, Boolean> dbFilter; - private Function<String, Boolean> collectionFilter; - private Function<OperationType, Boolean> noopFilter; + private Function<CollectionMeta, Boolean> dbAndCollectionFilter; + private Map<String, List<String>> interestMap = new HashMap<>(); + private Function<OperationType, Boolean> notNoopFilter; public Filter(MongoReplicatorConfig mongoReplicatorConfig) { - if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestDB())) { - dbFilter = (dataBaseName) -> { - if (StringUtils.isBlank(dataBaseName)) { - return true; - } - String interestDB = mongoReplicatorConfig.getInterestDB(); - String[] db = StringUtils.split(interestDB, ","); - if (ArrayUtils.contains(db, dataBaseName)) { - return true; - } - return false; - }; - } else { - dbFilter = (dataBaseName) -> true; - } + String interestDbAndCollection = mongoReplicatorConfig.getInterestDbAndCollection(); + if (StringUtils.isNotBlank(interestDbAndCollection)) { + JSONObject jsonObject = JSONObject.parseObject(interestDbAndCollection); + for (String db : jsonObject.keySet()) { + List<String> collections = jsonObject.getObject(db, new TypeReference<List<String>>() { + }); + interestMap.put(db, collections); + } - if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestCollection())) { - collectionFilter = (collectionName) -> { - if (StringUtils.isBlank(collectionName)) { - return true; - } - String interestCollection = mongoReplicatorConfig.getInterestCollection(); - String[] coll = StringUtils.split(interestCollection, ","); - if (ArrayUtils.contains(coll, collectionName)) { - return true; - } - return false; - }; - } else { - collectionFilter = (collectionName) -> true; } - noopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal(); - } + dbAndCollectionFilter = (collectionMeta) -> { + if (interestMap.size() == 0) { + return true; + } + List<String> collections = interestMap.get(collectionMeta.getDatabaseName()); + if (collections == null || collections.size() == 0) { + return false; + } + + if (collections.contains("*") || collections.contains(collectionMeta.getCollectionName())) { + return true; + } - public boolean filterDatabaseName(String dataBaseName) { - return dbFilter.apply(dataBaseName); + return false; + }; + + notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal(); } - public boolean filterCollectionName(String collectionName) { - return collectionFilter.apply(collectionName); + public boolean filter(CollectionMeta collectionMeta) { + return dbAndCollectionFilter.apply(collectionMeta); } public boolean filterEvent(ReplicationEvent event) { - return dbFilter.apply(event.getDatabaseName()) - && collectionFilter.apply(event.getCollectionName()) - && noopFilter.apply(event.getOperationType()); + return dbAndCollectionFilter.apply(new CollectionMeta(event.getDatabaseName(), event.getCollectionName())) + && notNoopFilter.apply(event.getOperationType()); } - } diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java index a14ceee..0f8520b 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java +++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java @@ -54,7 +54,8 @@ public class MongoReplicator { return; } - this.clientSettings = MongoClientSettings.builder().applicationName(APPLICATION_NAME) + this.clientSettings = MongoClientSettings.builder() + .applicationName(APPLICATION_NAME) .applyConnectionString(connectionString) .build(); this.mongoClient = MongoClients.create(clientSettings); @@ -68,7 +69,6 @@ public class MongoReplicator { private void buildConnectionString() { - checkConfig(); StringBuilder sb = new StringBuilder(); sb.append("mongodb://"); if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName()) @@ -80,19 +80,17 @@ public class MongoReplicator { } sb.append(mongoReplicatorConfig.getMongoAddr()); - sb.append(":"); - sb.append(mongoReplicatorConfig.getMongoPort()); - + sb.append("/"); + if (StringUtils.isBlank(mongoReplicatorConfig.getReplicaSet())) { + sb.append("?"); + sb.append("replicaSet="); + sb.append(mongoReplicatorConfig.getReplicaSet()); + } this.connectionString = new ConnectionString(sb.toString()); } - private void checkConfig() { - Validate.notBlank(mongoReplicatorConfig.getMongoAddr(), "mongo url is blank"); - Validate.isTrue(mongoReplicatorConfig.getMongoPort() > 0 && mongoReplicatorConfig.getMongoPort() < 65535, "mongo port should >0 and <65535"); - } - - private boolean isReplicaMongo() { + public boolean isReplicaMongo() { MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE); MongoIterable<String> collectionNames = local.listCollectionNames(); for (String collectionName : collectionNames) { @@ -101,7 +99,7 @@ public class MongoReplicator { } } this.shutdown(); - throw new IllegalStateException(String.format("url:%s, port:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getMongoPort())); + throw new IllegalStateException(String.format("url:%s, set:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getReplicaSet())); } public void shutdown() { @@ -127,6 +125,7 @@ public class MongoReplicator { } + public void pause() { pause = true; } diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java index 6b902e7..a18aa52 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java +++ b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java @@ -10,26 +10,20 @@ import static org.apache.connect.mongo.replicator.Constants.*; public class DocumentConvertEvent { + public static ReplicationEvent convert(Document document) { - ReplicationEvent event = null; - try { - OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE)); - BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP); + OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE)); + BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP); // Long t = document.getLong("t"); - Long h = document.getLong(HASH); - Integer v = document.getInteger(VERSION); - String nameSpace = document.getString(NAMESPACE); + Long h = document.getLong(HASH); + Integer v = document.getInteger(VERSION); + String nameSpace = document.getString(NAMESPACE); // String uuid = document.getString("uuid"); // Date wall = document.getDate("wall"); - Document operation = document.get(OPERATION, Document.class); - Document objectID = document.get(OBJECTID, Document.class); - event = new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document); - } catch (Exception e) { - System.out.println(e); - } - - return event; + Document operation = document.get(OPERATION, Document.class); + Document objectID = document.get(OBJECTID, Document.class); + return new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document); } } diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java index 24f36d9..2b14b13 100644 --- a/src/test/java/org/apache/connect/mongo/FilterTest.java +++ b/src/test/java/org/apache/connect/mongo/FilterTest.java @@ -1,5 +1,7 @@ package org.apache.connect.mongo; +import com.alibaba.fastjson.JSONObject; +import org.apache.connect.mongo.initsync.CollectionMeta; import org.apache.connect.mongo.replicator.Filter; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; @@ -7,51 +9,55 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class FilterTest { private MongoReplicatorConfig config; - + private Map<String, List<String>> insterest; @Before public void init() { config = new MongoReplicatorConfig(); - + insterest = new HashMap<>(); } @Test public void testSpecialDb() { - config.setInterestDB("test,admin"); + List<String> collections = new ArrayList<>(); + collections.add("person"); + insterest.put("test", collections); + config.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); Filter filter = new Filter(config); - Assert.assertTrue(filter.filterDatabaseName("test")); - Assert.assertFalse(filter.filterDatabaseName("test01")); + Assert.assertTrue(filter.filter(new CollectionMeta("test", "person"))); + Assert.assertFalse(filter.filter(new CollectionMeta("test", "person01"))); } @Test public void testBlankDb() { Filter filter = new Filter(config); - Assert.assertTrue(filter.filterDatabaseName("test")); - Assert.assertTrue(filter.filterDatabaseName("test01")); + Assert.assertTrue(filter.filter(new CollectionMeta("test" ,"test"))); + Assert.assertTrue(filter.filter(new CollectionMeta("test1" ,"test01"))); } @Test - public void testSpecialCollection() { - config.setInterestCollection("test,admin"); + public void testAsterisk() { + List<String> collections = new ArrayList<>(); + collections.add("*"); + insterest.put("test", collections); + config.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); Filter filter = new Filter(config); - Assert.assertTrue(filter.filterCollectionName("test")); - Assert.assertFalse(filter.filterCollectionName("test01")); + Assert.assertTrue(filter.filter(new CollectionMeta("test", "testsad"))); + Assert.assertTrue(filter.filter(new CollectionMeta("test", "tests032"))); } - @Test - public void testBlankCollection() { - Filter filter = new Filter(config); - Assert.assertTrue(filter.filterCollectionName("test")); - Assert.assertTrue(filter.filterCollectionName("test01")); - } - @Test public void testFilterEvent() { diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java new file mode 100644 index 0000000..849c00c --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/MongoTest.java @@ -0,0 +1,105 @@ +package org.apache.connect.mongo; + +import com.alibaba.fastjson.JSONObject; +import com.mongodb.ConnectionString; +import com.mongodb.CursorType; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.*; +import com.mongodb.client.model.Filters; +import io.openmessaging.connector.api.data.EntryType; +import org.apache.connect.mongo.initsync.InitSync; +import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.Filter; +import org.apache.connect.mongo.replicator.MongoReplicator; +import org.apache.connect.mongo.replicator.event.DocumentConvertEvent; +import org.apache.connect.mongo.replicator.event.OperationType; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.bson.BsonTimestamp; +import org.bson.Document; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +public class MongoTest { + + private MongoClient mongoClient; + + @Before + public void before() { + MongoClientSettings.Builder builder = MongoClientSettings.builder(); + builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077")); + mongoClient = MongoClients.create(builder.build()); + } + + + @Test + public void testConvertEvent() { + Document oplog = new Document(); + BsonTimestamp timestamp = new BsonTimestamp(1565074665, 10); + oplog.put(Constants.TIMESTAMP, timestamp); + oplog.put(Constants.NAMESPACE, "test.person"); + oplog.put(Constants.HASH, 11111L); + oplog.put(Constants.OPERATIONTYPE, "i"); + Document document = new Document(); + document.put("test", "test"); + oplog.put(Constants.OPERATION, document); + ReplicationEvent event = DocumentConvertEvent.convert(oplog); + Assert.assertEquals(timestamp, event.getTimestamp()); + Assert.assertEquals("test.person", event.getNamespace()); + Assert.assertTrue(11111L == event.getH()); + Assert.assertEquals(OperationType.INSERT, event.getOperationType()); + Assert.assertEquals(EntryType.CREATE, event.getEntryType()); + Assert.assertEquals(document, event.getEventData().get()); + + + } + + + @Test + public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException { + MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person"); + collection.deleteMany(new Document()); + int count = 100; + List<Document> documents = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + Document document = new Document(); + document.put("name", "test" + i); + document.put("age", i); + document.put("sex", i % 2 == 0 ? "boy" : "girl"); + collection.insertOne(document); + documents.add(document); + } + MongoReplicatorConfig config = new MongoReplicatorConfig(); + Map<String, List<String>> insterest = new HashMap<>(); + List<String> collections = new ArrayList<>(); + collections.add("*"); + insterest.put("test", collections); + config.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); + MongoReplicator mongoReplicator = new MongoReplicator(config); + Field running = MongoReplicator.class.getDeclaredField("running"); + running.setAccessible(true); + running.set(mongoReplicator, new AtomicBoolean(true)); + InitSync initSync = new InitSync(config, mongoClient, new Filter(config), mongoReplicator); + initSync.start(); + BlockingQueue<ReplicationEvent> queue = mongoReplicator.getQueue(); + while (count > 0) { + count--; + ReplicationEvent event = queue.poll(100, TimeUnit.MILLISECONDS); + Assert.assertTrue(event.getOperationType().equals(OperationType.CREATED)); + Assert.assertNotNull(event.getDocument()); + Assert.assertTrue(documents.contains(event.getDocument())); + } + + + + } +} diff --git a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java deleted file mode 100644 index b0319f1..0000000 --- a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java +++ /dev/null @@ -1,33 +0,0 @@ -package org.apache.connect.mongo; - -import org.apache.connect.mongo.replicator.MongoReplicator; -import org.junit.Before; -import org.junit.Test; - -public class ReplicatorTest { - - private MongoReplicatorConfig config; - - @Before - public void before() { - config = new MongoReplicatorConfig(); - } - - @Test(expected = IllegalArgumentException.class) - public void testNoPort() { - config.setMongoAddr("127.0.0.1"); - MongoReplicator mongoReplicator = new MongoReplicator(config); - mongoReplicator.start(); - } - - - @Test - public void testNoPort1() { - config.setMongoAddr("127.0.0.1"); - config.setMongoPort(27012); - MongoReplicator mongoReplicator = new MongoReplicator(config); - mongoReplicator.start(); - } - - -}
