This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit f193fcdc1c654837d9dfee60ea4b2e56afbd6d5d Author: 李平 <[email protected]> AuthorDate: Thu Aug 15 15:22:58 2019 +0800 reformat code and add more test case --- README.md | 12 ++ .../org/apache/connect/mongo/SourceTaskConfig.java | 116 +++++++++++++- .../mongo/connector/MongoSourceConnector.java | 6 +- .../connect/mongo/connector/MongoSourceTask.java | 26 ++-- .../mongo/connector/builder/MongoDataEntry.java | 39 ++--- .../connect/mongo/initsync/CollectionMeta.java | 6 +- .../apache/connect/mongo/initsync/InitSync.java | 30 ++-- .../apache/connect/mongo/replicator/Constants.java | 6 - .../apache/connect/mongo/replicator/Filter.java | 15 +- .../mongo/replicator/MongoClientFactory.java | 113 ++++++++++++++ .../connect/mongo/replicator/ReplicaSet.java | 11 +- .../connect/mongo/replicator/ReplicaSetConfig.java | 45 +++--- .../connect/mongo/replicator/ReplicaSets.java | 8 +- .../mongo/replicator/ReplicaSetsContext.java | 43 +---- .../connect/mongo/replicator/ReplicatorTask.java | 16 +- .../mongo/replicator/event/EventConverter.java | 13 +- .../mongo/replicator/event/ReplicationEvent.java | 36 ++--- .../java/org/apache/connect/mongo/FilterTest.java | 15 +- .../org/apache/connect/mongo/MongoFactoryTest.java | 173 +++++++++++++++++++++ .../connect/mongo/MongoSourceConnectorTest.java | 19 +-- .../apache/connect/mongo/MongoSourceTaskTest.java | 143 +++++++++++++++++ .../java/org/apache/connect/mongo/MongoTest.java | 18 +-- .../apache/connect/mongo/ReplicaContextTest.java | 3 +- .../org/apache/connect/mongo/ReplicaSetTest.java | 59 +++++++ .../org/apache/connect/mongo/ReplicaSetsTest.java | 19 +-- 25 files changed, 758 insertions(+), 232 deletions(-) diff --git a/README.md b/README.md new file mode 100644 index 0000000..fb2c6c6 --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +# RocketMQ-connect-mongo + +this is source connector moudle for mongo,you can run this by running rocketmq connecotr api, + +some junit rely on mongo database you can start with a docker container + +`docker run -p27027:27017 --name mongo-test -d mongo:4.0.10 --replSet "repl1"` + +and then init a mongo replicaSet + +`docker exec -it mongo-test mongo ` and `rs.initiate()` and then you can run all junit test + diff --git a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java index b79b2e9..3df9dc4 100644 --- a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java +++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java @@ -1,12 +1,11 @@ package org.apache.connect.mongo; import io.openmessaging.KeyValue; -import org.bson.BsonTimestamp; - import java.lang.reflect.Method; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import org.bson.BsonTimestamp; public class SourceTaskConfig { @@ -18,15 +17,50 @@ public class SourceTaskConfig { private String positionTimeStamp; private String positionInc; private String dataSync; + private String serverSelectionTimeoutMS; + private String connectTimeoutMS; + private String socketTimeoutMS; + private String ssl; + private String tsl; + private String tlsInsecure; + private String sslInvalidHostNameAllowed; + private String tlsAllowInvalidHostnames; + private String compressors; + private String zlibCompressionLevel; + private String trustStore; + private String trustStorePassword; private int copyThread = Runtime.getRuntime().availableProcessors(); - public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() { { add("mongoAddr"); } }); + public String getTrustStore() { + return trustStore; + } + + public void setTrustStore(String trustStore) { + this.trustStore = trustStore; + } + + public String getTrustStorePassword() { + return trustStorePassword; + } + + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + public String getZlibCompressionLevel() { + return zlibCompressionLevel; + } + + public void setZlibCompressionLevel(String zlibCompressionLevel) { + this.zlibCompressionLevel = zlibCompressionLevel; + } + public String getPositionInc() { return positionInc; } @@ -67,7 +101,6 @@ public class SourceTaskConfig { this.mongoAddr = mongoAddr; } - public String getMongoUserName() { return mongoUserName; } @@ -84,7 +117,6 @@ public class SourceTaskConfig { this.mongoPassWord = mongoPassWord; } - public String getDataSync() { return dataSync; } @@ -93,15 +125,86 @@ public class SourceTaskConfig { this.dataSync = dataSync; } - public String getReplicaSet() { return replicaSet; } + public String getServerSelectionTimeoutMS() { + return serverSelectionTimeoutMS; + } + + public void setServerSelectionTimeoutMS(String serverSelectionTimeoutMS) { + this.serverSelectionTimeoutMS = serverSelectionTimeoutMS; + } + public void setReplicaSet(String replicaSet) { this.replicaSet = replicaSet; } + public String getConnectTimeoutMS() { + return connectTimeoutMS; + } + + public void setConnectTimeoutMS(String connectTimeoutMS) { + this.connectTimeoutMS = connectTimeoutMS; + } + + public String getSocketTimeoutMS() { + return socketTimeoutMS; + } + + public void setSocketTimeoutMS(String socketTimeoutMS) { + this.socketTimeoutMS = socketTimeoutMS; + } + + public String getSsl() { + return ssl; + } + + public void setSsl(String ssl) { + this.ssl = ssl; + } + + public String getTsl() { + return tsl; + } + + public void setTsl(String tsl) { + this.tsl = tsl; + } + + public String getTlsInsecure() { + return tlsInsecure; + } + + public void setTlsInsecure(String tlsInsecure) { + this.tlsInsecure = tlsInsecure; + } + + public String getSslInvalidHostNameAllowed() { + return sslInvalidHostNameAllowed; + } + + public void setSslInvalidHostNameAllowed(String sslInvalidHostNameAllowed) { + this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed; + } + + public String getTlsAllowInvalidHostnames() { + return tlsAllowInvalidHostnames; + } + + public void setTlsAllowInvalidHostnames(String tlsAllowInvalidHostnames) { + this.tlsAllowInvalidHostnames = tlsAllowInvalidHostnames; + } + + public String getCompressors() { + return compressors; + } + + public void setCompressors(String compressors) { + this.compressors = compressors; + } + public void load(KeyValue props) { properties2Object(props, this); @@ -148,7 +251,6 @@ public class SourceTaskConfig { } } - public BsonTimestamp getPosition() { return new BsonTimestamp(Integer.valueOf(positionTimeStamp), Integer.valueOf(positionInc)); } diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java index 2b28ea2..e3dfb6f 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java @@ -3,19 +3,17 @@ package org.apache.connect.mongo.connector; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; +import java.util.ArrayList; +import java.util.List; import org.apache.connect.mongo.SourceTaskConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; - public class MongoSourceConnector extends SourceConnector { private Logger logger = LoggerFactory.getLogger(this.getClass()); private KeyValue keyValueConfig; - @Override public String verifyAndSetConfig(KeyValue config) { for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) { diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java index 407608a..da244cd 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java @@ -4,16 +4,19 @@ import com.alibaba.fastjson.JSONObject; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.connector.api.source.SourceTask; -import org.apache.commons.lang3.StringUtils; -import org.apache.connect.mongo.SourceTaskConfig; -import org.apache.connect.mongo.replicator.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.SourceTaskConfig; +import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.ReplicaSet; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSets; +import org.apache.connect.mongo.replicator.ReplicaSetsContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class MongoSourceTask extends SourceTask { @@ -42,7 +45,7 @@ public class MongoSourceTask extends SourceTask { replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr()); replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> { ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap( - replicaSetName.getBytes())); + replicaSetName.getBytes())); if (byteBuffer != null && byteBuffer.array().length > 0) { String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8); ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class); @@ -50,11 +53,11 @@ public class MongoSourceTask extends SourceTask { } else { ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null - && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches() - ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0); + && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches() + ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0); position.setInc(sourceTaskConfig.getPositionInc() != null - && pattern.matcher(sourceTaskConfig.getPositionInc()).matches() - ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0); + && pattern.matcher(sourceTaskConfig.getPositionInc()).matches() + ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0); position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false); replicaSetConfig.setPosition(position); } @@ -64,7 +67,6 @@ public class MongoSourceTask extends SourceTask { replicaSet.start(); }); - } catch (Throwable throwable) { logger.error("task start error", throwable); stop(); diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java index 7c0db7d..87d92ea 100644 --- a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java +++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java @@ -1,19 +1,27 @@ package org.apache.connect.mongo.connector.builder; - import com.alibaba.fastjson.JSONObject; -import io.openmessaging.connector.api.data.*; +import io.openmessaging.connector.api.data.DataEntryBuilder; +import io.openmessaging.connector.api.data.Field; +import io.openmessaging.connector.api.data.FieldType; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SourceDataEntry; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import org.apache.connect.mongo.replicator.Constants; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; import org.bson.BsonTimestamp; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; - -import static org.apache.connect.mongo.replicator.Constants.*; +import static org.apache.connect.mongo.replicator.Constants.CREATED; +import static org.apache.connect.mongo.replicator.Constants.NAMESPACE; +import static org.apache.connect.mongo.replicator.Constants.OBJECTID; +import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE; +import static org.apache.connect.mongo.replicator.Constants.PATCH; +import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP; +import static org.apache.connect.mongo.replicator.Constants.VERSION; public class MongoDataEntry { @@ -28,8 +36,8 @@ public class MongoDataEntry { Schema schema = createdSchema(replicaSetConfig.getReplicaSetName()); dataEntryBuilder = new DataEntryBuilder(schema); dataEntryBuilder.timestamp(System.currentTimeMillis()) - .queue(event.getNamespace().replace(".", "-").replace("$", "-")) - .entryType(event.getEntryType()); + .queue(event.getNamespace().replace(".", "-").replace("$", "-")) + .entryType(event.getEntryType()); dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson()); dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace()); @@ -38,8 +46,8 @@ public class MongoDataEntry { Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName()); dataEntryBuilder = new DataEntryBuilder(schema); dataEntryBuilder.timestamp(System.currentTimeMillis()) - .queue(event.getNamespace().replace(".", "-").replace("$", "-")) - .entryType(event.getEntryType()); + .queue(event.getNamespace().replace(".", "-").replace("$", "-")) + .entryType(event.getEntryType()); dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name()); dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue()); dataEntryBuilder.putFiled(VERSION, event.getV()); @@ -48,15 +56,13 @@ public class MongoDataEntry { dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : ""); } - String position = createPosition(event, replicaSetConfig); SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( - ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)), - ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8))); + ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8))); return sourceDataEntry; } - private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) { ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); BsonTimestamp timestamp = event.getTimestamp(); @@ -76,7 +82,6 @@ public class MongoDataEntry { return schema; } - private static Schema oplogSchema(String dataSourceName) { Schema schema = new Schema(); schema.setDataSource(dataSourceName); @@ -85,7 +90,6 @@ public class MongoDataEntry { return schema; } - private static void createdField(Schema schema) { Field namespace = new Field(0, NAMESPACE, FieldType.STRING); schema.getFields().add(namespace); @@ -109,5 +113,4 @@ public class MongoDataEntry { schema.getFields().add(objectId); } - } diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java index 018418c..9c73eac 100644 --- a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java +++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java @@ -34,8 +34,8 @@ public class CollectionMeta { @Override public String toString() { return "CollectionMeta{" + - "databaseName='" + databaseName + '\'' + - ", collectionName='" + collectionName + '\'' + - '}'; + "databaseName='" + databaseName + '\'' + + ", collectionName='" + collectionName + '\'' + + '}'; } } diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java index d92e968..6cdbe06 100644 --- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java +++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java @@ -3,6 +3,13 @@ package org.apache.connect.mongo.initsync; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoIterable; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.connect.mongo.replicator.ReplicaSet; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.ReplicaSetsContext; @@ -13,14 +20,6 @@ import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; - public class InitSync { private Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -34,7 +33,8 @@ public class InitSync { private CountDownLatch countDownLatch; private ReplicaSet replicaSet; - public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context, ReplicaSet replicaSet) { + public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context, + ReplicaSet replicaSet) { this.replicaSetConfig = replicaSetConfig; this.mongoClient = mongoClient; this.context = context; @@ -89,7 +89,6 @@ public class InitSync { } - class CopyRunner implements Runnable { private MongoClient mongoClient; @@ -97,7 +96,8 @@ public class InitSync { private CollectionMeta collectionMeta; private ReplicaSet replicaSet; - public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, ReplicaSet replicaSet) { + public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, + ReplicaSet replicaSet) { this.mongoClient = mongoClient; this.countDownLatch = countDownLatch; this.collectionMeta = collectionMeta; @@ -110,10 +110,10 @@ public class InitSync { int count = 0; try { MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName()) - .getCollection(collectionMeta.getCollectionName()) - .find() - .batchSize(200) - .iterator(); + .getCollection(collectionMeta.getCollectionName()) + .find() + .batchSize(200) + .iterator(); while (replicaSet.isRuning() && mongoCursor.hasNext()) { if (context.initSyncAbort()) { logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count); diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java index 1a91a57..c895bd6 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java +++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java @@ -2,13 +2,9 @@ package org.apache.connect.mongo.replicator; public class Constants { - - public static final String MONGO_LOCAL_DATABASE = "local"; public static final String MONGO_OPLOG_RS = "oplog.rs"; - - public static final String OPERATIONTYPE = "op"; public static final String TIMESTAMP = "ts"; public static final String VERSION = "v"; @@ -17,11 +13,9 @@ public class Constants { public static final String OPERATION = "o"; public static final String OBJECTID = "o2"; - public static final String CREATED = "created"; public static final String PATCH = "patch"; - public static final String INITSYNC = "initSync"; } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java index 3e431a3..fd26163 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java +++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java @@ -1,26 +1,23 @@ package org.apache.connect.mongo.replicator; - import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.apache.connect.mongo.SourceTaskConfig; import org.apache.connect.mongo.initsync.CollectionMeta; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Function; - public class Filter { private Function<CollectionMeta, Boolean> dbAndCollectionFilter; private Map<String, List<String>> interestMap = new HashMap<>(); private Function<OperationType, Boolean> notNoopFilter; - public Filter(SourceTaskConfig sourceTaskConfig) { String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection(); @@ -33,7 +30,6 @@ public class Filter { interestMap.put(db, collections); } - } dbAndCollectionFilter = (collectionMeta) -> { @@ -56,13 +52,12 @@ public class Filter { notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal(); } - public boolean filterMeta(CollectionMeta collectionMeta) { return dbAndCollectionFilter.apply(collectionMeta); } public boolean filterEvent(ReplicationEvent event) { return dbAndCollectionFilter.apply(new CollectionMeta(event.getDatabaseName(), event.getCollectionName())) - && notNoopFilter.apply(event.getOperationType()); + && notNoopFilter.apply(event.getOperationType()); } } diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java new file mode 100644 index 0000000..f5e01a3 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java @@ -0,0 +1,113 @@ +package org.apache.connect.mongo.replicator; + +import com.mongodb.ConnectionString; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.SourceTaskConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MongoClientFactory { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private SourceTaskConfig taskConfig; + + public MongoClientFactory(SourceTaskConfig sourceTaskConfig) { + this.taskConfig = sourceTaskConfig; + } + + public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) { + StringBuilder sb = new StringBuilder(); + sb.append("mongodb://"); + if (StringUtils.isNotBlank(taskConfig.getMongoUserName()) + && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) { + sb.append(taskConfig.getMongoUserName()); + sb.append(":"); + sb.append(taskConfig.getMongoPassWord()); + sb.append("@"); + + } + sb.append(replicaSetConfig.getHost()); + sb.append("/"); + if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) { + sb.append("?"); + sb.append("replicaSet="); + sb.append(replicaSetConfig.getReplicaSetName()); + } + + if (StringUtils.isNotBlank(taskConfig.getServerSelectionTimeoutMS())) { + sb.append("&"); + sb.append("serverSelectionTimeoutMS="); + sb.append(taskConfig.getServerSelectionTimeoutMS()); + } + + if (StringUtils.isNotBlank(taskConfig.getConnectTimeoutMS())) { + sb.append("&"); + sb.append("connectTimeoutMS="); + sb.append(taskConfig.getConnectTimeoutMS()); + } + + if (StringUtils.isNotBlank(taskConfig.getSocketTimeoutMS())) { + sb.append("&"); + sb.append("socketTimeoutMS="); + sb.append(taskConfig.getSocketTimeoutMS()); + } + + if (StringUtils.isNotBlank(taskConfig.getSsl()) || StringUtils.isNotBlank(taskConfig.getTsl())) { + sb.append("&"); + sb.append("ssl="); + sb.append(true); + } + + if (StringUtils.isNotBlank(taskConfig.getTlsInsecure())) { + sb.append("&"); + sb.append("tlsInsecure="); + sb.append(taskConfig.getTlsInsecure()); + } + + if (StringUtils.isNotBlank(taskConfig.getTlsAllowInvalidHostnames())) { + sb.append("&"); + sb.append("tlsAllowInvalidHostnames="); + sb.append(taskConfig.getTlsAllowInvalidHostnames()); + } + + if (StringUtils.isNotBlank(taskConfig.getSslInvalidHostNameAllowed())) { + sb.append("&"); + sb.append("sslInvalidHostNameAllowed="); + sb.append(taskConfig.getSslInvalidHostNameAllowed()); + } + + if (StringUtils.isNotBlank(taskConfig.getCompressors())) { + sb.append("&"); + sb.append("compressors="); + sb.append(taskConfig.getCompressors()); + } + + if (StringUtils.isNotBlank(taskConfig.getZlibCompressionLevel())) { + sb.append("&"); + sb.append("zlibcompressionlevel="); + sb.append(taskConfig.getZlibCompressionLevel()); + } + + if (StringUtils.isNotBlank(taskConfig.getTrustStore())) { + Properties properties = System.getProperties(); + properties.put("javax.net.ssl.trustStore", taskConfig.getTrustStore()); + logger.info("javax.net.ssl.trustStore: {}", taskConfig.getTrustStore()); + } + + if (StringUtils.isNotBlank(taskConfig.getTrustStorePassword())) { + Properties properties = System.getProperties(); + properties.put("javax.net.ssl.trustStorePassword", taskConfig.getTrustStorePassword()); + logger.info("javax.net.ssl.trustStorePassword: {}", taskConfig.getTrustStorePassword()); + } + + logger.info("connection string :{}", sb.toString()); + System.out.println(sb.toString()); + ConnectionString connectionString = new ConnectionString(sb.toString()); + return MongoClients.create(connectionString); + } + +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java index b141c2b..8f4d0d8 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java @@ -4,18 +4,16 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoIterable; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.connect.mongo.replicator.Constants.MONGO_LOCAL_DATABASE; import static org.apache.connect.mongo.replicator.Constants.MONGO_OPLOG_RS; - public class ReplicaSet { private Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -24,7 +22,6 @@ public class ReplicaSet { private ReplicaSetConfig replicaSetConfig; - private ReplicaSetsContext replicaSetsContext; private MongoClient mongoClient; @@ -56,7 +53,6 @@ public class ReplicaSet { } } - public boolean isReplicaMongo() { MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE); MongoIterable<String> collectionNames = local.listCollectionNames(); @@ -82,7 +78,6 @@ public class ReplicaSet { } - public void pause() { pause = true; } diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java index 4b8d148..1b54b17 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java @@ -1,18 +1,15 @@ package org.apache.connect.mongo.replicator; -import org.bson.BsonTimestamp; - import java.util.Objects; +import org.bson.BsonTimestamp; public class ReplicaSetConfig { - private String shardName; private String replicaSetName; private String host; private Position position; - public Position getPosition() { return position; } @@ -55,13 +52,21 @@ public class ReplicaSetConfig { return new Position(0, 0, true); } + @Override + public String toString() { + return "ReplicaSetConfig{" + + "shardName='" + shardName + '\'' + + ", replicaSetName='" + replicaSetName + '\'' + + ", host='" + host + '\'' + + ", position=" + position + + '}'; + } public class Position { private int timeStamp; private int inc; private boolean initSync; - public int getTimeStamp() { return timeStamp; } @@ -86,7 +91,6 @@ public class ReplicaSetConfig { this.initSync = initSync; } - public Position(int timeStamp, int inc, boolean initSync) { this.timeStamp = timeStamp; this.inc = inc; @@ -104,20 +108,22 @@ public class ReplicaSetConfig { @Override public String toString() { return "Position{" + - "timeStamp=" + timeStamp + - ", inc=" + inc + - ", initSync=" + initSync + - '}'; + "timeStamp=" + timeStamp + + ", inc=" + inc + + ", initSync=" + initSync + + '}'; } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; Position position = (Position) o; return timeStamp == position.timeStamp && - inc == position.inc && - initSync == position.initSync; + inc == position.inc && + initSync == position.initSync; } @Override @@ -125,15 +131,4 @@ public class ReplicaSetConfig { return Objects.hash(timeStamp, inc, initSync); } } - - - @Override - public String toString() { - return "ReplicaSetConfig{" + - "shardName='" + shardName + '\'' + - ", replicaSetName='" + replicaSetName + '\'' + - ", host='" + host + '\'' + - ", position=" + position + - '}'; - } } diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java index 9184b90..af1ebeb 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java @@ -1,23 +1,20 @@ package org.apache.connect.mongo.replicator; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.Validate; - import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; public class ReplicaSets { private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)"); - private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>(); - public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) { replicaSetConfigs.forEach(replicaSetConfig -> { if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) { @@ -43,7 +40,6 @@ public class ReplicaSets { return new ReplicaSets(replicaSetConfigs); } - private static ReplicaSetConfig parseReplicaSetStr(String hosts) { if (hosts != null) { Matcher matcher = HOST_PATTERN.matcher(hosts); diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java index f66ca14..e599f5b 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java @@ -1,15 +1,7 @@ package org.apache.connect.mongo.replicator; -import com.mongodb.ConnectionString; import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; import io.openmessaging.connector.api.data.SourceDataEntry; -import org.apache.commons.lang3.StringUtils; -import org.apache.connect.mongo.SourceTaskConfig; -import org.apache.connect.mongo.connector.builder.MongoDataEntry; -import org.apache.connect.mongo.initsync.CollectionMeta; -import org.apache.connect.mongo.replicator.event.ReplicationEvent; - import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -17,6 +9,10 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.connect.mongo.SourceTaskConfig; +import org.apache.connect.mongo.connector.builder.MongoDataEntry; +import org.apache.connect.mongo.initsync.CollectionMeta; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; public class ReplicaSetsContext { @@ -30,57 +26,36 @@ public class ReplicaSetsContext { private Filter filter; + private MongoClientFactory mongoClientFactory; + public ReplicaSetsContext(SourceTaskConfig taskConfig) { this.taskConfig = taskConfig; this.replicaSets = new CopyOnWriteArrayList<>(); this.dataEntryQueue = new LinkedBlockingDeque<>(); this.filter = new Filter(taskConfig); + this.mongoClientFactory = new MongoClientFactory(taskConfig); } - public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) { - StringBuilder sb = new StringBuilder(); - sb.append("mongodb://"); - if (StringUtils.isNotBlank(taskConfig.getMongoUserName()) - && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) { - sb.append(taskConfig.getMongoUserName()); - sb.append(":"); - sb.append(taskConfig.getMongoPassWord()); - sb.append("@"); - - } - sb.append(replicaSetConfig.getHost()); - sb.append("/"); - if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) { - sb.append("?"); - sb.append("replicaSet="); - sb.append(replicaSetConfig.getReplicaSetName()); - } - ConnectionString connectionString = new ConnectionString(sb.toString()); - return MongoClients.create(connectionString); + return mongoClientFactory.createMongoClient(replicaSetConfig); } - public boolean filterEvent(ReplicationEvent event) { return filter.filterEvent(event); } - public boolean filterMeta(CollectionMeta collectionMeta) { return filter.filterMeta(collectionMeta); } - public int getCopyThread() { return taskConfig.getCopyThread() > 0 ? taskConfig.getCopyThread() : Runtime.getRuntime().availableProcessors(); } - public void addReplicaSet(ReplicaSet replicaSet) { this.replicaSets.add(replicaSet); } - public void shutdown() { replicaSets.forEach(ReplicaSet::shutdown); } @@ -89,12 +64,10 @@ public class ReplicaSetsContext { replicaSets.forEach(ReplicaSet::pause); } - public void resume() { replicaSets.forEach(ReplicaSet::resume); } - public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) { SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig); while (true) { diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java index bf4ebac..6cb46d1 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java @@ -13,7 +13,6 @@ import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - public class ReplicatorTask implements Runnable { private Logger logger = LoggerFactory.getLogger(this.getClass()); @@ -26,7 +25,8 @@ public class ReplicatorTask implements Runnable { private ReplicaSetsContext replicaSetsContext; - public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) { + public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig, + ReplicaSetsContext replicaSetsContext) { this.replicaSet = replicaSet; this.replicaSetConfig = replicaSetConfig; this.mongoClient = mongoClient; @@ -45,15 +45,15 @@ public class ReplicatorTask implements Runnable { FindIterable<Document> iterable; if (replicaSetConfig.getPosition().isValid()) { iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find( - Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp())); + Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp())); } else { iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); } MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1)) - .noCursorTimeout(true) - .cursorType(CursorType.TailableAwait) - .batchSize(200) - .iterator(); + .noCursorTimeout(true) + .cursorType(CursorType.TailableAwait) + .batchSize(200) + .iterator(); while (replicaSet.isRuning()) { try { @@ -70,7 +70,6 @@ public class ReplicatorTask implements Runnable { logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig); } - private void executorCursor(MongoCursor<Document> cursor) { while (cursor.hasNext() && !replicaSet.isPause()) { Document document = cursor.next(); @@ -81,5 +80,4 @@ public class ReplicatorTask implements Runnable { } } - } diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java index 57b4ac8..1b48990 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java +++ b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java @@ -1,16 +1,19 @@ package org.apache.connect.mongo.replicator.event; +import java.util.Optional; import org.bson.BsonTimestamp; import org.bson.Document; -import java.util.Optional; - -import static org.apache.connect.mongo.replicator.Constants.*; - +import static org.apache.connect.mongo.replicator.Constants.HASH; +import static org.apache.connect.mongo.replicator.Constants.NAMESPACE; +import static org.apache.connect.mongo.replicator.Constants.OBJECTID; +import static org.apache.connect.mongo.replicator.Constants.OPERATION; +import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE; +import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP; +import static org.apache.connect.mongo.replicator.Constants.VERSION; public class EventConverter { - public static ReplicationEvent convert(Document document, String replicaSetName) { ReplicationEvent event = new ReplicationEvent(); diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java index 283e9d6..6407781 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java +++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java @@ -1,12 +1,11 @@ package org.apache.connect.mongo.replicator.event; import io.openmessaging.connector.api.data.EntryType; +import java.util.Optional; import org.apache.commons.lang3.StringUtils; import org.bson.BsonTimestamp; import org.bson.Document; -import java.util.Optional; - public class ReplicationEvent { private Document document; @@ -21,13 +20,12 @@ public class ReplicationEvent { private Optional<Document> objectId; private String replicaSetName; - public ReplicationEvent() { } - - public ReplicationEvent(OperationType operationType, BsonTimestamp timestamp, Integer v, Long h, String namespace, Optional<Document> eventData, Optional<Document> objectId, Document document) { + public ReplicationEvent(OperationType operationType, BsonTimestamp timestamp, Integer v, Long h, String namespace, + Optional<Document> eventData, Optional<Document> objectId, Document document) { this.operationType = operationType; this.v = v; this.h = h; @@ -41,7 +39,6 @@ public class ReplicationEvent { this.document = document; } - public OperationType getOperationType() { return operationType; } @@ -91,12 +88,10 @@ public class ReplicationEvent { } } - public void setOperationType(OperationType operationType) { this.operationType = operationType; } - public Document getDocument() { return document; } @@ -137,7 +132,6 @@ public class ReplicationEvent { this.objectId = objectId; } - public void setReplicaSetName(String replicaSetName) { this.replicaSetName = replicaSetName; } @@ -149,17 +143,17 @@ public class ReplicationEvent { @Override public String toString() { return "ReplicationEvent{" + - "document=" + document + - ", operationType=" + operationType + - ", v=" + v + - ", h=" + h + - ", timestamp=" + timestamp + - ", databaseName='" + databaseName + '\'' + - ", collectionName='" + collectionName + '\'' + - ", namespace='" + namespace + '\'' + - ", eventData=" + eventData + - ", objectId=" + objectId + - ", replicaSetName='" + replicaSetName + '\'' + - '}'; + "document=" + document + + ", operationType=" + operationType + + ", v=" + v + + ", h=" + h + + ", timestamp=" + timestamp + + ", databaseName='" + databaseName + '\'' + + ", collectionName='" + collectionName + '\'' + + ", namespace='" + namespace + '\'' + + ", eventData=" + eventData + + ", objectId=" + objectId + + ", replicaSetName='" + replicaSetName + '\'' + + '}'; } } diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java index 60e2514..a804b8b 100644 --- a/src/test/java/org/apache/connect/mongo/FilterTest.java +++ b/src/test/java/org/apache/connect/mongo/FilterTest.java @@ -1,6 +1,10 @@ package org.apache.connect.mongo; import com.alibaba.fastjson.JSONObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.connect.mongo.initsync.CollectionMeta; import org.apache.connect.mongo.replicator.Filter; import org.apache.connect.mongo.replicator.event.OperationType; @@ -9,14 +13,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class FilterTest { - private SourceTaskConfig sourceTaskConfig; private Map<String, List<String>> insterest; @@ -37,7 +35,6 @@ public class FilterTest { Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01"))); } - @Test public void testBlankDb() { Filter filter = new Filter(sourceTaskConfig); @@ -45,7 +42,6 @@ public class FilterTest { Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01"))); } - @Test public void testAsterisk() { List<String> collections = new ArrayList<>(); @@ -57,8 +53,6 @@ public class FilterTest { Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032"))); } - - @Test public void testFilterEvent() { Filter filter = new Filter(sourceTaskConfig); @@ -69,5 +63,4 @@ public class FilterTest { Assert.assertTrue(filter.filterEvent(replicationEvent)); } - } diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java new file mode 100644 index 0000000..93adeeb --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java @@ -0,0 +1,173 @@ +package org.apache.connect.mongo; + +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoTimeoutException; +import com.mongodb.client.internal.MongoClientImpl; +import java.lang.reflect.Field; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.replicator.MongoClientFactory; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MongoFactoryTest { + + private ReplicaSetConfig replicaSetConfig; + + private SourceTaskConfig sourceTaskConfig; + + private MongoClientFactory mongoClientFactory; + + private MongoClientImpl client; + + @Before + public void before() { + this.replicaSetConfig = new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27027"); + this.sourceTaskConfig = new SourceTaskConfig(); + this.mongoClientFactory = new MongoClientFactory(sourceTaskConfig); + } + + @After + public void after() { + client.close(); + } + + @Test + public void testCreateMongoClientWithSSL() { + sourceTaskConfig.setSsl("ssl"); + MongoClientSettings settings = getSettings(); + System.out.println(settings.getSslSettings()); + Assert.assertTrue(settings.getSslSettings().isEnabled()); + } + + @Test + public void testCreateMongoClientWithTSL() { + sourceTaskConfig.setTsl("tsl"); + MongoClientSettings settings = getSettings(); + System.out.println(settings.getSslSettings()); + Assert.assertTrue(settings.getSslSettings().isEnabled()); + } + + @Test + public void testCreateMongoClientWithserverSelectionTimeoutMS() { + try { + replicaSetConfig.setReplicaSetName("testReplicatSet"); + sourceTaskConfig.setServerSelectionTimeoutMS("150"); + System.out.println(getSettings().getClusterSettings()); + Assert.assertTrue(getSettings().getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS) == 150); + } catch (MongoTimeoutException exception) { + Assert.assertTrue(StringUtils.startsWith(exception.getMessage(), "Timed out after 100 ms while waiting for a server that matches")); + } + } + + @Test + public void testCreateMongoClientWithConnectTimeoutMS() { + sourceTaskConfig.setConnectTimeoutMS("1200"); + System.out.println(getSettings().getSocketSettings()); + Assert.assertTrue(getSettings().getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS) == 1200); + + } + + @Test + public void testCreateMongoClientWithSocketTimeoutMS() { + sourceTaskConfig.setSocketTimeoutMS("1100"); + System.out.println(getSettings().getSocketSettings()); + Assert.assertTrue(getSettings().getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS) == 1100); + } + + @Test + public void testCreateMongoClientWithInvalidHostNameAllowed() { + sourceTaskConfig.setSslInvalidHostNameAllowed("true"); + System.out.println(getSettings().getSslSettings()); + Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed()); + + sourceTaskConfig.setSslInvalidHostNameAllowed("false"); + System.out.println(getSettings().getSslSettings()); + Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed()); + } + + @Test + public void testCreateMongoClientWithInvalidHostNameAllowedTsl() { + sourceTaskConfig.setTlsAllowInvalidHostnames("true"); + System.out.println(getSettings().getSslSettings()); + Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed()); + + sourceTaskConfig.setTlsAllowInvalidHostnames("false"); + System.out.println(getSettings().getSslSettings()); + Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed()); + } + + @Test + public void testCreateMongoClientWithTlsinsecure() { + sourceTaskConfig.setTlsInsecure("true"); + System.out.println(getSettings().getSslSettings()); + Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed()); + + sourceTaskConfig.setTlsInsecure("false"); + System.out.println(getSettings().getSslSettings()); + Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed()); + } + + @Test + public void testCreateMongoClientWithCompression() { + sourceTaskConfig.setCompressors("zlib"); + System.out.println(getSettings().getCompressorList()); + Assert.assertTrue(getSettings().getCompressorList().get(0).getName().equals("zlib")); + } + + @Test + public void testCreateMongoClientWithCompressionLevel() { + sourceTaskConfig.setCompressors("zlib"); + sourceTaskConfig.setZlibCompressionLevel("7"); + System.out.println(getSettings().getCompressorList()); + Assert.assertTrue(getSettings().getCompressorList().get(0).getName().equals("zlib")); + Assert.assertTrue(getSettings().getCompressorList().get(0).getProperty("level", 0) == 7); + } + + @Test + public void testCreateMongoClientWithAuth() { + sourceTaskConfig.setMongoUserName("test"); + sourceTaskConfig.setMongoPassWord("123456"); + System.out.println(getSettings().getCredential()); + Assert.assertTrue(getSettings().getCredential().getSource().equals("admin")); + Assert.assertTrue(getSettings().getCredential().getUserName().equals("test")); + Assert.assertTrue(new String(getSettings().getCredential().getPassword()).equals("123456")); + } + + private MongoClientSettings getSettings() { + try { + client = (MongoClientImpl) mongoClientFactory.createMongoClient(replicaSetConfig); + Field field = MongoClientImpl.class.getDeclaredField("settings"); + field.setAccessible(true); + return (MongoClientSettings) field.get(client); + } catch (Exception e) { + + } + return null; + } + +// @Test +// public void testSSLTrustStore() { +// sourceTaskConfig.setMongoUserName("user_test"); +// sourceTaskConfig.setMongoPassWord("pwd_test"); +// sourceTaskConfig.setSsl("ssl"); +// sourceTaskConfig.setSslInvalidHostNameAllowed("true"); +// sourceTaskConfig.setTrustStore("/Users/liping/test.pem"); +// sourceTaskConfig.setTrustStorePassword("test001"); +// sourceTaskConfig.setServerSelectionTimeoutMS("10000"); +// MongoClient client = mongoClientFactory.createMongoClient(replicaSetConfig); +// MongoCollection<Document> collection = client.getDatabase("test").getCollection("person"); +// Document document = new Document(); +// document.put("name", "liping"); +// collection.insertOne(document); +// MongoCursor<Document> iterator = collection.find().iterator(); +// while (iterator.hasNext()) { +// System.out.println(iterator.next()); +// } +// +// } + +} diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java index 1f6227b..f85e4a9 100644 --- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java @@ -5,6 +5,11 @@ import io.openmessaging.connector.api.data.EntryType; import io.openmessaging.connector.api.data.Schema; import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.internal.DefaultKeyValue; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.connect.mongo.connector.MongoSourceConnector; import org.apache.connect.mongo.connector.MongoSourceTask; import org.apache.connect.mongo.replicator.ReplicaSetConfig; @@ -17,12 +22,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.LinkedBlockingQueue; - public class MongoSourceConnectorTest { private MongoSourceConnector mongoSourceConnector; @@ -42,14 +41,12 @@ public class MongoSourceConnectorTest { Assert.assertEquals(mongoSourceConnector.taskClass(), MongoSourceTask.class); } - @Test public void verifyConfig() { String s = mongoSourceConnector.verifyAndSetConfig(keyValue); Assert.assertTrue(s.contains("Request config key:")); } - @Test public void testPoll() throws Exception { LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>(); @@ -65,25 +62,22 @@ public class MongoSourceConnectorTest { event.setH(324243242L); event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue"))); event.setObjectId(Optional.empty()); - context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27017")); + context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27027")); List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll(); Assert.assertTrue(sourceDataEntries.size() == 1); SourceDataEntry sourceDataEntry = sourceDataEntries.get(0); Assert.assertEquals("test-person", sourceDataEntry.getQueueName()); - ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition(); Assert.assertEquals("testReplicaName", new String(sourcePartition.array())); - ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition(); ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class); Assert.assertEquals(position.getTimeStamp(), 1565609506); Assert.assertEquals(position.getInc(), 1); Assert.assertEquals(position.isInitSync(), false); - EntryType entryType = sourceDataEntry.getEntryType(); Assert.assertEquals(EntryType.CREATE, entryType); @@ -97,5 +91,4 @@ public class MongoSourceConnectorTest { } - } diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java new file mode 100644 index 0000000..b696393 --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java @@ -0,0 +1,143 @@ +package org.apache.connect.mongo; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.PositionStorageReader; +import io.openmessaging.connector.api.source.SourceTask; +import io.openmessaging.connector.api.source.SourceTaskContext; +import io.openmessaging.internal.DefaultKeyValue; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.connector.MongoSourceTask; +import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.ReplicaSet; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSetsContext; +import org.junit.Assert; +import org.junit.Test; + +public class MongoSourceTaskTest { + + @Test + public void testEmptyContextStart() throws NoSuchFieldException, IllegalAccessException { + MongoSourceTask mongoSourceTask = new MongoSourceTask(); + DefaultKeyValue defaultKeyValue = new DefaultKeyValue(); + defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027"); + defaultKeyValue.put("positionTimeStamp", "11111111"); + defaultKeyValue.put("positionInc", "111"); + defaultKeyValue.put("serverSelectionTimeoutMS", "10"); + defaultKeyValue.put("dataSync", Constants.INITSYNC); + + Field context = SourceTask.class.getDeclaredField("context"); + context.setAccessible(true); + context.set(mongoSourceTask, emptyTaskContext()); + mongoSourceTask.start(defaultKeyValue); + + Field replicaSetsContext = MongoSourceTask.class.getDeclaredField("replicaSetsContext"); + replicaSetsContext.setAccessible(true); + ReplicaSetsContext setsContext = (ReplicaSetsContext) replicaSetsContext.get(mongoSourceTask); + + Field replicaSets = ReplicaSetsContext.class.getDeclaredField("replicaSets"); + replicaSets.setAccessible(true); + List<ReplicaSet> replicaSetList = (List<ReplicaSet>) replicaSets.get(setsContext); + Assert.assertTrue(replicaSetList.size() == 1); + ReplicaSet replicaSet = replicaSetList.get(0); + Field replicaSetConfig = ReplicaSet.class.getDeclaredField("replicaSetConfig"); + replicaSetConfig.setAccessible(true); + ReplicaSetConfig replicaSetConfig1 = (ReplicaSetConfig) replicaSetConfig.get(replicaSet); + Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getReplicaSetName(), "test")); + Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27027")); + Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 11111111); + Assert.assertTrue(replicaSetConfig1.getPosition().getInc() == 111); + Assert.assertTrue(replicaSetConfig1.getPosition().isInitSync()); + } + + private SourceTaskContext emptyTaskContext() { + return new SourceTaskContext() { + @Override + public PositionStorageReader positionStorageReader() { + return new PositionStorageReader() { + @Override + public ByteBuffer getPosition(ByteBuffer partition) { + return null; + } + + @Override + public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) { + return null; + } + }; + } + + @Override + public KeyValue configs() { + return null; + } + }; + } + + @Test + public void testContextStart() throws NoSuchFieldException, IllegalAccessException { + MongoSourceTask mongoSourceTask = new MongoSourceTask(); + DefaultKeyValue defaultKeyValue = new DefaultKeyValue(); + defaultKeyValue.put("mongoAddr", "test/127.0.0.1:27027"); + defaultKeyValue.put("serverSelectionTimeoutMS", "10"); + + Field context = SourceTask.class.getDeclaredField("context"); + context.setAccessible(true); + context.set(mongoSourceTask, TaskContext()); + mongoSourceTask.start(defaultKeyValue); + + Field replicaSetsContext = MongoSourceTask.class.getDeclaredField("replicaSetsContext"); + replicaSetsContext.setAccessible(true); + ReplicaSetsContext setsContext = (ReplicaSetsContext) replicaSetsContext.get(mongoSourceTask); + + Field replicaSets = ReplicaSetsContext.class.getDeclaredField("replicaSets"); + replicaSets.setAccessible(true); + List<ReplicaSet> replicaSetList = (List<ReplicaSet>) replicaSets.get(setsContext); + Assert.assertTrue(replicaSetList.size() == 1); + ReplicaSet replicaSet = replicaSetList.get(0); + Field replicaSetConfig = ReplicaSet.class.getDeclaredField("replicaSetConfig"); + replicaSetConfig.setAccessible(true); + ReplicaSetConfig replicaSetConfig1 = (ReplicaSetConfig) replicaSetConfig.get(replicaSet); + Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getReplicaSetName(), "test")); + Assert.assertTrue(StringUtils.equals(replicaSetConfig1.getHost(), "127.0.0.1:27027")); + Assert.assertTrue(replicaSetConfig1.getPosition().getTimeStamp() == 22222222); + Assert.assertTrue(replicaSetConfig1.getPosition().getInc() == 222); + Assert.assertTrue(!replicaSetConfig1.getPosition().isInitSync()); + } + + private SourceTaskContext TaskContext() { + return new SourceTaskContext() { + @Override + public PositionStorageReader positionStorageReader() { + return new PositionStorageReader() { + @Override + public ByteBuffer getPosition(ByteBuffer partition) { + + Map<String, Object> po = new HashMap<>(); + po.put("timeStamp", 22222222); + po.put("inc", 222); + po.put("initSync", false); + return ByteBuffer.wrap(JSONObject.toJSONString(po).getBytes()); + } + + @Override + public Map<ByteBuffer, ByteBuffer> getPositions(Collection<ByteBuffer> partitions) { + return null; + } + }; + } + + @Override + public KeyValue configs() { + return null; + } + }; + } +} diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java index 7b0291a..98e9a42 100644 --- a/src/test/java/org/apache/connect/mongo/MongoTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoTest.java @@ -9,6 +9,14 @@ import com.mongodb.client.MongoCollection; import io.openmessaging.connector.api.data.EntryType; import io.openmessaging.connector.api.data.Schema; import io.openmessaging.connector.api.data.SourceDataEntry; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.connect.mongo.initsync.InitSync; import org.apache.connect.mongo.replicator.Constants; import org.apache.connect.mongo.replicator.ReplicaSet; @@ -23,11 +31,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Field; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - public class MongoTest { private MongoClient mongoClient; @@ -35,11 +38,10 @@ public class MongoTest { @Before public void before() { MongoClientSettings.Builder builder = MongoClientSettings.builder(); - builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077")); + builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27027")); mongoClient = MongoClients.create(builder.build()); } - @Test public void testConvertEvent() { Document oplog = new Document(); @@ -60,10 +62,8 @@ public class MongoTest { Assert.assertEquals(document, event.getEventData().get()); Assert.assertEquals("testR", event.getReplicaSetName()); - } - @Test public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException { MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person"); diff --git a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java index 8613c42..16cb959 100644 --- a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java +++ b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java @@ -20,10 +20,9 @@ public class ReplicaContextTest { context = new ReplicaSetsContext(sourceTaskConfig); } - @Test public void testCreateMongoClient() { - MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017")); + MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027")); MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames(); MongoCursor<String> iterator = collectionNames.iterator(); while (iterator.hasNext()) { diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java new file mode 100644 index 0000000..07eefae --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/ReplicaSetTest.java @@ -0,0 +1,59 @@ +package org.apache.connect.mongo; + +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.connect.mongo.replicator.ReplicaSet; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSetsContext; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ReplicaSetTest { + + private ReplicaSet replicaSet; + + private SourceTaskConfig sourceTaskConfig; + + private ReplicaSetConfig replicaSetConfig; + + private ReplicaSetsContext replicaSetsContext; + + @Before + public void before() { + this.sourceTaskConfig = new SourceTaskConfig(); + this.replicaSetConfig = new ReplicaSetConfig("shardName1", "", "127.0.0.1:27027"); + this.replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig); + this.replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext); + } + + @Test + public void testStartAndShutDown() throws NoSuchFieldException, IllegalAccessException { + replicaSet.start(); + Field field = ReplicaSet.class.getDeclaredField("running"); + field.setAccessible(true); + AtomicBoolean o = (AtomicBoolean) field.get(replicaSet); + Assert.assertTrue(o.get()); + replicaSet.shutdown(); + Assert.assertFalse(o.get()); + } + + @Test + public void testPause() throws Exception { + replicaSet.pause(); + Field field = ReplicaSet.class.getDeclaredField("pause"); + field.setAccessible(true); + boolean pause = (boolean) field.get(replicaSet); + Assert.assertTrue(pause); + } + + @Test + public void testResume() throws Exception { + replicaSet.resume(); + Field field = ReplicaSet.class.getDeclaredField("pause"); + field.setAccessible(true); + boolean pause = (boolean) field.get(replicaSet); + Assert.assertFalse(pause); + } + +} diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java index e69eac6..5276f4f 100644 --- a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java +++ b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java @@ -1,29 +1,25 @@ package org.apache.connect.mongo; +import java.util.Map; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.ReplicaSets; import org.junit.Assert; import org.junit.Test; -import java.util.Map; - public class ReplicaSetsTest { - @Test(expected = IllegalArgumentException.class) - public void testCreatReplicaSetsException01() { + public void testCreatReplicaSetsExceptionWithoutMongoAddr() { ReplicaSets.create(""); } - @Test(expected = IllegalArgumentException.class) - public void testCreatReplicaSetsException02() { + public void testCreatReplicaSetsExceptioWithoutReplicaSetName() { ReplicaSets.create("127.0.0.1:27081"); } - @Test - public void testCreatReplicaSets01() { + public void testCreatReplicaSetsSpecialReplicaSetName() { ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); Assert.assertTrue(replicaSetConfigMap.size() == 1); @@ -32,9 +28,8 @@ public class ReplicaSetsTest { Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName()); } - @Test - public void testCreatReplicaSets02() { + public void testCreatReplicaSetsSpecialShardNameAndReplicaSetName() { ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); Assert.assertTrue(replicaSetConfigMap.size() == 1); @@ -44,9 +39,8 @@ public class ReplicaSetsTest { Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName()); } - @Test - public void testCreatReplicaSets03() { + public void testCreatReplicaSetsMutiMongoAddr() { ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283"); Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); Assert.assertTrue(replicaSetConfigMap.size() == 2); @@ -55,7 +49,6 @@ public class ReplicaSetsTest { Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName()); Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName()); - Assert.assertNotNull(replicaSetConfigMap.get("replicaName2")); Assert.assertEquals("127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283", replicaSetConfigMap.get("replicaName2").getHost()); Assert.assertEquals("replicaName2", replicaSetConfigMap.get("replicaName2").getReplicaSetName());
