This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 2cdb27a2b2e418a1228c0d83adf6acff64e79554 Author: 李平 <[email protected]> AuthorDate: Thu Aug 22 15:10:57 2019 +0800 fix some bug and add readme --- README.md | 22 +++ .../org/apache/connect/mongo/SourceTaskConfig.java | 188 +++++++++++---------- .../mongo/connector/MongoSourceConnector.java | 17 ++ .../connect/mongo/connector/MongoSourceTask.java | 49 +++--- .../mongo/connector/builder/MongoDataEntry.java | 32 +++- .../connect/mongo/initsync/CollectionMeta.java | 17 ++ .../apache/connect/mongo/initsync/InitSync.java | 27 ++- .../apache/connect/mongo/replicator/Constants.java | 23 ++- .../apache/connect/mongo/replicator/Filter.java | 19 ++- .../mongo/replicator/MongoClientFactory.java | 38 +++-- .../apache/connect/mongo/replicator/Position.java | 85 ++++++++++ .../connect/mongo/replicator/ReplicaSet.java | 32 +++- .../connect/mongo/replicator/ReplicaSetConfig.java | 93 ++-------- .../{ReplicaSets.java => ReplicaSetManager.java} | 29 +++- .../mongo/replicator/ReplicaSetsContext.java | 29 +++- .../connect/mongo/replicator/ReplicatorTask.java | 62 +++++-- ...Converter.java => Document2EventConverter.java} | 27 ++- .../mongo/replicator/event/OperationType.java | 54 ++++-- .../mongo/replicator/event/ReplicationEvent.java | 17 ++ .../java/org/apache/connect/mongo/FilterTest.java | 3 +- .../org/apache/connect/mongo/MongoFactoryTest.java | 22 +-- .../connect/mongo/MongoSourceConnectorTest.java | 3 +- .../apache/connect/mongo/MongoSourceTaskTest.java | 3 +- .../java/org/apache/connect/mongo/MongoTest.java | 25 ++- .../apache/connect/mongo/OperationTypeTest.java | 37 ++++ ...icaSetsTest.java => ReplicaSetManagerTest.java} | 20 +-- 26 files changed, 687 insertions(+), 286 deletions(-) diff --git a/README.md b/README.md index fb2c6c6..e32c455 100644 --- a/README.md +++ b/README.md @@ -10,3 +10,25 @@ and then init a mongo replicaSet `docker exec -it mongo-test mongo ` and `rs.initiate()` and then you can run all junit test + + +## task config params + +| param | Description | type | +| --- | --- | --- | +| mongoAddr | shardName=replicaSetName/127.0.0.1:2781,127.0.0.1:2782,127.0.0.1:2783; | string, split by ; | +| mongoUserName | mongo root username| string | +| mongoPassWord | mongo root password| string | +| interestDbAndCollection | {"dbName":["collection1","collection2"]}, collectionName can be "*" means all collection | json | +| positionTimeStamp | mongo oplog `bsontimestamp.value`, runtime store position is highest level | int | +| positionInc | mongo oplog `bsontimestamp.inc`, runtime store position is highest level | int | +| dataSync | sync all interestDbAndCollection data, runtime store position is highest level | json, Map<String(dbName), List<String(collectionName)>> | +| serverSelectionTimeoutMS | mongo driver select replicaServer timeout | long | +| connectTimeoutMS | mongo driver connect socket timeout | long | +| socketTimeoutMS | mongo driver read or write timeout | long | +| ssl or tsl | mongo driver use ssl or tsl | boolean | +| tlsInsecure or sslInvalidHostNameAllowed | mongo driver when use ssl or tsl allow invalid hostname | boolean| +| compressors | compressors way | string (zlib or snappy) +| zlibCompressionLevel | zlib compressors level| int (1-7)| +| trustStore | ssl pem| path| +| trustStorePassword | ssl pem decrypt password | string| diff --git a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java index 3df9dc4..d184b5c 100644 --- a/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java +++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo; import io.openmessaging.KeyValue; @@ -9,22 +26,21 @@ import org.bson.BsonTimestamp; public class SourceTaskConfig { - private String replicaSet; private String mongoAddr; private String mongoUserName; private String mongoPassWord; private String interestDbAndCollection; - private String positionTimeStamp; - private String positionInc; - private String dataSync; - private String serverSelectionTimeoutMS; - private String connectTimeoutMS; - private String socketTimeoutMS; - private String ssl; - private String tsl; - private String tlsInsecure; - private String sslInvalidHostNameAllowed; - private String tlsAllowInvalidHostnames; + private int positionTimeStamp; + private int positionInc; + private boolean dataSync; + private long serverSelectionTimeoutMS; + private long connectTimeoutMS; + private long socketTimeoutMS; + private boolean ssl; + private boolean tsl; + private boolean tlsInsecure; + private boolean sslInvalidHostNameAllowed; + private boolean tlsAllowInvalidHostnames; private String compressors; private String zlibCompressionLevel; private String trustStore; @@ -37,52 +53,28 @@ public class SourceTaskConfig { } }); - public String getTrustStore() { - return trustStore; - } - - public void setTrustStore(String trustStore) { - this.trustStore = trustStore; - } - - public String getTrustStorePassword() { - return trustStorePassword; - } - - public void setTrustStorePassword(String trustStorePassword) { - this.trustStorePassword = trustStorePassword; - } - - public String getZlibCompressionLevel() { - return zlibCompressionLevel; - } - - public void setZlibCompressionLevel(String zlibCompressionLevel) { - this.zlibCompressionLevel = zlibCompressionLevel; - } - - public String getPositionInc() { - return positionInc; + public String getMongoAddr() { + return mongoAddr; } - public void setPositionInc(String positionInc) { - this.positionInc = positionInc; + public void setMongoAddr(String mongoAddr) { + this.mongoAddr = mongoAddr; } - public int getCopyThread() { - return copyThread; + public String getMongoUserName() { + return mongoUserName; } - public void setCopyThread(int copyThread) { - this.copyThread = copyThread; + public void setMongoUserName(String mongoUserName) { + this.mongoUserName = mongoUserName; } - public String getPositionTimeStamp() { - return positionTimeStamp; + public String getMongoPassWord() { + return mongoPassWord; } - public void setPositionTimeStamp(String positionTimeStamp) { - this.positionTimeStamp = positionTimeStamp; + public void setMongoPassWord(String mongoPassWord) { + this.mongoPassWord = mongoPassWord; } public String getInterestDbAndCollection() { @@ -93,107 +85,91 @@ public class SourceTaskConfig { this.interestDbAndCollection = interestDbAndCollection; } - public String getMongoAddr() { - return mongoAddr; - } - - public void setMongoAddr(String mongoAddr) { - this.mongoAddr = mongoAddr; - } - - public String getMongoUserName() { - return mongoUserName; + public int getPositionTimeStamp() { + return positionTimeStamp; } - public void setMongoUserName(String mongoUserName) { - this.mongoUserName = mongoUserName; + public void setPositionTimeStamp(int positionTimeStamp) { + this.positionTimeStamp = positionTimeStamp; } - public String getMongoPassWord() { - return mongoPassWord; + public int getPositionInc() { + return positionInc; } - public void setMongoPassWord(String mongoPassWord) { - this.mongoPassWord = mongoPassWord; + public void setPositionInc(int positionInc) { + this.positionInc = positionInc; } - public String getDataSync() { + public boolean isDataSync() { return dataSync; } - public void setDataSync(String dataSync) { + public void setDataSync(boolean dataSync) { this.dataSync = dataSync; } - public String getReplicaSet() { - return replicaSet; - } - - public String getServerSelectionTimeoutMS() { + public long getServerSelectionTimeoutMS() { return serverSelectionTimeoutMS; } - public void setServerSelectionTimeoutMS(String serverSelectionTimeoutMS) { + public void setServerSelectionTimeoutMS(long serverSelectionTimeoutMS) { this.serverSelectionTimeoutMS = serverSelectionTimeoutMS; } - public void setReplicaSet(String replicaSet) { - this.replicaSet = replicaSet; - } - - public String getConnectTimeoutMS() { + public long getConnectTimeoutMS() { return connectTimeoutMS; } - public void setConnectTimeoutMS(String connectTimeoutMS) { + public void setConnectTimeoutMS(long connectTimeoutMS) { this.connectTimeoutMS = connectTimeoutMS; } - public String getSocketTimeoutMS() { + public long getSocketTimeoutMS() { return socketTimeoutMS; } - public void setSocketTimeoutMS(String socketTimeoutMS) { + public void setSocketTimeoutMS(long socketTimeoutMS) { this.socketTimeoutMS = socketTimeoutMS; } - public String getSsl() { + public boolean getSsl() { return ssl; } - public void setSsl(String ssl) { + public void setSsl(boolean ssl) { this.ssl = ssl; } - public String getTsl() { + public boolean getTsl() { return tsl; } - public void setTsl(String tsl) { + public void setTsl(boolean tsl) { this.tsl = tsl; } - public String getTlsInsecure() { + public boolean getTlsInsecure() { return tlsInsecure; } - public void setTlsInsecure(String tlsInsecure) { + public void setTlsInsecure(boolean tlsInsecure) { this.tlsInsecure = tlsInsecure; } - public String getSslInvalidHostNameAllowed() { + public boolean getSslInvalidHostNameAllowed() { return sslInvalidHostNameAllowed; } - public void setSslInvalidHostNameAllowed(String sslInvalidHostNameAllowed) { + public void setSslInvalidHostNameAllowed(boolean sslInvalidHostNameAllowed) { this.sslInvalidHostNameAllowed = sslInvalidHostNameAllowed; } - public String getTlsAllowInvalidHostnames() { + public boolean getTlsAllowInvalidHostnames() { return tlsAllowInvalidHostnames; } - public void setTlsAllowInvalidHostnames(String tlsAllowInvalidHostnames) { + public void setTlsAllowInvalidHostnames(boolean tlsAllowInvalidHostnames) { this.tlsAllowInvalidHostnames = tlsAllowInvalidHostnames; } @@ -205,6 +181,38 @@ public class SourceTaskConfig { this.compressors = compressors; } + public String getZlibCompressionLevel() { + return zlibCompressionLevel; + } + + public void setZlibCompressionLevel(String zlibCompressionLevel) { + this.zlibCompressionLevel = zlibCompressionLevel; + } + + public String getTrustStore() { + return trustStore; + } + + public void setTrustStore(String trustStore) { + this.trustStore = trustStore; + } + + public String getTrustStorePassword() { + return trustStorePassword; + } + + public void setTrustStorePassword(String trustStorePassword) { + this.trustStorePassword = trustStorePassword; + } + + public int getCopyThread() { + return copyThread; + } + + public void setCopyThread(int copyThread) { + this.copyThread = copyThread; + } + public void load(KeyValue props) { properties2Object(props, this); diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java index e3dfb6f..5be2e0d 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.connector; import io.openmessaging.KeyValue; diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java index da244cd..49bcf49 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.connector; import com.alibaba.fastjson.JSONObject; @@ -7,13 +24,10 @@ import io.openmessaging.connector.api.source.SourceTask; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Collection; -import java.util.regex.Pattern; -import org.apache.commons.lang3.StringUtils; import org.apache.connect.mongo.SourceTaskConfig; -import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.Position; import org.apache.connect.mongo.replicator.ReplicaSet; -import org.apache.connect.mongo.replicator.ReplicaSetConfig; -import org.apache.connect.mongo.replicator.ReplicaSets; +import org.apache.connect.mongo.replicator.ReplicaSetManager; import org.apache.connect.mongo.replicator.ReplicaSetsContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,12 +38,10 @@ public class MongoSourceTask extends SourceTask { private SourceTaskConfig sourceTaskConfig; - private ReplicaSets replicaSets; + private ReplicaSetManager replicaSetManager; private ReplicaSetsContext replicaSetsContext; - private Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$"); - @Override public Collection<SourceDataEntry> poll() { @@ -41,24 +53,23 @@ public class MongoSourceTask extends SourceTask { try { sourceTaskConfig = new SourceTaskConfig(); sourceTaskConfig.load(config); + replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig); - replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr()); - replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> { + + replicaSetManager = ReplicaSetManager.create(sourceTaskConfig.getMongoAddr()); + + replicaSetManager.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> { ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap( replicaSetName.getBytes())); if (byteBuffer != null && byteBuffer.array().length > 0) { String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8); - ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class); + Position position = JSONObject.parseObject(positionJson, Position.class); replicaSetConfig.setPosition(position); } else { - ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); - position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null - && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches() - ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0); - position.setInc(sourceTaskConfig.getPositionInc() != null - && pattern.matcher(sourceTaskConfig.getPositionInc()).matches() - ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0); - position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false); + Position position = new Position(); + position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp()); + position.setInc(sourceTaskConfig.getPositionInc()); + position.setInitSync(sourceTaskConfig.isDataSync()); replicaSetConfig.setPosition(position); } diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java index 87d92ea..1d6dfe5 100644 --- a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java +++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.connector.builder; import com.alibaba.fastjson.JSONObject; @@ -10,6 +27,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.Position; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; @@ -17,8 +35,8 @@ import org.bson.BsonTimestamp; import static org.apache.connect.mongo.replicator.Constants.CREATED; import static org.apache.connect.mongo.replicator.Constants.NAMESPACE; -import static org.apache.connect.mongo.replicator.Constants.OBJECTID; -import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE; +import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID; +import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE; import static org.apache.connect.mongo.replicator.Constants.PATCH; import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP; import static org.apache.connect.mongo.replicator.Constants.VERSION; @@ -48,12 +66,12 @@ public class MongoDataEntry { dataEntryBuilder.timestamp(System.currentTimeMillis()) .queue(event.getNamespace().replace(".", "-").replace("$", "-")) .entryType(event.getEntryType()); - dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name()); + dataEntryBuilder.putFiled(OPERATION_TYPE, event.getOperationType().name()); dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue()); dataEntryBuilder.putFiled(VERSION, event.getV()); dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace()); dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : ""); - dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : ""); + dataEntryBuilder.putFiled(OBJECT_ID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : ""); } String position = createPosition(event, replicaSetConfig); @@ -64,7 +82,7 @@ public class MongoDataEntry { } private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) { - ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); + Position position = new Position(); BsonTimestamp timestamp = event.getTimestamp(); position.setInc(timestamp != null ? timestamp.getInc() : 0); position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0); @@ -99,7 +117,7 @@ public class MongoDataEntry { private static void oplogField(Schema schema) { schema.setFields(new ArrayList<>()); - Field op = new Field(0, OPERATIONTYPE, FieldType.STRING); + Field op = new Field(0, OPERATION_TYPE, FieldType.STRING); schema.getFields().add(op); Field time = new Field(1, TIMESTAMP, FieldType.INT64); schema.getFields().add(time); @@ -109,7 +127,7 @@ public class MongoDataEntry { schema.getFields().add(namespace); Field patch = new Field(4, PATCH, FieldType.STRING); schema.getFields().add(patch); - Field objectId = new Field(5, OBJECTID, FieldType.STRING); + Field objectId = new Field(5, OBJECT_ID, FieldType.STRING); schema.getFields().add(objectId); } diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java index 9c73eac..4af5060 100644 --- a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java +++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.initsync; public class CollectionMeta { diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java index 6cdbe06..3d68fac 100644 --- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java +++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.initsync; import com.mongodb.client.MongoClient; @@ -13,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.connect.mongo.replicator.ReplicaSet; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.ReplicaSetsContext; -import org.apache.connect.mongo.replicator.event.EventConverter; +import org.apache.connect.mongo.replicator.event.Document2EventConverter; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; import org.bson.Document; @@ -115,24 +132,24 @@ public class InitSync { .batchSize(200) .iterator(); while (replicaSet.isRuning() && mongoCursor.hasNext()) { - if (context.initSyncAbort()) { + if (context.isInitSyncAbort()) { logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count); return; } count++; Document document = mongoCursor.next(); - ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName()); + ReplicationEvent event = Document2EventConverter.convert(document, replicaSetConfig.getReplicaSetName()); event.setOperationType(OperationType.CREATED); event.setNamespace(collectionMeta.getNameSpace()); context.publishEvent(event, replicaSetConfig); } } catch (Exception e) { - context.initSyncError(); + context.setInitSyncError(); + replicaSet.shutdown(); logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace(), e); } finally { countDownLatch.countDown(); - replicaSet.shutdown(); } logger.info("database:{}, collection:{}, copy {} documents, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java index c895bd6..7ba1ac4 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java +++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator; public class Constants { @@ -5,17 +22,15 @@ public class Constants { public static final String MONGO_LOCAL_DATABASE = "local"; public static final String MONGO_OPLOG_RS = "oplog.rs"; - public static final String OPERATIONTYPE = "op"; + public static final String OPERATION_TYPE = "op"; public static final String TIMESTAMP = "ts"; public static final String VERSION = "v"; public static final String HASH = "h"; public static final String NAMESPACE = "ns"; public static final String OPERATION = "o"; - public static final String OBJECTID = "o2"; + public static final String OBJECT_ID = "o2"; public static final String CREATED = "created"; public static final String PATCH = "patch"; - public static final String INITSYNC = "initSync"; - } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java index fd26163..a517822 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java +++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator; import com.alibaba.fastjson.JSONObject; @@ -49,7 +66,7 @@ public class Filter { return false; }; - notNoopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal(); + notNoopFilter = (operationType) -> !operationType.equals(OperationType.NOOP); } public boolean filterMeta(CollectionMeta collectionMeta) { diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java index f5e01a3..11bca8f 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java +++ b/src/main/java/org/apache/connect/mongo/replicator/MongoClientFactory.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator; import com.mongodb.ConnectionString; @@ -38,46 +55,46 @@ public class MongoClientFactory { sb.append(replicaSetConfig.getReplicaSetName()); } - if (StringUtils.isNotBlank(taskConfig.getServerSelectionTimeoutMS())) { + if (taskConfig.getServerSelectionTimeoutMS() > 0) { sb.append("&"); sb.append("serverSelectionTimeoutMS="); sb.append(taskConfig.getServerSelectionTimeoutMS()); } - if (StringUtils.isNotBlank(taskConfig.getConnectTimeoutMS())) { + if (taskConfig.getConnectTimeoutMS() > 0) { sb.append("&"); sb.append("connectTimeoutMS="); sb.append(taskConfig.getConnectTimeoutMS()); } - if (StringUtils.isNotBlank(taskConfig.getSocketTimeoutMS())) { + if (taskConfig.getSocketTimeoutMS() > 0) { sb.append("&"); sb.append("socketTimeoutMS="); sb.append(taskConfig.getSocketTimeoutMS()); } - if (StringUtils.isNotBlank(taskConfig.getSsl()) || StringUtils.isNotBlank(taskConfig.getTsl())) { + if (taskConfig.getSsl() || taskConfig.getTsl()) { sb.append("&"); sb.append("ssl="); sb.append(true); } - if (StringUtils.isNotBlank(taskConfig.getTlsInsecure())) { + if (taskConfig.getTlsInsecure()) { sb.append("&"); sb.append("tlsInsecure="); - sb.append(taskConfig.getTlsInsecure()); + sb.append(true); } - if (StringUtils.isNotBlank(taskConfig.getTlsAllowInvalidHostnames())) { + if (taskConfig.getTlsAllowInvalidHostnames()) { sb.append("&"); sb.append("tlsAllowInvalidHostnames="); - sb.append(taskConfig.getTlsAllowInvalidHostnames()); + sb.append(true); } - if (StringUtils.isNotBlank(taskConfig.getSslInvalidHostNameAllowed())) { + if (taskConfig.getSslInvalidHostNameAllowed()) { sb.append("&"); sb.append("sslInvalidHostNameAllowed="); - sb.append(taskConfig.getSslInvalidHostNameAllowed()); + sb.append(true); } if (StringUtils.isNotBlank(taskConfig.getCompressors())) { @@ -105,7 +122,6 @@ public class MongoClientFactory { } logger.info("connection string :{}", sb.toString()); - System.out.println(sb.toString()); ConnectionString connectionString = new ConnectionString(sb.toString()); return MongoClients.create(connectionString); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Position.java b/src/main/java/org/apache/connect/mongo/replicator/Position.java new file mode 100644 index 0000000..29fd856 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/Position.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.connect.mongo.replicator; + +import java.util.Objects; +import org.bson.BsonTimestamp; + +public class Position { + + private int timeStamp; + private int inc; + private boolean initSync; + + public int getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(int timeStamp) { + this.timeStamp = timeStamp; + } + + public int getInc() { + return inc; + } + + public void setInc(int inc) { + this.inc = inc; + } + + public boolean isInitSync() { + return initSync; + } + + public void setInitSync(boolean initSync) { + this.initSync = initSync; + } + + public Position() { + + } + + public Position(int timeStamp, int inc, boolean initSync) { + this.timeStamp = timeStamp; + this.inc = inc; + this.initSync = initSync; + } + + public boolean isValid() { + return timeStamp > 0 && inc > 0; + } + + public BsonTimestamp converBsonTimeStamp() { + return new BsonTimestamp(timeStamp, inc); + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + Position position = (Position) o; + return timeStamp == position.timeStamp && + inc == position.inc && + initSync == position.initSync; + } + + @Override public int hashCode() { + return Objects.hash(timeStamp, inc, initSync); + } +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java index 8f4d0d8..8393316 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator; import com.mongodb.client.MongoClient; @@ -38,14 +55,14 @@ public class ReplicaSet { } public void start() { + if (!running.compareAndSet(false, true)) { + logger.info("the java mongo replica already start"); + return; + } try { - if (!running.compareAndSet(false, true)) { - logger.info("the java mongo replica already start"); - return; - } this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig); - this.isReplicaMongo(); + this.checkReplicaMongo(); executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext)); } catch (Exception e) { logger.error("start replicator:{} error", replicaSetConfig, e); @@ -53,16 +70,15 @@ public class ReplicaSet { } } - public boolean isReplicaMongo() { + public void checkReplicaMongo() { MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE); MongoIterable<String> collectionNames = local.listCollectionNames(); MongoCursor<String> iterator = collectionNames.iterator(); while (iterator.hasNext()) { if (StringUtils.equals(MONGO_OPLOG_RS, iterator.next())) { - return true; + return; } } - this.shutdown(); throw new IllegalStateException(String.format("url:%s, is not replica", replicaSetConfig.getHost())); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java index 1b54b17..ced90b8 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java @@ -1,7 +1,21 @@ -package org.apache.connect.mongo.replicator; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ -import java.util.Objects; -import org.bson.BsonTimestamp; +package org.apache.connect.mongo.replicator; public class ReplicaSetConfig { @@ -48,10 +62,6 @@ public class ReplicaSetConfig { this.host = host; } - public Position emptyPosition() { - return new Position(0, 0, true); - } - @Override public String toString() { return "ReplicaSetConfig{" + @@ -62,73 +72,4 @@ public class ReplicaSetConfig { '}'; } - public class Position { - private int timeStamp; - private int inc; - private boolean initSync; - - public int getTimeStamp() { - return timeStamp; - } - - public void setTimeStamp(int timeStamp) { - this.timeStamp = timeStamp; - } - - public int getInc() { - return inc; - } - - public void setInc(int inc) { - this.inc = inc; - } - - public boolean isInitSync() { - return initSync; - } - - public void setInitSync(boolean initSync) { - this.initSync = initSync; - } - - public Position(int timeStamp, int inc, boolean initSync) { - this.timeStamp = timeStamp; - this.inc = inc; - this.initSync = initSync; - } - - public boolean isValid() { - return timeStamp > 0; - } - - public BsonTimestamp converBsonTimeStamp() { - return new BsonTimestamp(timeStamp, inc); - } - - @Override - public String toString() { - return "Position{" + - "timeStamp=" + timeStamp + - ", inc=" + inc + - ", initSync=" + initSync + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - Position position = (Position) o; - return timeStamp == position.timeStamp && - inc == position.inc && - initSync == position.initSync; - } - - @Override - public int hashCode() { - return Objects.hash(timeStamp, inc, initSync); - } - } } diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java similarity index 65% rename from src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java rename to src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java index af1ebeb..c5757d8 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator; import java.util.HashMap; @@ -9,13 +26,15 @@ import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; -public class ReplicaSets { +public class ReplicaSetManager { private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)"); + private static final String HOST_SEPARATOR = ";"; + private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>(); - public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) { + public ReplicaSetManager(Set<ReplicaSetConfig> replicaSetConfigs) { replicaSetConfigs.forEach(replicaSetConfig -> { if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) { replicaConfigByName.put(replicaSetConfig.getReplicaSetName(), replicaSetConfig); @@ -25,10 +44,10 @@ public class ReplicaSets { validate(); } - public static ReplicaSets create(String hosts) { + public static ReplicaSetManager create(String hosts) { Set<ReplicaSetConfig> replicaSetConfigs = new HashSet<>(); if (hosts != null) { - for (String replicaSetStr : StringUtils.split(hosts.trim(), ";")) { + for (String replicaSetStr : StringUtils.split(hosts.trim(), HOST_SEPARATOR)) { if (StringUtils.isNotBlank(replicaSetStr)) { ReplicaSetConfig replicaSetConfig = parseReplicaSetStr(replicaSetStr); if (replicaSetConfig != null) { @@ -37,7 +56,7 @@ public class ReplicaSets { } } } - return new ReplicaSets(replicaSetConfigs); + return new ReplicaSetManager(replicaSetConfigs); } private static ReplicaSetConfig parseReplicaSetStr(String hosts) { diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java index e599f5b..b067256 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java @@ -1,12 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator; import com.mongodb.client.MongoClient; import io.openmessaging.connector.api.data.SourceDataEntry; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.connect.mongo.SourceTaskConfig; @@ -28,9 +46,12 @@ public class ReplicaSetsContext { private MongoClientFactory mongoClientFactory; + private Map<String, Position> lastPositionMap; + public ReplicaSetsContext(SourceTaskConfig taskConfig) { this.taskConfig = taskConfig; - this.replicaSets = new CopyOnWriteArrayList<>(); + this.replicaSets = new ArrayList<>(); + this.lastPositionMap = new HashMap<>(); this.dataEntryQueue = new LinkedBlockingDeque<>(); this.filter = new Filter(taskConfig); this.mongoClientFactory = new MongoClientFactory(taskConfig); @@ -91,11 +112,11 @@ public class ReplicaSetsContext { return res; } - public boolean initSyncAbort() { + public boolean isInitSyncAbort() { return initSyncAbort.get(); } - public void initSyncError() { + public void setInitSyncError() { initSyncAbort.set(true); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java index 6cb46d1..4c142ce 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator; import com.mongodb.CursorType; @@ -7,8 +24,9 @@ import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import org.apache.connect.mongo.initsync.InitSync; -import org.apache.connect.mongo.replicator.event.EventConverter; +import org.apache.connect.mongo.replicator.event.Document2EventConverter; import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.bson.BsonTimestamp; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,19 +54,27 @@ public class ReplicatorTask implements Runnable { @Override public void run() { - if (replicaSetConfig.getPosition() == null || replicaSetConfig.getPosition().isInitSync()) { + BsonTimestamp firstAvailablePosition = findOplogFirstPosition(); + + // inValid or + // user config dataSync or + // user config or runtime saved position lt first oplog position maybe some operation is lost so need dataSync + if (!replicaSetConfig.getPosition().isValid() || replicaSetConfig.getPosition().isInitSync() + || replicaSetConfig.getPosition().converBsonTimeStamp().compareTo(firstAvailablePosition) < 0) { + recordOplogLastPosition(); InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet); initSync.start(); + } - MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); - FindIterable<Document> iterable; - if (replicaSetConfig.getPosition().isValid()) { - iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find( - Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp())); - } else { - iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); + if (!replicaSet.isRuning() || !replicaSetsContext.isInitSyncAbort()) { + return; } + + MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); + FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find( + Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp())); + MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1)) .noCursorTimeout(true) .cursorType(CursorType.TailableAwait) @@ -70,10 +96,26 @@ public class ReplicatorTask implements Runnable { logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig); } + private BsonTimestamp findOplogFirstPosition() { + MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); + FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); + Document lastOplog = iterable.sort(new Document("$natural", 1)).limit(1).first(); + BsonTimestamp timestamp = lastOplog.get(Constants.TIMESTAMP, BsonTimestamp.class); + return timestamp; + } + + private void recordOplogLastPosition() { + MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); + FindIterable<Document> iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); + Document lastOplog = iterable.sort(new Document("$natural", -1)).limit(1).first(); + BsonTimestamp timestamp = lastOplog.get(Constants.TIMESTAMP, BsonTimestamp.class); + replicaSetConfig.setPosition(new Position(timestamp.getTime(), timestamp.getInc(), false)); + } + private void executorCursor(MongoCursor<Document> cursor) { while (cursor.hasNext() && !replicaSet.isPause()) { Document document = cursor.next(); - ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName()); + ReplicationEvent event = Document2EventConverter.convert(document, replicaSetConfig.getReplicaSetName()); if (replicaSetsContext.filterEvent(event)) { replicaSetsContext.publishEvent(event, replicaSetConfig); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java similarity index 58% rename from src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java rename to src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java index 1b48990..99ab707 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java +++ b/src/main/java/org/apache/connect/mongo/replicator/event/Document2EventConverter.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator.event; import java.util.Optional; @@ -6,24 +23,24 @@ import org.bson.Document; import static org.apache.connect.mongo.replicator.Constants.HASH; import static org.apache.connect.mongo.replicator.Constants.NAMESPACE; -import static org.apache.connect.mongo.replicator.Constants.OBJECTID; +import static org.apache.connect.mongo.replicator.Constants.OBJECT_ID; import static org.apache.connect.mongo.replicator.Constants.OPERATION; -import static org.apache.connect.mongo.replicator.Constants.OPERATIONTYPE; +import static org.apache.connect.mongo.replicator.Constants.OPERATION_TYPE; import static org.apache.connect.mongo.replicator.Constants.TIMESTAMP; import static org.apache.connect.mongo.replicator.Constants.VERSION; -public class EventConverter { +public class Document2EventConverter { public static ReplicationEvent convert(Document document, String replicaSetName) { ReplicationEvent event = new ReplicationEvent(); - event.setOperationType(OperationType.getOperationType(document.getString(OPERATIONTYPE))); + event.setOperationType(OperationType.getOperationType(document.getString(OPERATION_TYPE))); event.setTimestamp(document.get(TIMESTAMP, BsonTimestamp.class)); event.setH(document.getLong(HASH)); event.setV(document.getInteger(VERSION)); event.setNamespace(document.getString(NAMESPACE)); event.setEventData(Optional.ofNullable(document.get(OPERATION, Document.class))); - event.setObjectId(Optional.ofNullable(document.get(OBJECTID, Document.class))); + event.setObjectId(Optional.ofNullable(document.get(OBJECT_ID, Document.class))); event.setReplicaSetName(replicaSetName); event.setDocument(document); return event; diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java index 54df394..b418666 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java +++ b/src/main/java/org/apache/connect/mongo/replicator/event/OperationType.java @@ -1,28 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator.event; +import org.apache.commons.lang3.StringUtils; + public enum OperationType { INSERT("i"), UPDATE("u"), DELETE("d"), NOOP("n"), - DBCOMMAND("c"), + DB_COMMAND("c"), CREATED("created"), UNKNOWN("unknown"); - private final String operationStr; + private final String operation; - OperationType(String operationStr) { - this.operationStr = operationStr; + OperationType(String operation) { + this.operation = operation; } - public static OperationType getOperationType(String operationStr) { - for (OperationType operationType : OperationType.values()) { - if (operationType.operationStr.equals(operationStr)) { - return operationType; - } + public static OperationType getOperationType(String operation) { + + if (StringUtils.isEmpty(operation)) { + return UNKNOWN; + } + + switch (operation) { + case "i": + return INSERT; + case "u": + return UPDATE; + case "d": + return DELETE; + case "n": + return NOOP; + case "c": + return DB_COMMAND; + case "created": + return CREATED; + default: + return UNKNOWN; } - return UNKNOWN; } } diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java index 6407781..7adca71 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java +++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.connect.mongo.replicator.event; import io.openmessaging.connector.api.data.EntryType; diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java index a804b8b..31c8f4d 100644 --- a/src/test/java/org/apache/connect/mongo/FilterTest.java +++ b/src/test/java/org/apache/connect/mongo/FilterTest.java @@ -59,8 +59,9 @@ public class FilterTest { ReplicationEvent replicationEvent = new ReplicationEvent(); replicationEvent.setOperationType(OperationType.NOOP); Assert.assertFalse(filter.filterEvent(replicationEvent)); - replicationEvent.setOperationType(OperationType.DBCOMMAND); + replicationEvent.setOperationType(OperationType.DB_COMMAND); Assert.assertTrue(filter.filterEvent(replicationEvent)); + } } diff --git a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java index 93adeeb..0f02064 100644 --- a/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoFactoryTest.java @@ -37,7 +37,7 @@ public class MongoFactoryTest { @Test public void testCreateMongoClientWithSSL() { - sourceTaskConfig.setSsl("ssl"); + sourceTaskConfig.setSsl(true); MongoClientSettings settings = getSettings(); System.out.println(settings.getSslSettings()); Assert.assertTrue(settings.getSslSettings().isEnabled()); @@ -45,7 +45,7 @@ public class MongoFactoryTest { @Test public void testCreateMongoClientWithTSL() { - sourceTaskConfig.setTsl("tsl"); + sourceTaskConfig.setTsl(true); MongoClientSettings settings = getSettings(); System.out.println(settings.getSslSettings()); Assert.assertTrue(settings.getSslSettings().isEnabled()); @@ -55,7 +55,7 @@ public class MongoFactoryTest { public void testCreateMongoClientWithserverSelectionTimeoutMS() { try { replicaSetConfig.setReplicaSetName("testReplicatSet"); - sourceTaskConfig.setServerSelectionTimeoutMS("150"); + sourceTaskConfig.setServerSelectionTimeoutMS(150); System.out.println(getSettings().getClusterSettings()); Assert.assertTrue(getSettings().getClusterSettings().getServerSelectionTimeout(TimeUnit.MILLISECONDS) == 150); } catch (MongoTimeoutException exception) { @@ -65,7 +65,7 @@ public class MongoFactoryTest { @Test public void testCreateMongoClientWithConnectTimeoutMS() { - sourceTaskConfig.setConnectTimeoutMS("1200"); + sourceTaskConfig.setConnectTimeoutMS(1200); System.out.println(getSettings().getSocketSettings()); Assert.assertTrue(getSettings().getSocketSettings().getConnectTimeout(TimeUnit.MILLISECONDS) == 1200); @@ -73,40 +73,40 @@ public class MongoFactoryTest { @Test public void testCreateMongoClientWithSocketTimeoutMS() { - sourceTaskConfig.setSocketTimeoutMS("1100"); + sourceTaskConfig.setSocketTimeoutMS(1100); System.out.println(getSettings().getSocketSettings()); Assert.assertTrue(getSettings().getSocketSettings().getReadTimeout(TimeUnit.MILLISECONDS) == 1100); } @Test public void testCreateMongoClientWithInvalidHostNameAllowed() { - sourceTaskConfig.setSslInvalidHostNameAllowed("true"); + sourceTaskConfig.setSslInvalidHostNameAllowed(true); System.out.println(getSettings().getSslSettings()); Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed()); - sourceTaskConfig.setSslInvalidHostNameAllowed("false"); + sourceTaskConfig.setSslInvalidHostNameAllowed(false); System.out.println(getSettings().getSslSettings()); Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed()); } @Test public void testCreateMongoClientWithInvalidHostNameAllowedTsl() { - sourceTaskConfig.setTlsAllowInvalidHostnames("true"); + sourceTaskConfig.setTlsAllowInvalidHostnames(true); System.out.println(getSettings().getSslSettings()); Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed()); - sourceTaskConfig.setTlsAllowInvalidHostnames("false"); + sourceTaskConfig.setTlsAllowInvalidHostnames(false); System.out.println(getSettings().getSslSettings()); Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed()); } @Test public void testCreateMongoClientWithTlsinsecure() { - sourceTaskConfig.setTlsInsecure("true"); + sourceTaskConfig.setTlsInsecure(true); System.out.println(getSettings().getSslSettings()); Assert.assertTrue(getSettings().getSslSettings().isInvalidHostNameAllowed()); - sourceTaskConfig.setTlsInsecure("false"); + sourceTaskConfig.setTlsInsecure(false); System.out.println(getSettings().getSslSettings()); Assert.assertFalse(getSettings().getSslSettings().isInvalidHostNameAllowed()); } diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java index f85e4a9..cc02fbc 100644 --- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java @@ -12,6 +12,7 @@ import java.util.Optional; import java.util.concurrent.LinkedBlockingQueue; import org.apache.connect.mongo.connector.MongoSourceConnector; import org.apache.connect.mongo.connector.MongoSourceTask; +import org.apache.connect.mongo.replicator.Position; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.ReplicaSetsContext; import org.apache.connect.mongo.replicator.event.OperationType; @@ -73,7 +74,7 @@ public class MongoSourceConnectorTest { Assert.assertEquals("testReplicaName", new String(sourcePartition.array())); ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition(); - ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class); + Position position = JSONObject.parseObject(new String(sourcePosition.array()), Position.class); Assert.assertEquals(position.getTimeStamp(), 1565609506); Assert.assertEquals(position.getInc(), 1); Assert.assertEquals(position.isInitSync(), false); diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java index b696393..4983a66 100644 --- a/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoSourceTaskTest.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.connect.mongo.connector.MongoSourceTask; -import org.apache.connect.mongo.replicator.Constants; import org.apache.connect.mongo.replicator.ReplicaSet; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.ReplicaSetsContext; @@ -31,7 +30,7 @@ public class MongoSourceTaskTest { defaultKeyValue.put("positionTimeStamp", "11111111"); defaultKeyValue.put("positionInc", "111"); defaultKeyValue.put("serverSelectionTimeoutMS", "10"); - defaultKeyValue.put("dataSync", Constants.INITSYNC); + defaultKeyValue.put("dataSync", "true"); Field context = SourceTask.class.getDeclaredField("context"); context.setAccessible(true); diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java index 98e9a42..3d900fa 100644 --- a/src/test/java/org/apache/connect/mongo/MongoTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoTest.java @@ -19,10 +19,11 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.connect.mongo.initsync.InitSync; import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.Position; import org.apache.connect.mongo.replicator.ReplicaSet; import org.apache.connect.mongo.replicator.ReplicaSetConfig; import org.apache.connect.mongo.replicator.ReplicaSetsContext; -import org.apache.connect.mongo.replicator.event.EventConverter; +import org.apache.connect.mongo.replicator.event.Document2EventConverter; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; import org.bson.BsonTimestamp; @@ -49,11 +50,11 @@ public class MongoTest { oplog.put(Constants.TIMESTAMP, timestamp); oplog.put(Constants.NAMESPACE, "test.person"); oplog.put(Constants.HASH, 11111L); - oplog.put(Constants.OPERATIONTYPE, "i"); + oplog.put(Constants.OPERATION_TYPE, "i"); Document document = new Document(); document.put("test", "test"); oplog.put(Constants.OPERATION, document); - ReplicationEvent event = EventConverter.convert(oplog, "testR"); + ReplicationEvent event = Document2EventConverter.convert(oplog, "testR"); Assert.assertEquals(timestamp, event.getTimestamp()); Assert.assertEquals("test.person", event.getNamespace()); Assert.assertTrue(11111L == event.getH()); @@ -95,16 +96,16 @@ public class MongoTest { int syncCount = 0; while (syncCount < count) { Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll(); - Assert.assertNotNull(sourceDataEntries); + Assert.assertTrue(sourceDataEntries.size() > 0); for (SourceDataEntry sourceDataEntry : sourceDataEntries) { ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition(); Assert.assertEquals("test", new String(sourcePartition.array())); ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition(); - ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); + Position position = new Position(); position.setInitSync(true); position.setTimeStamp(0); position.setInc(0); - Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class)); + Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), Position.class)); EntryType entryType = sourceDataEntry.getEntryType(); Assert.assertEquals(EntryType.CREATE, entryType); String queueName = sourceDataEntry.getQueueName(); @@ -122,4 +123,16 @@ public class MongoTest { Assert.assertTrue(syncCount == count); } + + @Test + public void testCompareBsonTimestamp() { + BsonTimestamp lt = new BsonTimestamp(11111111, 1); + BsonTimestamp gt = new BsonTimestamp(11111111, 2); + Assert.assertTrue(lt.compareTo(gt) < 0); + + lt = new BsonTimestamp(11111111, 1); + gt = new BsonTimestamp(22222222, 1); + Assert.assertTrue(lt.compareTo(gt) < 0); + + } } diff --git a/src/test/java/org/apache/connect/mongo/OperationTypeTest.java b/src/test/java/org/apache/connect/mongo/OperationTypeTest.java new file mode 100644 index 0000000..d8c5a9b --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/OperationTypeTest.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.connect.mongo; + +import org.apache.connect.mongo.replicator.event.OperationType; +import org.junit.Assert; +import org.junit.Test; + +public class OperationTypeTest { + + @Test + public void testGetOperationType() { + Assert.assertEquals(OperationType.INSERT, OperationType.getOperationType("i")); + Assert.assertEquals(OperationType.UPDATE, OperationType.getOperationType("u")); + Assert.assertEquals(OperationType.DELETE, OperationType.getOperationType("d")); + Assert.assertEquals(OperationType.NOOP, OperationType.getOperationType("n")); + Assert.assertEquals(OperationType.DB_COMMAND, OperationType.getOperationType("c")); + Assert.assertEquals(OperationType.CREATED, OperationType.getOperationType("created")); + Assert.assertEquals(OperationType.UNKNOWN, OperationType.getOperationType("test")); + + } +} diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java similarity index 75% rename from src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java rename to src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java index 5276f4f..1d3b743 100644 --- a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java +++ b/src/test/java/org/apache/connect/mongo/ReplicaSetManagerTest.java @@ -2,26 +2,26 @@ package org.apache.connect.mongo; import java.util.Map; import org.apache.connect.mongo.replicator.ReplicaSetConfig; -import org.apache.connect.mongo.replicator.ReplicaSets; +import org.apache.connect.mongo.replicator.ReplicaSetManager; import org.junit.Assert; import org.junit.Test; -public class ReplicaSetsTest { +public class ReplicaSetManagerTest { @Test(expected = IllegalArgumentException.class) public void testCreatReplicaSetsExceptionWithoutMongoAddr() { - ReplicaSets.create(""); + ReplicaSetManager.create(""); } @Test(expected = IllegalArgumentException.class) public void testCreatReplicaSetsExceptioWithoutReplicaSetName() { - ReplicaSets.create("127.0.0.1:27081"); + ReplicaSetManager.create("127.0.0.1:27081"); } @Test public void testCreatReplicaSetsSpecialReplicaSetName() { - ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); - Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); + ReplicaSetManager replicaSetManager = ReplicaSetManager.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); + Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName(); Assert.assertTrue(replicaSetConfigMap.size() == 1); Assert.assertNotNull(replicaSetConfigMap.get("replicaName1")); Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost()); @@ -30,8 +30,8 @@ public class ReplicaSetsTest { @Test public void testCreatReplicaSetsSpecialShardNameAndReplicaSetName() { - ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); - Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); + ReplicaSetManager replicaSetManager = ReplicaSetManager.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); + Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName(); Assert.assertTrue(replicaSetConfigMap.size() == 1); Assert.assertNotNull(replicaSetConfigMap.get("replicaName1")); Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost()); @@ -41,8 +41,8 @@ public class ReplicaSetsTest { @Test public void testCreatReplicaSetsMutiMongoAddr() { - ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283"); - Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); + ReplicaSetManager replicaSetManager = ReplicaSetManager.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283"); + Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSetManager.getReplicaConfigByName(); Assert.assertTrue(replicaSetConfigMap.size() == 2); Assert.assertNotNull(replicaSetConfigMap.get("replicaName1")); Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost());
