This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit db51f7ff808a10209ff7eafb68cec0e08f25f058 Author: 李平 <[email protected]> AuthorDate: Mon Aug 26 14:20:51 2019 +0800 clean code --- README.md | 17 +++++++-- .../apache/connect/mongo/initsync/InitSync.java | 3 +- .../{Filter.java => OperationFilter.java} | 4 +- .../mongo/replicator/ReplicaSetManager.java | 2 +- .../mongo/replicator/ReplicaSetsContext.java | 10 ++--- .../connect/mongo/replicator/ReplicatorTask.java | 25 ++++++------ .../java/org/apache/connect/mongo/FilterTest.java | 26 ++++++------- .../org/apache/connect/mongo/MongoFactoryTest.java | 44 ++++++++++++---------- 8 files changed, 73 insertions(+), 58 deletions(-) diff --git a/README.md b/README.md index e32c455..bec30a3 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,18 @@ this is source connector moudle for mongo,you can run this by running rocketmq connecotr api, -some junit rely on mongo database you can start with a docker container +some junit rely on mongo database you can flow step run a mongo container -`docker run -p27027:27017 --name mongo-test -d mongo:4.0.10 --replSet "repl1"` +- `docker run -p27027:27017 --name mongo-test -d mongo:4.0.10 --replSet "repl1"` +- `docker exec -it mongo-test mongo ` +- `rs.initiate()` -and then init a mongo replicaSet +init a mongo replicaSet run all junit test -`docker exec -it mongo-test mongo ` and `rs.initiate()` and then you can run all junit test +## a special junit +method `MongoFactoryTest#testSSLTrustStore` is for mongo ssl or tsl test,need mongod config ssl mode, if you want use ssl or tsl you need +modify junit , appoint ssl or tsl pem path and password。 ## task config params @@ -32,3 +36,8 @@ and then init a mongo replicaSet | zlibCompressionLevel | zlib compressors level| int (1-7)| | trustStore | ssl pem| path| | trustStorePassword | ssl pem decrypt password | string| + + +## use case + +`http://127.0.0.1:8081/connectors/testMongoReplicaSet?config={"connector-class":"org.apache.connect.mongo.connector.MongoSourceConnector","oms-driver-url":"oms:rocketmq://localhost:9876/default:default","mongoAddr":"rep1/127.0.0.1:27077,127.0.0.1:27078,127.0.0.1:27080","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}` diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java index 3d68fac..a51b727 100644 --- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java +++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java @@ -65,7 +65,8 @@ public class InitSync { } try { countDownLatch.await(); - } catch (Exception e) { + } catch (InterruptedException e) { + logger.error("init sync wait countDownLatch interrupted"); } finally { copyExecutor.shutdown(); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java similarity index 96% rename from src/main/java/org/apache/connect/mongo/replicator/Filter.java rename to src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java index a517822..a173f6c 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java +++ b/src/main/java/org/apache/connect/mongo/replicator/OperationFilter.java @@ -29,13 +29,13 @@ import org.apache.connect.mongo.initsync.CollectionMeta; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; -public class Filter { +public class OperationFilter { private Function<CollectionMeta, Boolean> dbAndCollectionFilter; private Map<String, List<String>> interestMap = new HashMap<>(); private Function<OperationType, Boolean> notNoopFilter; - public Filter(SourceTaskConfig sourceTaskConfig) { + public OperationFilter(SourceTaskConfig sourceTaskConfig) { String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection(); diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java index c5757d8..88097d8 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java @@ -75,7 +75,7 @@ public class ReplicaSetManager { } private void validate() { - Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAdd need special replicaSet addr"); + Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAddr need special replicaSet addr"); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java index b067256..8dd85d7 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java @@ -42,18 +42,16 @@ public class ReplicaSetsContext { private AtomicBoolean initSyncAbort = new AtomicBoolean(); - private Filter filter; + private OperationFilter operationFilter; private MongoClientFactory mongoClientFactory; - private Map<String, Position> lastPositionMap; public ReplicaSetsContext(SourceTaskConfig taskConfig) { this.taskConfig = taskConfig; this.replicaSets = new ArrayList<>(); - this.lastPositionMap = new HashMap<>(); this.dataEntryQueue = new LinkedBlockingDeque<>(); - this.filter = new Filter(taskConfig); + this.operationFilter = new OperationFilter(taskConfig); this.mongoClientFactory = new MongoClientFactory(taskConfig); } @@ -62,11 +60,11 @@ public class ReplicaSetsContext { } public boolean filterEvent(ReplicationEvent event) { - return filter.filterEvent(event); + return operationFilter.filterEvent(event); } public boolean filterMeta(CollectionMeta collectionMeta) { - return filter.filterMeta(collectionMeta); + return operationFilter.filterMeta(collectionMeta); } public int getCopyThread() { diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java index 4c142ce..cd78f24 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java @@ -54,20 +54,23 @@ public class ReplicatorTask implements Runnable { @Override public void run() { - BsonTimestamp firstAvailablePosition = findOplogFirstPosition(); - - // inValid or - // user config dataSync or - // user config or runtime saved position lt first oplog position maybe some operation is lost so need dataSync - if (!replicaSetConfig.getPosition().isValid() || replicaSetConfig.getPosition().isInitSync() - || replicaSetConfig.getPosition().converBsonTimeStamp().compareTo(firstAvailablePosition) < 0) { - recordOplogLastPosition(); + BsonTimestamp firstAvailablePosition = findFirstOplogPosition(); + + Position userConfigOrRuntimePosition = replicaSetConfig.getPosition(); + + boolean needDataSync = !userConfigOrRuntimePosition.isValid() + || userConfigOrRuntimePosition.isInitSync() + // userConfigOrRuntimePosition.position < firstAvailablePosition maybe lost some operations + || userConfigOrRuntimePosition.converBsonTimeStamp().compareTo(firstAvailablePosition) < 0; + + if (needDataSync) { + recordLastOplogPosition(); InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet); initSync.start(); } - if (!replicaSet.isRuning() || !replicaSetsContext.isInitSyncAbort()) { + if (!replicaSet.isRuning() || replicaSetsContext.isInitSyncAbort()) { return; } @@ -96,7 +99,7 @@ public class ReplicatorTask implements Runnable { logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig); } - private BsonTimestamp findOplogFirstPosition() { + private BsonTimestamp findFirstOplogPosition() { MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); Document lastOplog = iterable.sort(new Document("$natural", 1)).limit(1).first(); @@ -104,7 +107,7 @@ public class ReplicatorTask implements Runnable { return timestamp; } - private void recordOplogLastPosition() { + private void recordLastOplogPosition() { MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); Document lastOplog = iterable.sort(new Document("$natural", -1)).limit(1).first(); diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java index 31c8f4d..d5deefd 100644 --- a/src/test/java/org/apache/connect/mongo/FilterTest.java +++ b/src/test/java/org/apache/connect/mongo/FilterTest.java @@ -6,7 +6,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.connect.mongo.initsync.CollectionMeta; -import org.apache.connect.mongo.replicator.Filter; +import org.apache.connect.mongo.replicator.OperationFilter; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; import org.junit.Assert; @@ -30,16 +30,16 @@ public class FilterTest { collections.add("person"); insterest.put("test", collections); sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); - Filter filter = new Filter(sourceTaskConfig); - Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "person"))); - Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01"))); + OperationFilter operationFilter = new OperationFilter(sourceTaskConfig); + Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "person"))); + Assert.assertFalse(operationFilter.filterMeta(new CollectionMeta("test", "person01"))); } @Test public void testBlankDb() { - Filter filter = new Filter(sourceTaskConfig); - Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "test"))); - Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01"))); + OperationFilter operationFilter = new OperationFilter(sourceTaskConfig); + Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "test"))); + Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test1", "test01"))); } @Test @@ -48,19 +48,19 @@ public class FilterTest { collections.add("*"); insterest.put("test", collections); sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); - Filter filter = new Filter(sourceTaskConfig); - Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "testsad"))); - Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032"))); + OperationFilter operationFilter = new OperationFilter(sourceTaskConfig); + Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "testsad"))); + Assert.assertTrue(operationFilter.filterMeta(new CollectionMeta("test", "tests032"))); } @Test public void testFilterEvent() { - Filter filter = new Filter(sourceTaskConfig); + OperationFilter operationFilter = new OperationFilter(sourceTaskConfig); ReplicationEvent replicationEvent = new ReplicationEvent(); replicationEvent.setOperationType(OperationType.NOOP); - Assert.assertFalse(filter.filterEvent(replicationEvent)); + Assert.assertFalse(operationFilter.filterEvent(replicationEvent)); replicationEvent.setOperationType(OperationType.DB_COMMAND); - Assert.assertTrue(filter.filterEvent(replicationEvent)); + Assert.assertTrue(operationFilter.filterEvent(replicationEvent)); } diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java index 0f02064..e47d2c4 100644 --- a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java @@ -2,12 +2,16 @@ package org.apache.connect.mongo; import com.mongodb.MongoClientSettings; import com.mongodb.MongoTimeoutException; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoCursor; import com.mongodb.client.internal.MongoClientImpl; import java.lang.reflect.Field; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.connect.mongo.replicator.MongoClientFactory; import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.bson.Document; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -149,25 +153,25 @@ public class MongoFactoryTest { return null; } -// @Test -// public void testSSLTrustStore() { -// sourceTaskConfig.setMongoUserName("user_test"); -// sourceTaskConfig.setMongoPassWord("pwd_test"); -// sourceTaskConfig.setSsl("ssl"); -// sourceTaskConfig.setSslInvalidHostNameAllowed("true"); -// sourceTaskConfig.setTrustStore("/Users/liping/test.pem"); -// sourceTaskConfig.setTrustStorePassword("test001"); -// sourceTaskConfig.setServerSelectionTimeoutMS("10000"); -// MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig); -// MongoCollection<Document> collection = client.getDatabase("test").getCollection("person"); -// Document document = new Document(); -// document.put("name", "liping"); -// collection.insertOne(document); -// MongoCursor<Document> iterator = collection.find().iterator(); -// while (iterator.hasNext()) { -// System.out.println(iterator.next()); -// } -// -// } + @Test + public void testSSLTrustStore() { + sourceTaskConfig.setMongoUserName("user_test"); + sourceTaskConfig.setMongoPassWord("pwd_test"); + sourceTaskConfig.setSsl(true); + sourceTaskConfig.setSslInvalidHostNameAllowed(true); + sourceTaskConfig.setTrustStore("/Users/home/test.pem"); + sourceTaskConfig.setTrustStorePassword("test001"); + sourceTaskConfig.setServerSelectionTimeoutMS(10000); + MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig); + MongoCollection<Document> collection = client.getDatabase("test").getCollection("person"); + Document document = new Document(); + document.put("name", "test"); + collection.insertOne(document); + MongoCursor<Document> iterator = collection.find().iterator(); + while (iterator.hasNext()) { + System.out.println(iterator.next()); + } + + } }
