This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit f95bf2257d3bb9c57cfb2194876e57af45c5023e Author: 李平 <[email protected]> AuthorDate: Tue Aug 13 15:59:38 2019 +0800 support multiple mongo replicaset --- ...ReplicatorConfig.java => SourceTaskConfig.java} | 35 ++--- .../mongo/connector/MongoSourceConnector.java | 4 +- .../connect/mongo/connector/MongoSourceTask.java | 146 +++++++------------- .../mongo/connector/builder/MongoDataEntry.java | 113 +++++++++++++++ .../apache/connect/mongo/initsync/InitSync.java | 61 ++++---- .../apache/connect/mongo/replicator/Constants.java | 6 - .../apache/connect/mongo/replicator/Filter.java | 8 +- .../connect/mongo/replicator/MongoReplicator.java | 153 --------------------- .../connect/mongo/replicator/ReplicaSet.java | 101 ++++++++++++++ .../connect/mongo/replicator/ReplicaSetConfig.java | 139 +++++++++++++++++++ .../connect/mongo/replicator/ReplicaSets.java | 70 ++++++++++ .../mongo/replicator/ReplicaSetsContext.java | 129 +++++++++++++++++ .../connect/mongo/replicator/ReplicatorTask.java | 45 +++--- .../replicator/event/DocumentConvertEvent.java | 29 ---- .../mongo/replicator/event/EventConverter.java | 29 ++++ .../mongo/replicator/event/ReplicationEvent.java | 18 ++- .../java/org/apache/connect/mongo/FilterTest.java | 28 ++-- .../connect/mongo/MongoSourceConnectorTest.java | 69 +++++++++- .../java/org/apache/connect/mongo/MongoTest.java | 78 +++++++---- .../apache/connect/mongo/ReplicaContextTest.java | 34 +++++ .../org/apache/connect/mongo/ReplicaSetsTest.java | 65 +++++++++ 21 files changed, 946 insertions(+), 414 deletions(-) diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java similarity index 83% rename from src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java rename to src/main/java/org/apache/connect/mongo/SourceTaskConfig.java index 9097640..b79b2e9 100644 --- a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java +++ b/src/main/java/org/apache/connect/mongo/SourceTaskConfig.java @@ -1,37 +1,37 @@ package org.apache.connect.mongo; import io.openmessaging.KeyValue; -import org.apache.commons.lang3.StringUtils; import org.bson.BsonTimestamp; import java.lang.reflect.Method; +import java.util.Collections; import java.util.HashSet; import java.util.Set; -public class MongoReplicatorConfig { +public class SourceTaskConfig { private String replicaSet; private String mongoAddr; private String mongoUserName; private String mongoPassWord; private String interestDbAndCollection; - private int positionTimeStamp; - private int positionInc; - private boolean dataSync; + private String positionTimeStamp; + private String positionInc; + private String dataSync; private int copyThread = Runtime.getRuntime().availableProcessors(); - public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + public static final Set<String> REQUEST_CONFIG = Collections.unmodifiableSet(new HashSet<String>() { { add("mongoAddr"); } - }; + }); - public int getPositionInc() { + public String getPositionInc() { return positionInc; } - public void setPositionInc(int positionInc) { + public void setPositionInc(String positionInc) { this.positionInc = positionInc; } @@ -43,11 +43,11 @@ public class MongoReplicatorConfig { this.copyThread = copyThread; } - public int getPositionTimeStamp() { + public String getPositionTimeStamp() { return positionTimeStamp; } - public void setPositionTimeStamp(int positionTimeStamp) { + public void setPositionTimeStamp(String positionTimeStamp) { this.positionTimeStamp = positionTimeStamp; } @@ -85,11 +85,11 @@ public class MongoReplicatorConfig { } - public boolean getDataSync() { + public String getDataSync() { return dataSync; } - public void setDataSync(boolean dataSync) { + public void setDataSync(String dataSync) { this.dataSync = dataSync; } @@ -148,16 +148,9 @@ public class MongoReplicatorConfig { } } - public String getDataSouce() { - if (StringUtils.isBlank(replicaSet)) { - return mongoAddr; - } - return replicaSet + ":" + mongoAddr; - } - public BsonTimestamp getPosition() { - return new BsonTimestamp(positionTimeStamp, positionInc); + return new BsonTimestamp(Integer.valueOf(positionTimeStamp), Integer.valueOf(positionInc)); } } diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java index 9e659c9..2b28ea2 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java @@ -3,7 +3,7 @@ package org.apache.connect.mongo.connector; import io.openmessaging.KeyValue; import io.openmessaging.connector.api.Task; import io.openmessaging.connector.api.source.SourceConnector; -import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.SourceTaskConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,7 +18,7 @@ public class MongoSourceConnector extends SourceConnector { @Override public String verifyAndSetConfig(KeyValue config) { - for (String requestKey : MongoReplicatorConfig.REQUEST_CONFIG) { + for (String requestKey : SourceTaskConfig.REQUEST_CONFIG) { if (!config.containsKey(requestKey)) { return "Request config key: " + requestKey; } diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java index 7272878..407608a 100644 --- a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java @@ -2,91 +2,71 @@ package org.apache.connect.mongo.connector; import com.alibaba.fastjson.JSONObject; import io.openmessaging.KeyValue; -import io.openmessaging.connector.api.data.*; +import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.connector.api.source.SourceTask; -import org.apache.connect.mongo.MongoReplicatorConfig; -import org.apache.connect.mongo.replicator.Constants; -import org.apache.connect.mongo.replicator.MongoReplicator; -import org.apache.connect.mongo.replicator.event.OperationType; -import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.SourceTaskConfig; +import org.apache.connect.mongo.replicator.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.Collection; -import java.util.List; +import java.util.regex.Pattern; public class MongoSourceTask extends SourceTask { private Logger logger = LoggerFactory.getLogger(this.getClass()); - private MongoReplicator mongoReplicator; + private SourceTaskConfig sourceTaskConfig; - private MongoReplicatorConfig replicatorConfig; + private ReplicaSets replicaSets; - private String mongoSource; + private ReplicaSetsContext replicaSetsContext; + + private Pattern pattern = Pattern.compile("^[-\\+]?[\\d]*$"); @Override public Collection<SourceDataEntry> poll() { - List<SourceDataEntry> res = new ArrayList<>(); - ReplicationEvent event = mongoReplicator.getQueue().poll(); - if (event == null) { - return new ArrayList<>(); - } - JSONObject position = position(event); - Schema schema = new Schema(); - schema.setDataSource(event.getDatabaseName()); - schema.setName(event.getCollectionName()); - schema.setFields(new ArrayList<>()); - buildFieleds(schema); - DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); - dataEntryBuilder.timestamp(System.currentTimeMillis()) - .queue(event.getNamespace().replace(".", "-").replace("$", "")) - .entryType(event.getEntryType()); - - if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) { - dataEntryBuilder.putFiled(Constants.CREATED, event.getDocument().toJson()); - dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace()); - } else { - dataEntryBuilder.putFiled(Constants.OPERATIONTYPE, event.getOperationType().name()); - dataEntryBuilder.putFiled(Constants.TIMESTAMP, event.getTimestamp().getValue()); - dataEntryBuilder.putFiled(Constants.VERSION, event.getV()); - dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace()); - dataEntryBuilder.putFiled(Constants.PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : ""); - dataEntryBuilder.putFiled(Constants.OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : ""); - } - SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( - ByteBuffer.wrap(mongoSource.getBytes(StandardCharsets.UTF_8)), - ByteBuffer.wrap(position.toJSONString().getBytes(StandardCharsets.UTF_8))); - res.add(sourceDataEntry); - return res; + return replicaSetsContext.poll(); } @Override public void start(KeyValue config) { try { - replicatorConfig = new MongoReplicatorConfig(); - replicatorConfig.load(config); - mongoReplicator = new MongoReplicator(replicatorConfig); - mongoSource = replicatorConfig.getDataSouce(); - ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap( - mongoSource.getBytes())); - - if (position != null && position.array().length > 0) { - String positionJson = new String(position.array(), StandardCharsets.UTF_8); - JSONObject jsonObject = JSONObject.parseObject(positionJson); - replicatorConfig.setPositionTimeStamp(jsonObject.getIntValue("timeStamp")); - replicatorConfig.setPositionInc(jsonObject.getIntValue("inc")); - replicatorConfig.setDataSync(jsonObject.getBooleanValue(Constants.INITSYNC)); - } else { - replicatorConfig.setDataSync(true); - } - mongoReplicator.start(); - }catch (Throwable throwable) { - logger.info("task start error", throwable); + sourceTaskConfig = new SourceTaskConfig(); + sourceTaskConfig.load(config); + replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig); + replicaSets = ReplicaSets.create(sourceTaskConfig.getMongoAddr()); + replicaSets.getReplicaConfigByName().forEach((replicaSetName, replicaSetConfig) -> { + ByteBuffer byteBuffer = this.context.positionStorageReader().getPosition(ByteBuffer.wrap( + replicaSetName.getBytes())); + if (byteBuffer != null && byteBuffer.array().length > 0) { + String positionJson = new String(byteBuffer.array(), StandardCharsets.UTF_8); + ReplicaSetConfig.Position position = JSONObject.parseObject(positionJson, ReplicaSetConfig.Position.class); + replicaSetConfig.setPosition(position); + } else { + ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); + position.setTimeStamp(sourceTaskConfig.getPositionTimeStamp() != null + && pattern.matcher(sourceTaskConfig.getPositionTimeStamp()).matches() + ? Integer.valueOf(sourceTaskConfig.getPositionTimeStamp()) : 0); + position.setInc(sourceTaskConfig.getPositionInc() != null + && pattern.matcher(sourceTaskConfig.getPositionInc()).matches() + ? Integer.valueOf(sourceTaskConfig.getPositionInc()) : 0); + position.setInitSync(StringUtils.equals(sourceTaskConfig.getDataSync(), Constants.INITSYNC) ? true : false); + replicaSetConfig.setPosition(position); + } + + ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext); + replicaSetsContext.addReplicaSet(replicaSet); + replicaSet.start(); + }); + + + } catch (Throwable throwable) { + logger.error("task start error", throwable); stop(); } } @@ -94,51 +74,19 @@ public class MongoSourceTask extends SourceTask { @Override public void stop() { logger.info("shut down....."); - mongoReplicator.shutdown(); + replicaSetsContext.shutdown(); } @Override public void pause() { - mongoReplicator.pause(); + logger.info("pause replica task..."); + replicaSetsContext.pause(); } @Override public void resume() { - mongoReplicator.resume(); - } - - private void buildFieleds(Schema schema) { - Field op = new Field(0, Constants.OPERATIONTYPE, FieldType.STRING); - schema.getFields().add(op); - Field time = new Field(1, Constants.TIMESTAMP, FieldType.INT64); - schema.getFields().add(time); - Field v = new Field(2, Constants.VERSION, FieldType.INT32); - schema.getFields().add(v); - Field namespace = new Field(3, Constants.NAMESPACE, FieldType.STRING); - schema.getFields().add(namespace); - Field operation = new Field(4, Constants.CREATED, FieldType.STRING); - schema.getFields().add(operation); - Field patch = new Field(5, Constants.PATCH, FieldType.STRING); - schema.getFields().add(patch); - Field objectId = new Field(6, Constants.OBJECTID, FieldType.STRING); - schema.getFields().add(objectId); + logger.info("resume replica task..."); + replicaSetsContext.resume(); } - private JSONObject position(ReplicationEvent event) { - JSONObject jsonObject = new JSONObject(); - switch (event.getOperationType()) { - case CREATED: - jsonObject.put(Constants.POSITION_TIMESTAMP, 0); - jsonObject.put(Constants.POSITION_INC, 0); - jsonObject.put(Constants.INITSYNC, true); - break; - default: - jsonObject.put(Constants.POSITION_TIMESTAMP, event.getTimestamp().getTime()); - jsonObject.put(Constants.POSITION_INC, event.getTimestamp().getInc()); - jsonObject.put(Constants.INITSYNC, false); - break; - } - return jsonObject; - - } } diff --git a/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java new file mode 100644 index 0000000..7c0db7d --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/connector/builder/MongoDataEntry.java @@ -0,0 +1,113 @@ +package org.apache.connect.mongo.connector.builder; + + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.connector.api.data.*; +import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.event.OperationType; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.bson.BsonTimestamp; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; + +import static org.apache.connect.mongo.replicator.Constants.*; + +public class MongoDataEntry { + + private static String SCHEMA_CREATED_NAME = "mongo_created"; + private static String SCHEMA_OPLOG_NAME = "mongo_oplog"; + + public static SourceDataEntry createSouceDataEntry(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) { + + DataEntryBuilder dataEntryBuilder; + + if (event.getOperationType().equals(OperationType.CREATED)) { + Schema schema = createdSchema(replicaSetConfig.getReplicaSetName()); + dataEntryBuilder = new DataEntryBuilder(schema); + dataEntryBuilder.timestamp(System.currentTimeMillis()) + .queue(event.getNamespace().replace(".", "-").replace("$", "-")) + .entryType(event.getEntryType()); + + dataEntryBuilder.putFiled(CREATED, event.getDocument().toJson()); + dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace()); + + } else { + Schema schema = oplogSchema(replicaSetConfig.getReplicaSetName()); + dataEntryBuilder = new DataEntryBuilder(schema); + dataEntryBuilder.timestamp(System.currentTimeMillis()) + .queue(event.getNamespace().replace(".", "-").replace("$", "-")) + .entryType(event.getEntryType()); + dataEntryBuilder.putFiled(OPERATIONTYPE, event.getOperationType().name()); + dataEntryBuilder.putFiled(TIMESTAMP, event.getTimestamp().getValue()); + dataEntryBuilder.putFiled(VERSION, event.getV()); + dataEntryBuilder.putFiled(NAMESPACE, event.getNamespace()); + dataEntryBuilder.putFiled(PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : ""); + dataEntryBuilder.putFiled(OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : ""); + } + + + String position = createPosition(event, replicaSetConfig); + SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( + ByteBuffer.wrap(replicaSetConfig.getReplicaSetName().getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap(position.getBytes(StandardCharsets.UTF_8))); + return sourceDataEntry; + } + + + private static String createPosition(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) { + ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); + BsonTimestamp timestamp = event.getTimestamp(); + position.setInc(timestamp != null ? timestamp.getInc() : 0); + position.setTimeStamp(timestamp != null ? timestamp.getTime() : 0); + position.setInitSync(event.getOperationType().equals(OperationType.CREATED) ? true : false); + return JSONObject.toJSONString(position); + + } + + private static Schema createdSchema(String dataSourceName) { + Schema schema = new Schema(); + schema.setDataSource(dataSourceName); + schema.setName(SCHEMA_CREATED_NAME); + schema.setFields(new ArrayList<>()); + createdField(schema); + return schema; + } + + + private static Schema oplogSchema(String dataSourceName) { + Schema schema = new Schema(); + schema.setDataSource(dataSourceName); + schema.setName(SCHEMA_OPLOG_NAME); + oplogField(schema); + return schema; + } + + + private static void createdField(Schema schema) { + Field namespace = new Field(0, NAMESPACE, FieldType.STRING); + schema.getFields().add(namespace); + Field operation = new Field(1, Constants.CREATED, FieldType.STRING); + schema.getFields().add(operation); + } + + private static void oplogField(Schema schema) { + schema.setFields(new ArrayList<>()); + Field op = new Field(0, OPERATIONTYPE, FieldType.STRING); + schema.getFields().add(op); + Field time = new Field(1, TIMESTAMP, FieldType.INT64); + schema.getFields().add(time); + Field v = new Field(2, VERSION, FieldType.INT32); + schema.getFields().add(v); + Field namespace = new Field(3, NAMESPACE, FieldType.STRING); + schema.getFields().add(namespace); + Field patch = new Field(4, PATCH, FieldType.STRING); + schema.getFields().add(patch); + Field objectId = new Field(5, OBJECTID, FieldType.STRING); + schema.getFields().add(objectId); + } + + +} diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java index e12412b..d92e968 100644 --- a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java +++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java @@ -3,45 +3,48 @@ package org.apache.connect.mongo.initsync; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoIterable; -import org.apache.connect.mongo.replicator.event.DocumentConvertEvent; +import org.apache.connect.mongo.replicator.ReplicaSet; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSetsContext; +import org.apache.connect.mongo.replicator.event.EventConverter; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; -import org.apache.connect.mongo.replicator.Filter; -import org.apache.connect.mongo.replicator.MongoReplicator; -import org.apache.connect.mongo.MongoReplicatorConfig; import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class InitSync { private Logger logger = LoggerFactory.getLogger(this.getClass()); - private MongoReplicatorConfig mongoReplicatorConfig; + private ReplicaSetConfig replicaSetConfig; private ExecutorService copyExecutor; private MongoClient mongoClient; - private Filter filter; + private ReplicaSetsContext context; private int copyThreadCount; private Set<CollectionMeta> interestCollections; private CountDownLatch countDownLatch; - private MongoReplicator mongoReplicator; + private ReplicaSet replicaSet; - public InitSync(MongoReplicatorConfig mongoReplicatorConfig, MongoClient mongoClient, Filter filter, MongoReplicator mongoReplicator) { - this.mongoReplicatorConfig = mongoReplicatorConfig; + public InitSync(ReplicaSetConfig replicaSetConfig, MongoClient mongoClient, ReplicaSetsContext context, ReplicaSet replicaSet) { + this.replicaSetConfig = replicaSetConfig; this.mongoClient = mongoClient; - this.filter = filter; - this.mongoReplicator = mongoReplicator; + this.context = context; + this.replicaSet = replicaSet; init(); } public void start() { for (CollectionMeta collectionMeta : interestCollections) { - copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, mongoReplicator)); + copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, replicaSet)); } try { countDownLatch.await(); @@ -53,7 +56,7 @@ public class InitSync { private void init() { interestCollections = getInterestCollection(); - copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread()); + copyThreadCount = Math.min(interestCollections.size(), context.getCopyThread()); copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() { AtomicInteger threads = new AtomicInteger(); @@ -76,7 +79,7 @@ public class InitSync { while (collIterator.hasNext()) { String collectionName = collIterator.next(); CollectionMeta collectionMeta = new CollectionMeta(dataBaseName, collectionName); - if (filter.filter(collectionMeta)) { + if (context.filterMeta(collectionMeta)) { res.add(collectionMeta); } } @@ -92,40 +95,46 @@ public class InitSync { private MongoClient mongoClient; private CountDownLatch countDownLatch; private CollectionMeta collectionMeta; - private MongoReplicator mongoReplicator; + private ReplicaSet replicaSet; - public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, MongoReplicator mongoReplicator) { + public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, ReplicaSet replicaSet) { this.mongoClient = mongoClient; this.countDownLatch = countDownLatch; this.collectionMeta = collectionMeta; - this.mongoReplicator = mongoReplicator; + this.replicaSet = replicaSet; } @Override public void run() { - + logger.info("start copy database:{}, collection:{}", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName()); + int count = 0; try { - MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName()) .getCollection(collectionMeta.getCollectionName()) .find() .batchSize(200) .iterator(); - - while (mongoReplicator.isRuning() && mongoCursor.hasNext()) { + while (replicaSet.isRuning() && mongoCursor.hasNext()) { + if (context.initSyncAbort()) { + logger.info("init sync database:{}, collection:{} abort, has copy:{} document", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count); + return; + } + count++; Document document = mongoCursor.next(); - ReplicationEvent event = DocumentConvertEvent.convert(document); + ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName()); event.setOperationType(OperationType.CREATED); event.setNamespace(collectionMeta.getNameSpace()); - mongoReplicator.publishEvent(event); + context.publishEvent(event, replicaSetConfig); } } catch (Exception e) { - logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace()); + context.initSyncError(); + logger.error("init sync database:{}, collection:{} error", collectionMeta.getDatabaseName(), collectionMeta.getNameSpace(), e); } finally { countDownLatch.countDown(); + replicaSet.shutdown(); } - logger.info("database:{}, collection:{}, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName()); + logger.info("database:{}, collection:{}, copy {} documents, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName(), count); } } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Constants.java b/src/main/java/org/apache/connect/mongo/replicator/Constants.java index 668fd91..1a91a57 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Constants.java +++ b/src/main/java/org/apache/connect/mongo/replicator/Constants.java @@ -3,8 +3,6 @@ package org.apache.connect.mongo.replicator; public class Constants { - public static final String APPLICATION_NAME = "java-mongo-replicator"; - public static final String MONGO_LOCAL_DATABASE = "local"; public static final String MONGO_OPLOG_RS = "oplog.rs"; @@ -23,11 +21,7 @@ public class Constants { public static final String CREATED = "created"; public static final String PATCH = "patch"; - public static final String INITIAL = "initial"; - - public static final String POSITION_TIMESTAMP = "timeStamp"; - public static final String POSITION_INC = "inc"; public static final String INITSYNC = "initSync"; } diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java index 4a62e41..3e431a3 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/Filter.java +++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java @@ -4,7 +4,7 @@ package org.apache.connect.mongo.replicator; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import org.apache.commons.lang3.StringUtils; -import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.SourceTaskConfig; import org.apache.connect.mongo.initsync.CollectionMeta; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; @@ -21,9 +21,9 @@ public class Filter { private Function<OperationType, Boolean> notNoopFilter; - public Filter(MongoReplicatorConfig mongoReplicatorConfig) { + public Filter(SourceTaskConfig sourceTaskConfig) { - String interestDbAndCollection = mongoReplicatorConfig.getInterestDbAndCollection(); + String interestDbAndCollection = sourceTaskConfig.getInterestDbAndCollection(); if (StringUtils.isNotBlank(interestDbAndCollection)) { JSONObject jsonObject = JSONObject.parseObject(interestDbAndCollection); @@ -57,7 +57,7 @@ public class Filter { } - public boolean filter(CollectionMeta collectionMeta) { + public boolean filterMeta(CollectionMeta collectionMeta) { return dbAndCollectionFilter.apply(collectionMeta); } diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java deleted file mode 100644 index 60b8d3d..0000000 --- a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java +++ /dev/null @@ -1,153 +0,0 @@ -package org.apache.connect.mongo.replicator; - -import com.mongodb.ConnectionString; -import com.mongodb.MongoClientSettings; -import com.mongodb.client.MongoClient; -import com.mongodb.client.MongoClients; -import com.mongodb.client.MongoDatabase; -import com.mongodb.client.MongoIterable; -import org.apache.commons.lang3.StringUtils; -import org.apache.connect.mongo.MongoReplicatorConfig; -import org.apache.connect.mongo.replicator.event.ReplicationEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.connect.mongo.replicator.Constants.*; - - -public class MongoReplicator { - - private Logger logger = LoggerFactory.getLogger(this.getClass()); - - private AtomicBoolean running = new AtomicBoolean(); - - private MongoReplicatorConfig mongoReplicatorConfig; - - private MongoClientSettings clientSettings; - - private ConnectionString connectionString; - - private MongoClient mongoClient; - - private BlockingQueue<ReplicationEvent> queue = new LinkedBlockingQueue<>(); - - private Filter filter; - - private ExecutorService executorService; - - private volatile boolean pause = false; - - public MongoReplicator(MongoReplicatorConfig mongoReplicatorConfig) { - this.mongoReplicatorConfig = mongoReplicatorConfig; - this.filter = new Filter(mongoReplicatorConfig); - this.executorService = Executors.newSingleThreadExecutor((r) ->new Thread(r, "real_time_replica_thread")); - - buildConnectionString(); - } - - public void start() { - - try { - if (!running.compareAndSet(false, true)) { - logger.info("the java mongo replica already start"); - return; - } - - this.clientSettings = MongoClientSettings.builder() - .applicationName(APPLICATION_NAME) - .applyConnectionString(connectionString) - .build(); - this.mongoClient = MongoClients.create(clientSettings); - this.isReplicaMongo(); - executorService.submit(new ReplicatorTask(this, mongoClient, mongoReplicatorConfig, filter)); - }catch (Exception e) { - logger.info("start replicator error", e); - shutdown(); - } - } - - - private void buildConnectionString() { - StringBuilder sb = new StringBuilder(); - sb.append("mongodb://"); - if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName()) - && StringUtils.isNotBlank(mongoReplicatorConfig.getMongoPassWord())) { - sb.append(mongoReplicatorConfig.getMongoUserName()); - sb.append(":"); - sb.append(mongoReplicatorConfig.getMongoPassWord()); - sb.append("@"); - - } - sb.append(mongoReplicatorConfig.getMongoAddr()); - sb.append("/"); - if (StringUtils.isNotBlank(mongoReplicatorConfig.getReplicaSet())) { - sb.append("?"); - sb.append("replicaSet="); - sb.append(mongoReplicatorConfig.getReplicaSet()); - } - this.connectionString = new ConnectionString(sb.toString()); - } - - - public boolean isReplicaMongo() { - MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE); - MongoIterable<String> collectionNames = local.listCollectionNames(); - for (String collectionName : collectionNames) { - if (MONGO_OPLOG_RS.equals(collectionName)) { - return true; - } - } - this.shutdown(); - throw new IllegalStateException(String.format("url:%s, set:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getReplicaSet())); - } - - public void shutdown() { - if (running.compareAndSet(true, false)) { - if (!this.executorService.isShutdown()) { - executorService.shutdown(); - } - if (this.mongoClient != null) { - this.mongoClient.close(); - } - } - - } - - public void publishEvent(ReplicationEvent replicationEvent) { - while (true) { - try { - queue.put(replicationEvent); - break; - } catch (Exception e) { - } - } - } - - - - public void pause() { - pause = true; - } - - public void resume() { - pause = false; - } - - public boolean isPause() { - return pause; - } - - public boolean isRuning() { - return running.get(); - } - - public BlockingQueue<ReplicationEvent> getQueue() { - return queue; - } -} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java new file mode 100644 index 0000000..b141c2b --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSet.java @@ -0,0 +1,101 @@ +package org.apache.connect.mongo.replicator; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.MongoIterable; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.connect.mongo.replicator.Constants.MONGO_LOCAL_DATABASE; +import static org.apache.connect.mongo.replicator.Constants.MONGO_OPLOG_RS; + + +public class ReplicaSet { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private AtomicBoolean running = new AtomicBoolean(); + + private ReplicaSetConfig replicaSetConfig; + + + private ReplicaSetsContext replicaSetsContext; + + private MongoClient mongoClient; + + private ExecutorService executorService; + + private volatile boolean pause = false; + + public ReplicaSet(ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) { + this.replicaSetConfig = replicaSetConfig; + this.replicaSetsContext = replicaSetsContext; + this.executorService = Executors.newSingleThreadExecutor((r) -> new Thread(r, "real_time_replica_" + replicaSetConfig.getReplicaSetName() + "thread")); + + } + + public void start() { + + try { + if (!running.compareAndSet(false, true)) { + logger.info("the java mongo replica already start"); + return; + } + this.mongoClient = replicaSetsContext.createMongoClient(replicaSetConfig); + this.isReplicaMongo(); + executorService.submit(new ReplicatorTask(this, mongoClient, replicaSetConfig, replicaSetsContext)); + } catch (Exception e) { + logger.error("start replicator:{} error", replicaSetConfig, e); + shutdown(); + } + } + + + public boolean isReplicaMongo() { + MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE); + MongoIterable<String> collectionNames = local.listCollectionNames(); + MongoCursor<String> iterator = collectionNames.iterator(); + while (iterator.hasNext()) { + if (StringUtils.equals(MONGO_OPLOG_RS, iterator.next())) { + return true; + } + } + this.shutdown(); + throw new IllegalStateException(String.format("url:%s, is not replica", replicaSetConfig.getHost())); + } + + public void shutdown() { + if (running.compareAndSet(true, false)) { + if (!this.executorService.isShutdown()) { + executorService.shutdown(); + } + if (this.mongoClient != null) { + this.mongoClient.close(); + } + } + + } + + + public void pause() { + pause = true; + } + + public void resume() { + pause = false; + } + + public boolean isPause() { + return pause; + } + + public boolean isRuning() { + return running.get(); + } +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java new file mode 100644 index 0000000..4b8d148 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetConfig.java @@ -0,0 +1,139 @@ +package org.apache.connect.mongo.replicator; + +import org.bson.BsonTimestamp; + +import java.util.Objects; + +public class ReplicaSetConfig { + + + private String shardName; + private String replicaSetName; + private String host; + private Position position; + + + public Position getPosition() { + return position; + } + + public void setPosition(Position position) { + this.position = position; + } + + public String getShardName() { + return shardName; + } + + public void setShardName(String shardName) { + this.shardName = shardName; + } + + public String getReplicaSetName() { + return replicaSetName; + } + + public void setReplicaSetName(String replicaSetName) { + this.replicaSetName = replicaSetName; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public ReplicaSetConfig(String shardName, String replicaSetName, String host) { + this.shardName = shardName; + this.replicaSetName = replicaSetName; + this.host = host; + } + + public Position emptyPosition() { + return new Position(0, 0, true); + } + + + public class Position { + private int timeStamp; + private int inc; + private boolean initSync; + + + public int getTimeStamp() { + return timeStamp; + } + + public void setTimeStamp(int timeStamp) { + this.timeStamp = timeStamp; + } + + public int getInc() { + return inc; + } + + public void setInc(int inc) { + this.inc = inc; + } + + public boolean isInitSync() { + return initSync; + } + + public void setInitSync(boolean initSync) { + this.initSync = initSync; + } + + + public Position(int timeStamp, int inc, boolean initSync) { + this.timeStamp = timeStamp; + this.inc = inc; + this.initSync = initSync; + } + + public boolean isValid() { + return timeStamp > 0; + } + + public BsonTimestamp converBsonTimeStamp() { + return new BsonTimestamp(timeStamp, inc); + } + + @Override + public String toString() { + return "Position{" + + "timeStamp=" + timeStamp + + ", inc=" + inc + + ", initSync=" + initSync + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Position position = (Position) o; + return timeStamp == position.timeStamp && + inc == position.inc && + initSync == position.initSync; + } + + @Override + public int hashCode() { + return Objects.hash(timeStamp, inc, initSync); + } + } + + + @Override + public String toString() { + return "ReplicaSetConfig{" + + "shardName='" + shardName + '\'' + + ", replicaSetName='" + replicaSetName + '\'' + + ", host='" + host + '\'' + + ", position=" + position + + '}'; + } +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java new file mode 100644 index 0000000..9184b90 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSets.java @@ -0,0 +1,70 @@ +package org.apache.connect.mongo.replicator; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class ReplicaSets { + + private static final Pattern HOST_PATTERN = Pattern.compile("((([^=]+)[=])?(([^/]+)\\/))?(.+)"); + + + private final Map<String, ReplicaSetConfig> replicaConfigByName = new HashMap<>(); + + + public ReplicaSets(Set<ReplicaSetConfig> replicaSetConfigs) { + replicaSetConfigs.forEach(replicaSetConfig -> { + if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) { + replicaConfigByName.put(replicaSetConfig.getReplicaSetName(), replicaSetConfig); + } + }); + + validate(); + } + + public static ReplicaSets create(String hosts) { + Set<ReplicaSetConfig> replicaSetConfigs = new HashSet<>(); + if (hosts != null) { + for (String replicaSetStr : StringUtils.split(hosts.trim(), ";")) { + if (StringUtils.isNotBlank(replicaSetStr)) { + ReplicaSetConfig replicaSetConfig = parseReplicaSetStr(replicaSetStr); + if (replicaSetConfig != null) { + replicaSetConfigs.add(replicaSetConfig); + } + } + } + } + return new ReplicaSets(replicaSetConfigs); + } + + + private static ReplicaSetConfig parseReplicaSetStr(String hosts) { + if (hosts != null) { + Matcher matcher = HOST_PATTERN.matcher(hosts); + if (matcher.matches()) { + String shard = matcher.group(3); + String replicaSetName = matcher.group(5); + String host = matcher.group(6); + if (host != null && host.trim().length() != 0) { + return new ReplicaSetConfig(shard, replicaSetName, host); + } + } + } + return null; + } + + private void validate() { + Validate.isTrue(replicaConfigByName.size() > 0, "task config mongoAdd need special replicaSet addr"); + + } + + public Map<String, ReplicaSetConfig> getReplicaConfigByName() { + return replicaConfigByName; + } +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java new file mode 100644 index 0000000..f66ca14 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicaSetsContext.java @@ -0,0 +1,129 @@ +package org.apache.connect.mongo.replicator; + +import com.mongodb.ConnectionString; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import io.openmessaging.connector.api.data.SourceDataEntry; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.SourceTaskConfig; +import org.apache.connect.mongo.connector.builder.MongoDataEntry; +import org.apache.connect.mongo.initsync.CollectionMeta; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ReplicaSetsContext { + + private BlockingQueue<SourceDataEntry> dataEntryQueue; + + private SourceTaskConfig taskConfig; + + private List<ReplicaSet> replicaSets; + + private AtomicBoolean initSyncAbort = new AtomicBoolean(); + + private Filter filter; + + public ReplicaSetsContext(SourceTaskConfig taskConfig) { + this.taskConfig = taskConfig; + this.replicaSets = new CopyOnWriteArrayList<>(); + this.dataEntryQueue = new LinkedBlockingDeque<>(); + this.filter = new Filter(taskConfig); + } + + + public MongoClient createMongoClient(ReplicaSetConfig replicaSetConfig) { + StringBuilder sb = new StringBuilder(); + sb.append("mongodb://"); + if (StringUtils.isNotBlank(taskConfig.getMongoUserName()) + && StringUtils.isNotBlank(taskConfig.getMongoPassWord())) { + sb.append(taskConfig.getMongoUserName()); + sb.append(":"); + sb.append(taskConfig.getMongoPassWord()); + sb.append("@"); + + } + sb.append(replicaSetConfig.getHost()); + sb.append("/"); + if (StringUtils.isNotBlank(replicaSetConfig.getReplicaSetName())) { + sb.append("?"); + sb.append("replicaSet="); + sb.append(replicaSetConfig.getReplicaSetName()); + } + ConnectionString connectionString = new ConnectionString(sb.toString()); + return MongoClients.create(connectionString); + } + + + public boolean filterEvent(ReplicationEvent event) { + return filter.filterEvent(event); + } + + + public boolean filterMeta(CollectionMeta collectionMeta) { + return filter.filterMeta(collectionMeta); + } + + + public int getCopyThread() { + return taskConfig.getCopyThread() > 0 ? taskConfig.getCopyThread() : Runtime.getRuntime().availableProcessors(); + } + + + public void addReplicaSet(ReplicaSet replicaSet) { + this.replicaSets.add(replicaSet); + } + + + public void shutdown() { + replicaSets.forEach(ReplicaSet::shutdown); + } + + public void pause() { + replicaSets.forEach(ReplicaSet::pause); + } + + + public void resume() { + replicaSets.forEach(ReplicaSet::resume); + } + + + public void publishEvent(ReplicationEvent event, ReplicaSetConfig replicaSetConfig) { + SourceDataEntry sourceDataEntry = MongoDataEntry.createSouceDataEntry(event, replicaSetConfig); + while (true) { + try { + dataEntryQueue.put(sourceDataEntry); + break; + } catch (InterruptedException e) { + } + } + + } + + public Collection<SourceDataEntry> poll() { + List<SourceDataEntry> res = new ArrayList<>(); + if (dataEntryQueue.drainTo(res, 20) == 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + return res; + } + + public boolean initSyncAbort() { + return initSyncAbort.get(); + } + + public void initSyncError() { + initSyncAbort.set(true); + } + +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java index 766225f..bf4ebac 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java @@ -6,9 +6,8 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; -import org.apache.connect.mongo.MongoReplicatorConfig; import org.apache.connect.mongo.initsync.InitSync; -import org.apache.connect.mongo.replicator.event.DocumentConvertEvent; +import org.apache.connect.mongo.replicator.event.EventConverter; import org.apache.connect.mongo.replicator.event.ReplicationEvent; import org.bson.Document; import org.slf4j.Logger; @@ -19,34 +18,34 @@ public class ReplicatorTask implements Runnable { private Logger logger = LoggerFactory.getLogger(this.getClass()); - private MongoReplicator mongoReplicator; + private ReplicaSet replicaSet; private MongoClient mongoClient; - private MongoReplicatorConfig mongoReplicatorConfig; + private ReplicaSetConfig replicaSetConfig; - private Filter filter; + private ReplicaSetsContext replicaSetsContext; - public ReplicatorTask(MongoReplicator mongoReplicator, MongoClient mongoClient, MongoReplicatorConfig mongoReplicatorConfig, Filter filter) { - this.mongoReplicator = mongoReplicator; - this.mongoReplicatorConfig = mongoReplicatorConfig; + public ReplicatorTask(ReplicaSet replicaSet, MongoClient mongoClient, ReplicaSetConfig replicaSetConfig, ReplicaSetsContext replicaSetsContext) { + this.replicaSet = replicaSet; + this.replicaSetConfig = replicaSetConfig; this.mongoClient = mongoClient; - this.filter = filter; + this.replicaSetsContext = replicaSetsContext; } @Override public void run() { - if (mongoReplicatorConfig.getDataSync()) { - InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator); + if (replicaSetConfig.getPosition() == null || replicaSetConfig.getPosition().isInitSync()) { + InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet); initSync.start(); } MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); FindIterable<Document> iterable; - if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) { + if (replicaSetConfig.getPosition().isValid()) { iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find( - Filters.gt("ts", mongoReplicatorConfig.getPosition())); + Filters.gt("ts", replicaSetConfig.getPosition().converBsonTimeStamp())); } else { iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); } @@ -56,26 +55,28 @@ public class ReplicatorTask implements Runnable { .batchSize(200) .iterator(); - while (mongoReplicator.isRuning()) { + while (replicaSet.isRuning()) { try { executorCursor(cursor); - Thread.sleep(100); } catch (Exception e) { - e.printStackTrace(); + logger.error("replicaSet:{} shutdown.....", replicaSetConfig, e); } finally { - logger.error("mongoReplicator shutdown....."); - mongoReplicator.shutdown(); + if (cursor != null) { + cursor.close(); + } + replicaSet.shutdown(); } } + logger.info("replicaSet:{}, already shutdown, replicaTask end of life cycle", replicaSetConfig); } private void executorCursor(MongoCursor<Document> cursor) { - while (cursor.hasNext() && !mongoReplicator.isPause()) { + while (cursor.hasNext() && !replicaSet.isPause()) { Document document = cursor.next(); - ReplicationEvent event = DocumentConvertEvent.convert(document); - if (filter.filterEvent(event)) { - mongoReplicator.publishEvent(event); + ReplicationEvent event = EventConverter.convert(document, replicaSetConfig.getReplicaSetName()); + if (replicaSetsContext.filterEvent(event)) { + replicaSetsContext.publishEvent(event, replicaSetConfig); } } } diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java deleted file mode 100644 index a18aa52..0000000 --- a/src/main/java/org/apache/connect/mongo/replicator/event/DocumentConvertEvent.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.apache.connect.mongo.replicator.event; - -import org.bson.BsonTimestamp; -import org.bson.Document; - -import java.util.Optional; - -import static org.apache.connect.mongo.replicator.Constants.*; - - -public class DocumentConvertEvent { - - - public static ReplicationEvent convert(Document document) { - - OperationType operationType = OperationType.getOperationType(document.getString(OPERATIONTYPE)); - BsonTimestamp timestamp = (BsonTimestamp) document.get(TIMESTAMP); -// Long t = document.getLong("t"); - Long h = document.getLong(HASH); - Integer v = document.getInteger(VERSION); - String nameSpace = document.getString(NAMESPACE); -// String uuid = document.getString("uuid"); -// Date wall = document.getDate("wall"); - Document operation = document.get(OPERATION, Document.class); - Document objectID = document.get(OBJECTID, Document.class); - return new ReplicationEvent(operationType, timestamp, v, h, nameSpace, Optional.ofNullable(operation), Optional.ofNullable(objectID), document); - } - -} diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java new file mode 100644 index 0000000..57b4ac8 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/event/EventConverter.java @@ -0,0 +1,29 @@ +package org.apache.connect.mongo.replicator.event; + +import org.bson.BsonTimestamp; +import org.bson.Document; + +import java.util.Optional; + +import static org.apache.connect.mongo.replicator.Constants.*; + + +public class EventConverter { + + + public static ReplicationEvent convert(Document document, String replicaSetName) { + + ReplicationEvent event = new ReplicationEvent(); + event.setOperationType(OperationType.getOperationType(document.getString(OPERATIONTYPE))); + event.setTimestamp(document.get(TIMESTAMP, BsonTimestamp.class)); + event.setH(document.getLong(HASH)); + event.setV(document.getInteger(VERSION)); + event.setNamespace(document.getString(NAMESPACE)); + event.setEventData(Optional.ofNullable(document.get(OPERATION, Document.class))); + event.setObjectId(Optional.ofNullable(document.get(OBJECTID, Document.class))); + event.setReplicaSetName(replicaSetName); + event.setDocument(document); + return event; + } + +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java index 7719b3e..283e9d6 100644 --- a/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java +++ b/src/main/java/org/apache/connect/mongo/replicator/event/ReplicationEvent.java @@ -19,6 +19,7 @@ public class ReplicationEvent { private String namespace; private Optional<Document> eventData; private Optional<Document> objectId; + private String replicaSetName; public ReplicationEvent() { @@ -136,18 +137,29 @@ public class ReplicationEvent { this.objectId = objectId; } + + public void setReplicaSetName(String replicaSetName) { + this.replicaSetName = replicaSetName; + } + + public String getReplicaSetName() { + return replicaSetName; + } + @Override public String toString() { return "ReplicationEvent{" + - "operationType=" + operationType + + "document=" + document + + ", operationType=" + operationType + ", v=" + v + ", h=" + h + ", timestamp=" + timestamp + ", databaseName='" + databaseName + '\'' + ", collectionName='" + collectionName + '\'' + ", namespace='" + namespace + '\'' + - ", eventData=" + eventData.toString() + - ", objectId=" + objectId.toString() + + ", eventData=" + eventData + + ", objectId=" + objectId + + ", replicaSetName='" + replicaSetName + '\'' + '}'; } } diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java index 2b14b13..60e2514 100644 --- a/src/test/java/org/apache/connect/mongo/FilterTest.java +++ b/src/test/java/org/apache/connect/mongo/FilterTest.java @@ -17,12 +17,12 @@ import java.util.Map; public class FilterTest { - private MongoReplicatorConfig config; + private SourceTaskConfig sourceTaskConfig; private Map<String, List<String>> insterest; @Before public void init() { - config = new MongoReplicatorConfig(); + sourceTaskConfig = new SourceTaskConfig(); insterest = new HashMap<>(); } @@ -31,18 +31,18 @@ public class FilterTest { List<String> collections = new ArrayList<>(); collections.add("person"); insterest.put("test", collections); - config.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); - Filter filter = new Filter(config); - Assert.assertTrue(filter.filter(new CollectionMeta("test", "person"))); - Assert.assertFalse(filter.filter(new CollectionMeta("test", "person01"))); + sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); + Filter filter = new Filter(sourceTaskConfig); + Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "person"))); + Assert.assertFalse(filter.filterMeta(new CollectionMeta("test", "person01"))); } @Test public void testBlankDb() { - Filter filter = new Filter(config); - Assert.assertTrue(filter.filter(new CollectionMeta("test" ,"test"))); - Assert.assertTrue(filter.filter(new CollectionMeta("test1" ,"test01"))); + Filter filter = new Filter(sourceTaskConfig); + Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "test"))); + Assert.assertTrue(filter.filterMeta(new CollectionMeta("test1", "test01"))); } @@ -51,17 +51,17 @@ public class FilterTest { List<String> collections = new ArrayList<>(); collections.add("*"); insterest.put("test", collections); - config.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); - Filter filter = new Filter(config); - Assert.assertTrue(filter.filter(new CollectionMeta("test", "testsad"))); - Assert.assertTrue(filter.filter(new CollectionMeta("test", "tests032"))); + sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); + Filter filter = new Filter(sourceTaskConfig); + Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "testsad"))); + Assert.assertTrue(filter.filterMeta(new CollectionMeta("test", "tests032"))); } @Test public void testFilterEvent() { - Filter filter = new Filter(config); + Filter filter = new Filter(sourceTaskConfig); ReplicationEvent replicationEvent = new ReplicationEvent(); replicationEvent.setOperationType(OperationType.NOOP); Assert.assertFalse(filter.filterEvent(replicationEvent)); diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java index 04e1374..d921e63 100644 --- a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java @@ -1,22 +1,40 @@ package org.apache.connect.mongo; +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SourceDataEntry; import io.openmessaging.internal.DefaultKeyValue; -import org.apache.commons.lang3.StringUtils; import org.apache.connect.mongo.connector.MongoSourceConnector; import org.apache.connect.mongo.connector.MongoSourceTask; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSetsContext; +import org.apache.connect.mongo.replicator.event.OperationType; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.bson.BsonTimestamp; +import org.bson.Document; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.LinkedBlockingQueue; + public class MongoSourceConnectorTest { private MongoSourceConnector mongoSourceConnector; private DefaultKeyValue keyValue; + private SourceTaskConfig sourceTaskConfig; @Before public void before() { mongoSourceConnector = new MongoSourceConnector(); keyValue = new DefaultKeyValue(); + sourceTaskConfig = new SourceTaskConfig(); + } @Test @@ -27,19 +45,58 @@ public class MongoSourceConnectorTest { @Test public void verifyConfig() { - keyValue.put("mongoAddr", "127.0.0.1"); + keyValue.put("mongoAddr", "shardName=replicaName:127.0.0.1:27017"); String s = mongoSourceConnector.verifyAndSetConfig(keyValue); - Assert.assertTrue(s.contains("Request config key:")); - keyValue.put("mongoPort", 27017); - s = mongoSourceConnector.verifyAndSetConfig(keyValue); - Assert.assertTrue(StringUtils.isBlank(s)); + Assert.assertTrue(s.contains("Request sourceTaskConfig key:")); } + @Test + public void testPoll() throws Exception { + LinkedBlockingQueue<SourceDataEntry> entries = new LinkedBlockingQueue<>(); + ReplicaSetsContext context = new ReplicaSetsContext(sourceTaskConfig); + Field dataEntryQueue = ReplicaSetsContext.class.getDeclaredField("dataEntryQueue"); + dataEntryQueue.setAccessible(true); + dataEntryQueue.set(context, entries); + ReplicationEvent event = new ReplicationEvent(); + event.setOperationType(OperationType.INSERT); + event.setNamespace("test.person"); + event.setTimestamp(new BsonTimestamp(1565609506, 1)); + event.setDocument(new Document("testKey", "testValue")); + event.setH(324243242L); + event.setEventData(Optional.ofNullable(new Document("testEventKey", "testEventValue"))); + event.setObjectId(Optional.empty()); + context.publishEvent(event, new ReplicaSetConfig("", "testReplicaName", "localhost:27017")); + List<SourceDataEntry> sourceDataEntries = (List<SourceDataEntry>) context.poll(); + Assert.assertTrue(sourceDataEntries.size() == 1); + + SourceDataEntry sourceDataEntry = sourceDataEntries.get(0); + Assert.assertEquals("test-person", sourceDataEntry.getQueueName()); + + ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition(); + Assert.assertEquals("testReplicaName", new String(sourcePartition.array())); + ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition(); + ReplicaSetConfig.Position position = JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class); + Assert.assertEquals(position.getTimeStamp(), 1565609506); + Assert.assertEquals(position.getInc(), 1); + Assert.assertEquals(position.isInitSync(), false); + EntryType entryType = sourceDataEntry.getEntryType(); + Assert.assertEquals(EntryType.CREATE, entryType); + + String queueName = sourceDataEntry.getQueueName(); + Assert.assertEquals("test-person", queueName); + + Schema schema = sourceDataEntry.getSchema(); + Assert.assertTrue(schema.getFields().size() == 6); + Object[] payload = sourceDataEntry.getPayload(); + Assert.assertTrue(payload.length == 6); + + } + } diff --git a/src/test/java/org/apache/connect/mongo/MongoTest.java b/src/test/java/org/apache/connect/mongo/MongoTest.java index cc83fbe..7b0291a 100644 --- a/src/test/java/org/apache/connect/mongo/MongoTest.java +++ b/src/test/java/org/apache/connect/mongo/MongoTest.java @@ -7,11 +7,14 @@ import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import io.openmessaging.connector.api.data.EntryType; +import io.openmessaging.connector.api.data.Schema; +import io.openmessaging.connector.api.data.SourceDataEntry; import org.apache.connect.mongo.initsync.InitSync; import org.apache.connect.mongo.replicator.Constants; -import org.apache.connect.mongo.replicator.Filter; -import org.apache.connect.mongo.replicator.MongoReplicator; -import org.apache.connect.mongo.replicator.event.DocumentConvertEvent; +import org.apache.connect.mongo.replicator.ReplicaSet; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSetsContext; +import org.apache.connect.mongo.replicator.event.EventConverter; import org.apache.connect.mongo.replicator.event.OperationType; import org.apache.connect.mongo.replicator.event.ReplicationEvent; import org.bson.BsonTimestamp; @@ -21,12 +24,8 @@ import org.junit.Before; import org.junit.Test; import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; +import java.nio.ByteBuffer; +import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; public class MongoTest { @@ -36,7 +35,7 @@ public class MongoTest { @Before public void before() { MongoClientSettings.Builder builder = MongoClientSettings.builder(); - builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27018")); + builder.applyConnectionString(new ConnectionString("mongodb://127.0.0.1:27077")); mongoClient = MongoClients.create(builder.build()); } @@ -52,13 +51,14 @@ public class MongoTest { Document document = new Document(); document.put("test", "test"); oplog.put(Constants.OPERATION, document); - ReplicationEvent event = DocumentConvertEvent.convert(oplog); + ReplicationEvent event = EventConverter.convert(oplog, "testR"); Assert.assertEquals(timestamp, event.getTimestamp()); Assert.assertEquals("test.person", event.getNamespace()); Assert.assertTrue(11111L == event.getH()); Assert.assertEquals(OperationType.INSERT, event.getOperationType()); Assert.assertEquals(EntryType.CREATE, event.getEntryType()); Assert.assertEquals(document, event.getEventData().get()); + Assert.assertEquals("testR", event.getReplicaSetName()); } @@ -68,38 +68,58 @@ public class MongoTest { public void testInitSyncCopy() throws NoSuchFieldException, IllegalAccessException, InterruptedException { MongoCollection<Document> collection = mongoClient.getDatabase("test").getCollection("person"); collection.deleteMany(new Document()); - int count = 1; - List<Document> documents = new ArrayList<>(count); + int count = 1000; + List<String> documents = new ArrayList<>(count); for (int i = 0; i < count; i++) { Document document = new Document(); document.put("name", "test" + i); document.put("age", i); document.put("sex", i % 2 == 0 ? "boy" : "girl"); collection.insertOne(document); - documents.add(document); + documents.add(document.getObjectId("_id").toHexString()); } - MongoReplicatorConfig config = new MongoReplicatorConfig(); + SourceTaskConfig sourceTaskConfig = new SourceTaskConfig(); Map<String, List<String>> insterest = new HashMap<>(); List<String> collections = new ArrayList<>(); collections.add("*"); insterest.put("test", collections); - config.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); - MongoReplicator mongoReplicator = new MongoReplicator(config); - Field running = MongoReplicator.class.getDeclaredField("running"); + sourceTaskConfig.setInterestDbAndCollection(JSONObject.toJSONString(insterest)); + ReplicaSetConfig replicaSetConfig = new ReplicaSetConfig("", "test", "localhost"); + ReplicaSetsContext replicaSetsContext = new ReplicaSetsContext(sourceTaskConfig); + ReplicaSet replicaSet = new ReplicaSet(replicaSetConfig, replicaSetsContext); + Field running = ReplicaSet.class.getDeclaredField("running"); running.setAccessible(true); - running.set(mongoReplicator, new AtomicBoolean(true)); - InitSync initSync = new InitSync(config, mongoClient, new Filter(config), mongoReplicator); + running.set(replicaSet, new AtomicBoolean(true)); + InitSync initSync = new InitSync(replicaSetConfig, mongoClient, replicaSetsContext, replicaSet); initSync.start(); - BlockingQueue<ReplicationEvent> queue = mongoReplicator.getQueue(); - while (count > 0) { - count--; - ReplicationEvent event = queue.poll(100, TimeUnit.MILLISECONDS); - Assert.assertTrue(event.getOperationType().equals(OperationType.CREATED)); - Assert.assertNotNull(event.getDocument()); - Assert.assertTrue(documents.contains(event.getDocument())); - } - + int syncCount = 0; + while (syncCount < count) { + Collection<SourceDataEntry> sourceDataEntries = replicaSetsContext.poll(); + Assert.assertNotNull(sourceDataEntries); + for (SourceDataEntry sourceDataEntry : sourceDataEntries) { + ByteBuffer sourcePartition = sourceDataEntry.getSourcePartition(); + Assert.assertEquals("test", new String(sourcePartition.array())); + ByteBuffer sourcePosition = sourceDataEntry.getSourcePosition(); + ReplicaSetConfig.Position position = replicaSetConfig.emptyPosition(); + position.setInitSync(true); + position.setTimeStamp(0); + position.setInc(0); + Assert.assertEquals(position, JSONObject.parseObject(new String(sourcePosition.array()), ReplicaSetConfig.Position.class)); + EntryType entryType = sourceDataEntry.getEntryType(); + Assert.assertEquals(EntryType.CREATE, entryType); + String queueName = sourceDataEntry.getQueueName(); + Assert.assertEquals("test-person", queueName); + Schema schema = sourceDataEntry.getSchema(); + Assert.assertTrue(schema.getFields().size() == 2); + Object[] payload = sourceDataEntry.getPayload(); + Assert.assertTrue(payload.length == 2); + Assert.assertEquals(payload[0].toString(), "test.person"); + Assert.assertTrue(documents.contains(JSONObject.parseObject(payload[1].toString(), Document.class).get("_id", JSONObject.class).getString("$oid"))); + syncCount++; + } + } + Assert.assertTrue(syncCount == count); } } diff --git a/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java new file mode 100644 index 0000000..8613c42 --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/ReplicaContextTest.java @@ -0,0 +1,34 @@ +package org.apache.connect.mongo; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoIterable; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSetsContext; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ReplicaContextTest { + + private ReplicaSetsContext context; + + @Before + public void before() { + SourceTaskConfig sourceTaskConfig = new SourceTaskConfig(); + context = new ReplicaSetsContext(sourceTaskConfig); + } + + + @Test + public void testCreateMongoClient() { + MongoClient mongoClient = context.createMongoClient(new ReplicaSetConfig("shardName1", "rep1", "127.0.0.1:27017")); + MongoIterable<String> collectionNames = mongoClient.getDatabase("local").listCollectionNames(); + MongoCursor<String> iterator = collectionNames.iterator(); + while (iterator.hasNext()) { + Assert.assertTrue(StringUtils.isNoneBlank(iterator.next())); + } + } + +} diff --git a/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java new file mode 100644 index 0000000..e69eac6 --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/ReplicaSetsTest.java @@ -0,0 +1,65 @@ +package org.apache.connect.mongo; + +import org.apache.connect.mongo.replicator.ReplicaSetConfig; +import org.apache.connect.mongo.replicator.ReplicaSets; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class ReplicaSetsTest { + + + @Test(expected = IllegalArgumentException.class) + public void testCreatReplicaSetsException01() { + ReplicaSets.create(""); + } + + + @Test(expected = IllegalArgumentException.class) + public void testCreatReplicaSetsException02() { + ReplicaSets.create("127.0.0.1:27081"); + } + + + @Test + public void testCreatReplicaSets01() { + ReplicaSets replicaSets = ReplicaSets.create("replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); + Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); + Assert.assertTrue(replicaSetConfigMap.size() == 1); + Assert.assertNotNull(replicaSetConfigMap.get("replicaName1")); + Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost()); + Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName()); + } + + + @Test + public void testCreatReplicaSets02() { + ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083"); + Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); + Assert.assertTrue(replicaSetConfigMap.size() == 1); + Assert.assertNotNull(replicaSetConfigMap.get("replicaName1")); + Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost()); + Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName()); + Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName()); + } + + + @Test + public void testCreatReplicaSets03() { + ReplicaSets replicaSets = ReplicaSets.create("shardName1=replicaName1/127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083;shardName2=replicaName2/127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283"); + Map<String, ReplicaSetConfig> replicaSetConfigMap = replicaSets.getReplicaConfigByName(); + Assert.assertTrue(replicaSetConfigMap.size() == 2); + Assert.assertNotNull(replicaSetConfigMap.get("replicaName1")); + Assert.assertEquals("127.0.0.1:27081,127.0.0.1:27082,127.0.0.1:27083", replicaSetConfigMap.get("replicaName1").getHost()); + Assert.assertEquals("replicaName1", replicaSetConfigMap.get("replicaName1").getReplicaSetName()); + Assert.assertEquals("shardName1", replicaSetConfigMap.get("replicaName1").getShardName()); + + + Assert.assertNotNull(replicaSetConfigMap.get("replicaName2")); + Assert.assertEquals("127.0.0.1:27281,127.0.0.1:27282,127.0.0.1:27283", replicaSetConfigMap.get("replicaName2").getHost()); + Assert.assertEquals("replicaName2", replicaSetConfigMap.get("replicaName2").getReplicaSetName()); + Assert.assertEquals("shardName2", replicaSetConfigMap.get("replicaName2").getShardName()); + } + +}
