This is an automated email from the ASF dual-hosted git repository. zhoubo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit f629088918a2cc725326b981749798c0ba4b64de Author: 李平 <[email protected]> AuthorDate: Mon Aug 5 15:01:55 2019 +0800 add junit test and modify some code --- pom.xml | 180 +++++++++++++++++++++ .../connect/mongo/MongoReplicatorConfig.java | 157 ++++++++++++++++++ .../mongo/connector/MongoSourceConnector.java | 61 +++++++ .../connect/mongo/connector/MongoSourceTask.java | 148 +++++++++++++++++ .../connect/mongo/initsync/CollectionMeta.java | 41 +++++ .../apache/connect/mongo/initsync/InitSync.java | 146 +++++++++++++++++ .../apache/connect/mongo/replicator/Filter.java | 74 +++++++++ .../connect/mongo/replicator/MongoReplicator.java | 152 +++++++++++++++++ .../connect/mongo/replicator/ReplicatorTask.java | 84 ++++++++++ .../java/org/apache/connect/mongo/FilterTest.java | 67 ++++++++ .../connect/mongo/MongoSourceConnectorTest.java | 45 ++++++ .../org/apache/connect/mongo/ReplicatorTest.java | 33 ++++ 12 files changed, 1188 insertions(+) diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..7ea7ab1 --- /dev/null +++ b/pom.xml @@ -0,0 +1,180 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-connect-mongo</artifactId> + <version>0.0.1-SNAPSHOT</version> + + <name>connect-mongo</name> + <url>https://github.com/apache/incubator-rocketmq-externals/tree/master/rocketmq-connect-mongo</url> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <issueManagement> + <system>jira</system> + <url>https://issues.apache.org/jira/browse/RocketMQ</url> + </issueManagement> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + + <!-- Compiler settings properties --> + <maven.compiler.source>1.8</maven.compiler.source> + <maven.compiler.target>1.8</maven.compiler.target> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>versions-maven-plugin</artifactId> + <version>2.3</version> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>clirr-maven-plugin</artifactId> + <version>2.7</version> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.6.1</version> + <configuration> + <source>${maven.compiler.source}</source> + <target>${maven.compiler.target}</target> + <compilerVersion>${maven.compiler.source}</compilerVersion> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.19.1</version> + <configuration> + <argLine>-Xms512m -Xmx1024m</argLine> + <forkMode>always</forkMode> + <includes> + <include>**/*Test.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <artifactId>maven-site-plugin</artifactId> + <version>3.6</version> + <configuration> + <locales>en_US</locales> + <outputEncoding>UTF-8</outputEncoding> + <inputEncoding>UTF-8</inputEncoding> + </configuration> + </plugin> + <plugin> + <artifactId>maven-source-plugin</artifactId> + <version>3.0.1</version> + <executions> + <execution> + <id>attach-sources</id> + <goals> + <goal>jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-javadoc-plugin</artifactId> + <version>2.10.4</version> + <configuration> + <charset>UTF-8</charset> + <locale>en_US</locale> + <excludePackageNames>io.openmessaging.internal</excludePackageNames> + </configuration> + <executions> + <execution> + <id>aggregate</id> + <goals> + <goal>aggregate</goal> + </goals> + <phase>site</phase> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>3.0.2</version> + <configuration> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + <version>3.0.4</version> + </plugin> + </plugins> + </build> + + + <dependencies> + <dependency> + <groupId>org.mongodb</groupId> + <artifactId>mongodb-driver</artifactId> + <version>3.10.1</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.9</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>1.7.26</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-connector</artifactId> + <version>0.1.0-beta</version> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.13</version> + </dependency> + <dependency> + <groupId>io.openmessaging</groupId> + <artifactId>openmessaging-api</artifactId> + <version>1.0.0-alpha</version> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-openmessaging</artifactId> + <version>4.3.2</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java new file mode 100644 index 0000000..18a834f --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/MongoReplicatorConfig.java @@ -0,0 +1,157 @@ +package org.apache.connect.mongo; + +import io.openmessaging.KeyValue; + +import java.lang.reflect.Method; +import java.util.HashSet; +import java.util.Set; + +public class MongoReplicatorConfig { + + private String mongoAddr; + private int mongoPort; + private String mongoUserName; + private String mongoPassWord; + private String interestDB; + private String interestCollection; + private long positionTimeStamp; + private int positionInc; + private String dataSync; + private int copyThread = Runtime.getRuntime().availableProcessors(); + + + public static final Set<String> REQUEST_CONFIG = new HashSet<String>() { + { + add("mongoAddr"); + add("mongoPort"); + } + }; + + public int getPositionInc() { + return positionInc; + } + + public void setPositionInc(int positionInc) { + this.positionInc = positionInc; + } + + public int getCopyThread() { + return copyThread; + } + + public void setCopyThread(int copyThread) { + this.copyThread = copyThread; + } + + public long getPositionTimeStamp() { + return positionTimeStamp; + } + + public void setPositionTimeStamp(long positionTimeStamp) { + this.positionTimeStamp = positionTimeStamp; + } + + public String getInterestDB() { + return interestDB; + } + + public void setInterestDB(String interestDB) { + this.interestDB = interestDB; + } + + public String getInterestCollection() { + return interestCollection; + } + + public void setInterestCollection(String interestCollection) { + this.interestCollection = interestCollection; + } + + public String getMongoAddr() { + return mongoAddr; + } + + public void setMongoAddr(String mongoAddr) { + this.mongoAddr = mongoAddr; + } + + public int getMongoPort() { + return mongoPort; + } + + public void setMongoPort(int mongoPort) { + this.mongoPort = mongoPort; + } + + public String getMongoUserName() { + return mongoUserName; + } + + public void setMongoUserName(String mongoUserName) { + this.mongoUserName = mongoUserName; + } + + public String getMongoPassWord() { + return mongoPassWord; + } + + public void setMongoPassWord(String mongoPassWord) { + this.mongoPassWord = mongoPassWord; + } + + + public String getDataSync() { + return dataSync; + } + + public void setDataSync(String dataSync) { + this.dataSync = dataSync; + } + + + public void load(KeyValue props) { + + properties2Object(props, this); + } + + private void properties2Object(final KeyValue p, final Object object) { + + Method[] methods = object.getClass().getMethods(); + for (Method method : methods) { + String mn = method.getName(); + if (mn.startsWith("set")) { + try { + String tmp = mn.substring(4); + String first = mn.substring(3, 4); + + String key = first.toLowerCase() + tmp; + String property = p.getString(key); + if (property != null) { + Class<?>[] pt = method.getParameterTypes(); + if (pt != null && pt.length > 0) { + String cn = pt[0].getSimpleName(); + Object arg; + if (cn.equals("int") || cn.equals("Integer")) { + arg = Integer.parseInt(property); + } else if (cn.equals("long") || cn.equals("Long")) { + arg = Long.parseLong(property); + } else if (cn.equals("double") || cn.equals("Double")) { + arg = Double.parseDouble(property); + } else if (cn.equals("boolean") || cn.equals("Boolean")) { + arg = Boolean.parseBoolean(property); + } else if (cn.equals("float") || cn.equals("Float")) { + arg = Float.parseFloat(property); + } else if (cn.equals("String")) { + arg = property; + } else { + continue; + } + method.invoke(object, arg); + } + } + } catch (Throwable ignored) { + } + } + } + } +} diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java new file mode 100644 index 0000000..9e659c9 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceConnector.java @@ -0,0 +1,61 @@ +package org.apache.connect.mongo.connector; + +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.Task; +import io.openmessaging.connector.api.source.SourceConnector; +import org.apache.connect.mongo.MongoReplicatorConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +public class MongoSourceConnector extends SourceConnector { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + private KeyValue keyValueConfig; + + + @Override + public String verifyAndSetConfig(KeyValue config) { + for (String requestKey : MongoReplicatorConfig.REQUEST_CONFIG) { + if (!config.containsKey(requestKey)) { + return "Request config key: " + requestKey; + } + } + this.keyValueConfig = config; + return ""; + } + + @Override + public void start() { + logger.info("start mongo source connector:{}", keyValueConfig); + } + + @Override + public void stop() { + + } + + @Override + public void pause() { + + } + + @Override + public void resume() { + + } + + @Override + public Class<? extends Task> taskClass() { + return MongoSourceTask.class; + } + + @Override + public List<KeyValue> taskConfigs() { + List<KeyValue> config = new ArrayList<>(); + config.add(this.keyValueConfig); + return config; + } +} diff --git a/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java new file mode 100644 index 0000000..e116cfb --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/connector/MongoSourceTask.java @@ -0,0 +1,148 @@ +package org.apache.connect.mongo.connector; + +import com.alibaba.fastjson.JSONObject; +import io.openmessaging.KeyValue; +import io.openmessaging.connector.api.data.*; +import io.openmessaging.connector.api.source.SourceTask; +import org.apache.connect.mongo.replicator.Constants; +import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.replicator.event.OperationType; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.apache.connect.mongo.replicator.MongoReplicator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class MongoSourceTask extends SourceTask { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private MongoReplicator mongoReplicator; + + private MongoReplicatorConfig replicatorConfig; + + private String mongoSource; + + @Override + public Collection<SourceDataEntry> poll() { + List<SourceDataEntry> res = new ArrayList<>(); + ReplicationEvent event = mongoReplicator.getQueue().poll(); + if (event == null) { + return new ArrayList<>(); + } + + JSONObject position = position(event); + Schema schema = new Schema(); + schema.setDataSource(event.getDatabaseName()); + schema.setName(event.getCollectionName()); + schema.setFields(new ArrayList<>()); + buildFieleds(schema); + DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema); + dataEntryBuilder.timestamp(System.currentTimeMillis()) + .queue(event.getNamespace()) + .entryType(event.getEntryType()); + + if (event.getOperationType().ordinal() == OperationType.CREATED.ordinal()) { + dataEntryBuilder.putFiled(Constants.CREATED, event.getDocument().toJson()); + dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace()); + } else { + dataEntryBuilder.putFiled(Constants.OPERATIONTYPE, event.getOperationType().name()); + dataEntryBuilder.putFiled(Constants.TIMESTAMP, event.getTimestamp().getValue()); + dataEntryBuilder.putFiled(Constants.VERSION, event.getV()); + dataEntryBuilder.putFiled(Constants.NAMESPACE, event.getNamespace()); + dataEntryBuilder.putFiled(Constants.PATCH, event.getEventData().isPresent() ? JSONObject.toJSONString(event.getEventData().get()) : ""); + dataEntryBuilder.putFiled(Constants.OBJECTID, event.getObjectId().isPresent() ? JSONObject.toJSONString(event.getObjectId().get()) : ""); + } + SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry( + ByteBuffer.wrap(mongoSource.getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap(position.toJSONString().getBytes(StandardCharsets.UTF_8))); + res.add(sourceDataEntry); + return res; + } + + @Override + public void start(KeyValue config) { + try { + replicatorConfig = new MongoReplicatorConfig(); + replicatorConfig.load(config); + mongoReplicator = new MongoReplicator(replicatorConfig); + mongoSource = new StringBuilder() + .append(replicatorConfig.getMongoAddr()) + .append(replicatorConfig.getMongoPort()).toString(); + ByteBuffer position = this.context.positionStorageReader().getPosition(ByteBuffer.wrap( + mongoSource.getBytes())); + + if (position != null && position.array().length > 0) { + String positionJson = new String(position.array(), StandardCharsets.UTF_8); + JSONObject jsonObject = JSONObject.parseObject(positionJson); + replicatorConfig.setPositionTimeStamp(jsonObject.getLongValue("timeStamp")); + replicatorConfig.setPositionInc(jsonObject.getIntValue("inc")); + } else { + replicatorConfig.setDataSync(Constants.INITIAL); + } + mongoReplicator.start(); + }catch (Throwable throwable) { + logger.info("task start error", throwable); + }finally { + stop(); + } + + + } + + @Override + public void stop() { + logger.info("shut down....."); + mongoReplicator.shutdown(); + } + + @Override + public void pause() { + mongoReplicator.pause(); + } + + @Override + public void resume() { + mongoReplicator.resume(); + } + + private void buildFieleds(Schema schema) { + Field op = new Field(0, Constants.OPERATIONTYPE, FieldType.STRING); + schema.getFields().add(op); + Field time = new Field(1, Constants.TIMESTAMP, FieldType.INT64); + schema.getFields().add(time); + Field v = new Field(2, Constants.VERSION, FieldType.INT32); + schema.getFields().add(v); + Field namespace = new Field(3, Constants.NAMESPACE, FieldType.STRING); + schema.getFields().add(namespace); + Field operation = new Field(4, Constants.CREATED, FieldType.STRING); + schema.getFields().add(operation); + Field patch = new Field(5, Constants.PATCH, FieldType.STRING); + schema.getFields().add(patch); + Field objectId = new Field(5, Constants.OBJECTID, FieldType.STRING); + schema.getFields().add(objectId); + } + + private JSONObject position(ReplicationEvent event) { + JSONObject jsonObject = new JSONObject(); + switch (event.getOperationType()) { + case CREATED: + jsonObject.put(Constants.POSITION_TIMESTAMP, 0); + jsonObject.put(Constants.POSITION_INC, 0); + jsonObject.put(Constants.INITSYNC, true); + break; + default: + jsonObject.put(Constants.POSITION_TIMESTAMP, 0); + jsonObject.put(Constants.POSITION_INC, 0); + jsonObject.put(Constants.INITSYNC, true); + break; + } + return jsonObject; + + } +} diff --git a/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java new file mode 100644 index 0000000..018418c --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/initsync/CollectionMeta.java @@ -0,0 +1,41 @@ +package org.apache.connect.mongo.initsync; + +public class CollectionMeta { + + private String databaseName; + private String collectionName; + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public String getCollectionName() { + return collectionName; + } + + public void setCollectionName(String collectionName) { + this.collectionName = collectionName; + } + + public CollectionMeta(String databaseName, String collectionName) { + + this.databaseName = databaseName; + this.collectionName = collectionName; + } + + public String getNameSpace() { + return databaseName + "." + collectionName; + } + + @Override + public String toString() { + return "CollectionMeta{" + + "databaseName='" + databaseName + '\'' + + ", collectionName='" + collectionName + '\'' + + '}'; + } +} diff --git a/src/main/java/org/apache/connect/mongo/initsync/InitSync.java b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java new file mode 100644 index 0000000..4b3a238 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/initsync/InitSync.java @@ -0,0 +1,146 @@ +package org.apache.connect.mongo.initsync; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoIterable; +import org.apache.connect.mongo.replicator.event.DocumentConvertEvent; +import org.apache.connect.mongo.replicator.event.OperationType; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.apache.connect.mongo.replicator.Filter; +import org.apache.connect.mongo.replicator.MongoReplicator; +import org.apache.connect.mongo.MongoReplicatorConfig; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class InitSync { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private MongoReplicatorConfig mongoReplicatorConfig; + private ExecutorService copyExecutor; + private MongoClient mongoClient; + private Filter filter; + private int copyThreadCount; + private Set<String> interestDataBases; + private Set<CollectionMeta> interestCollections; + private CountDownLatch countDownLatch; + private MongoReplicator mongoReplicator; + + public InitSync(MongoReplicatorConfig mongoReplicatorConfig, MongoClient mongoClient, Filter filter, MongoReplicator mongoReplicator) { + this.mongoReplicatorConfig = mongoReplicatorConfig; + this.mongoClient = mongoClient; + this.filter = filter; + this.mongoReplicator = mongoReplicator; + init(); + } + + public void start() { + for (CollectionMeta collectionMeta : interestCollections) { + copyExecutor.submit(new CopyRunner(mongoClient, countDownLatch, collectionMeta, mongoReplicator)); + } + try { + countDownLatch.await(); + } catch (Exception e) { + } finally { + copyExecutor.shutdown(); + } + } + + private void init() { + interestDataBases = getInterestDataBase(); + interestCollections = getInterestCollection(interestDataBases); + copyThreadCount = Math.min(interestCollections.size(), mongoReplicatorConfig.getCopyThread()); + copyExecutor = Executors.newFixedThreadPool(copyThreadCount, new ThreadFactory() { + + AtomicInteger threads = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "copy_collection_thread_" + threads.incrementAndGet()); + } + }); + countDownLatch = new CountDownLatch(interestCollections.size()); + } + + private Set<CollectionMeta> getInterestCollection(Set<String> interestDataBases) { + Set<CollectionMeta> res = new HashSet<>(); + for (String interestDataBase : interestDataBases) { + MongoIterable<String> collectionNames = mongoClient.getDatabase(interestDataBase).listCollectionNames(); + MongoCursor<String> iterator = collectionNames.iterator(); + while (iterator.hasNext()) { + String collectionName = iterator.next(); + if (filter.filterCollectionName(collectionName)) { + CollectionMeta collectionMeta = new CollectionMeta(interestDataBase, collectionName); + res.add(collectionMeta); + } + } + } + + return res; + + } + + private Set<String> getInterestDataBase() { + Set<String> res = new HashSet<>(); + MongoIterable<String> databaseNames = mongoClient.listDatabaseNames(); + MongoCursor<String> iterator = databaseNames.iterator(); + while (iterator.hasNext()) { + String dataBaseName = iterator.next(); + if (filter.filterDatabaseName(dataBaseName)) { + res.add(dataBaseName); + } + } + + return res; + } + + class CopyRunner implements Runnable { + + private MongoClient mongoClient; + private CountDownLatch countDownLatch; + private CollectionMeta collectionMeta; + private MongoReplicator mongoReplicator; + + public CopyRunner(MongoClient mongoClient, CountDownLatch countDownLatch, CollectionMeta collectionMeta, MongoReplicator mongoReplicator) { + this.mongoClient = mongoClient; + this.countDownLatch = countDownLatch; + this.collectionMeta = collectionMeta; + this.mongoReplicator = mongoReplicator; + } + + @Override + public void run() { + + try { + + MongoCursor<Document> mongoCursor = mongoClient.getDatabase(collectionMeta.getDatabaseName()) + .getCollection(collectionMeta.getCollectionName()) + .find() + .batchSize(200) + .iterator(); + + while (mongoReplicator.isRuning() && mongoCursor.hasNext()) { + Document document = mongoCursor.next(); + ReplicationEvent event = DocumentConvertEvent.convert(document); + event.setOperationType(OperationType.CREATED); + event.setNamespace(collectionMeta.getNameSpace()); + mongoReplicator.publishEvent(event); + } + } finally { + countDownLatch.countDown(); + } + logger.info("database:{}, collection:{}, init sync done", collectionMeta.getDatabaseName(), collectionMeta.getCollectionName()); + } + } + +} + + + + diff --git a/src/main/java/org/apache/connect/mongo/replicator/Filter.java b/src/main/java/org/apache/connect/mongo/replicator/Filter.java new file mode 100644 index 0000000..05dec54 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/Filter.java @@ -0,0 +1,74 @@ +package org.apache.connect.mongo.replicator; + + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.replicator.event.OperationType; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; + +import java.util.function.Function; + +public class Filter { + + private Function<String, Boolean> dbFilter; + private Function<String, Boolean> collectionFilter; + private Function<OperationType, Boolean> noopFilter; + + + public Filter(MongoReplicatorConfig mongoReplicatorConfig) { + if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestDB())) { + dbFilter = (dataBaseName) -> { + if (StringUtils.isBlank(dataBaseName)) { + return true; + } + String interestDB = mongoReplicatorConfig.getInterestDB(); + String[] db = StringUtils.split(interestDB, ","); + if (ArrayUtils.contains(db, dataBaseName)) { + return true; + } + + return false; + }; + } else { + dbFilter = (dataBaseName) -> true; + } + + + if (StringUtils.isNotBlank(mongoReplicatorConfig.getInterestCollection())) { + collectionFilter = (collectionName) -> { + if (StringUtils.isBlank(collectionName)) { + return true; + } + + String interestCollection = mongoReplicatorConfig.getInterestCollection(); + String[] coll = StringUtils.split(interestCollection, ","); + if (ArrayUtils.contains(coll, collectionName)) { + return true; + } + return false; + }; + } else { + collectionFilter = (collectionName) -> true; + } + + noopFilter = (opeartionType) -> opeartionType.ordinal() != OperationType.NOOP.ordinal(); + } + + + public boolean filterDatabaseName(String dataBaseName) { + return dbFilter.apply(dataBaseName); + } + + + public boolean filterCollectionName(String collectionName) { + return collectionFilter.apply(collectionName); + } + + public boolean filterEvent(ReplicationEvent event) { + return dbFilter.apply(event.getDatabaseName()) + && collectionFilter.apply(event.getCollectionName()) + && noopFilter.apply(event.getOperationType()); + } + +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java new file mode 100644 index 0000000..326ba03 --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/MongoReplicator.java @@ -0,0 +1,152 @@ +package org.apache.connect.mongo.replicator; + +import com.mongodb.*; +import com.mongodb.client.*; +import com.mongodb.client.MongoClient; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; +import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.connect.mongo.replicator.Constants.*; + + +public class MongoReplicator { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private AtomicBoolean running = new AtomicBoolean(); + + private MongoReplicatorConfig mongoReplicatorConfig; + + private MongoClientSettings clientSettings; + + private ConnectionString connectionString; + + private MongoClient mongoClient; + + private BlockingQueue<ReplicationEvent> queue = new LinkedBlockingQueue<>(); + + private Filter filter; + + private ExecutorService executorService; + + private volatile boolean pause = false; + + public MongoReplicator(MongoReplicatorConfig mongoReplicatorConfig) { + this.mongoReplicatorConfig = mongoReplicatorConfig; + this.filter = new Filter(mongoReplicatorConfig); + this.executorService = Executors.newSingleThreadExecutor((r) ->new Thread(r, "real_time_replica_thread")); + + buildConnectionString(); + } + + public void start() { + + try { + if (!running.compareAndSet(false, true)) { + logger.info("the java mongo replica already start"); + return; + } + + this.clientSettings = MongoClientSettings.builder().applicationName(APPLICATION_NAME) + .applyConnectionString(connectionString) + .retryWrites(true) + .build(); + this.mongoClient = MongoClients.create(clientSettings); + this.isReplicaMongo(); + executorService.submit(new ReplicatorTask(this, mongoClient, mongoReplicatorConfig, filter)); + }catch (Exception e) { + logger.info("start replicator error", e); + }finally { + shutdown(); + } + + } + + + private void buildConnectionString() { + checkConfig(); + StringBuilder sb = new StringBuilder(); + sb.append("mongodb://"); + if (StringUtils.isNotBlank(mongoReplicatorConfig.getMongoUserName()) + && StringUtils.isNotBlank(mongoReplicatorConfig.getMongoPassWord())) { + sb.append(mongoReplicatorConfig.getMongoUserName()); + sb.append(":"); + sb.append(mongoReplicatorConfig.getMongoPassWord()); + sb.append("@"); + + } + sb.append(mongoReplicatorConfig.getMongoAddr()); + sb.append(":"); + sb.append(mongoReplicatorConfig.getMongoPort()); + + this.connectionString = new ConnectionString(sb.toString()); + } + + private void checkConfig() { + Validate.notBlank(mongoReplicatorConfig.getMongoAddr(), "mongo url is blank"); + Validate.isTrue(mongoReplicatorConfig.getMongoPort() > 0 && mongoReplicatorConfig.getMongoPort() < 65535, "mongo port should >0 and <65535"); + + } + + private boolean isReplicaMongo() { + MongoDatabase local = mongoClient.getDatabase(MONGO_LOCAL_DATABASE); + MongoIterable<String> collectionNames = local.listCollectionNames(); + for (String collectionName : collectionNames) { + if (MONGO_OPLOG_RS.equals(collectionName)) { + return true; + } + } + this.shutdown(); + throw new IllegalStateException(String.format("url:%s, port:%s is not replica", mongoReplicatorConfig.getMongoAddr(), mongoReplicatorConfig.getMongoPort())); + } + + public void shutdown() { + if (running.compareAndSet(true, false)) { + if (!this.executorService.isShutdown()) { + executorService.shutdown(); + } + if (this.mongoClient != null) { + this.mongoClient.close(); + } + } + + } + + public void publishEvent(ReplicationEvent replicationEvent) { + while (true) { + try { + queue.put(replicationEvent); + break; + } catch (Exception e) { + } + } + } + + + public void pause() { + pause = true; + } + + public void resume() { + pause = false; + } + + public boolean isPause() { + return pause; + } + + public boolean isRuning() { + return running.get(); + } + + public BlockingQueue<ReplicationEvent> getQueue() { + return queue; + } +} diff --git a/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java new file mode 100644 index 0000000..9e4c67b --- /dev/null +++ b/src/main/java/org/apache/connect/mongo/replicator/ReplicatorTask.java @@ -0,0 +1,84 @@ +package org.apache.connect.mongo.replicator; + +import com.mongodb.CursorType; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.model.Filters; +import org.apache.connect.mongo.MongoReplicatorConfig; +import org.apache.connect.mongo.replicator.event.DocumentConvertEvent; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.apache.connect.mongo.initsync.InitSync; +import org.bson.Document; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ReplicatorTask implements Runnable { + + private Logger logger = LoggerFactory.getLogger(this.getClass()); + + private MongoReplicator mongoReplicator; + + private MongoClient mongoClient; + + private MongoReplicatorConfig mongoReplicatorConfig; + + private Filter filter; + + public ReplicatorTask(MongoReplicator mongoReplicator, MongoClient mongoClient, MongoReplicatorConfig mongoReplicatorConfig, Filter filter) { + this.mongoReplicator = mongoReplicator; + this.mongoReplicatorConfig = mongoReplicatorConfig; + this.mongoClient = mongoClient; + this.filter = filter; + } + + @Override + public void run() { + + if (Constants.INITIAL.equals(mongoReplicatorConfig.getDataSync())) { + InitSync initSync = new InitSync(mongoReplicatorConfig, mongoClient, filter, mongoReplicator); + initSync.start(); + } + + MongoDatabase localDataBase = mongoClient.getDatabase(Constants.MONGO_LOCAL_DATABASE); + FindIterable<Document> iterable; + if (mongoReplicatorConfig.getPositionTimeStamp() > 0 && mongoReplicatorConfig.getPositionTimeStamp() < System.currentTimeMillis()) { + iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find( + Filters.gt("ts", mongoReplicatorConfig.getPositionTimeStamp())); + } else { + iterable = localDataBase.getCollection(Constants.MONGO_OPLOG_RS).find(); + } + MongoCursor<Document> cursor = iterable.sort(new Document("$natural", 1)) + .noCursorTimeout(true) + .cursorType(CursorType.TailableAwait) + .batchSize(200) + .iterator(); + + while (mongoReplicator.isRuning()) { + try { + executorCursor(cursor); + Thread.sleep(100); + } catch (Exception e) { + e.printStackTrace(); + } finally { + logger.error("mongoReplicator shutdown....."); + mongoReplicator.shutdown(); + } + } + } + + + private void executorCursor(MongoCursor<Document> cursor) { + while (cursor.hasNext() && !mongoReplicator.isPause()) { + Document document = cursor.next(); + ReplicationEvent event = DocumentConvertEvent.convert(document); + if (filter.filterEvent(event)) { + mongoReplicator.publishEvent(event); + } + } + } + + +} diff --git a/src/test/java/org/apache/connect/mongo/FilterTest.java b/src/test/java/org/apache/connect/mongo/FilterTest.java new file mode 100644 index 0000000..24f36d9 --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/FilterTest.java @@ -0,0 +1,67 @@ +package org.apache.connect.mongo; + +import org.apache.connect.mongo.replicator.Filter; +import org.apache.connect.mongo.replicator.event.OperationType; +import org.apache.connect.mongo.replicator.event.ReplicationEvent; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class FilterTest { + + + private MongoReplicatorConfig config; + + + @Before + public void init() { + config = new MongoReplicatorConfig(); + + } + + @Test + public void testSpecialDb() { + config.setInterestDB("test,admin"); + Filter filter = new Filter(config); + Assert.assertTrue(filter.filterDatabaseName("test")); + Assert.assertFalse(filter.filterDatabaseName("test01")); + } + + + @Test + public void testBlankDb() { + Filter filter = new Filter(config); + Assert.assertTrue(filter.filterDatabaseName("test")); + Assert.assertTrue(filter.filterDatabaseName("test01")); + } + + + @Test + public void testSpecialCollection() { + config.setInterestCollection("test,admin"); + Filter filter = new Filter(config); + Assert.assertTrue(filter.filterCollectionName("test")); + Assert.assertFalse(filter.filterCollectionName("test01")); + } + + + @Test + public void testBlankCollection() { + Filter filter = new Filter(config); + Assert.assertTrue(filter.filterCollectionName("test")); + Assert.assertTrue(filter.filterCollectionName("test01")); + } + + + @Test + public void testFilterEvent() { + Filter filter = new Filter(config); + ReplicationEvent replicationEvent = new ReplicationEvent(); + replicationEvent.setOperationType(OperationType.NOOP); + Assert.assertFalse(filter.filterEvent(replicationEvent)); + replicationEvent.setOperationType(OperationType.DBCOMMAND); + Assert.assertTrue(filter.filterEvent(replicationEvent)); + } + + +} diff --git a/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java new file mode 100644 index 0000000..04e1374 --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/MongoSourceConnectorTest.java @@ -0,0 +1,45 @@ +package org.apache.connect.mongo; + +import io.openmessaging.internal.DefaultKeyValue; +import org.apache.commons.lang3.StringUtils; +import org.apache.connect.mongo.connector.MongoSourceConnector; +import org.apache.connect.mongo.connector.MongoSourceTask; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MongoSourceConnectorTest { + + private MongoSourceConnector mongoSourceConnector; + private DefaultKeyValue keyValue; + + @Before + public void before() { + mongoSourceConnector = new MongoSourceConnector(); + keyValue = new DefaultKeyValue(); + } + + @Test + public void takeClass() { + Assert.assertEquals(mongoSourceConnector.taskClass(), MongoSourceTask.class); + } + + + @Test + public void verifyConfig() { + keyValue.put("mongoAddr", "127.0.0.1"); + String s = mongoSourceConnector.verifyAndSetConfig(keyValue); + Assert.assertTrue(s.contains("Request config key:")); + keyValue.put("mongoPort", 27017); + s = mongoSourceConnector.verifyAndSetConfig(keyValue); + Assert.assertTrue(StringUtils.isBlank(s)); + } + + + + + + + + +} diff --git a/src/test/java/org/apache/connect/mongo/ReplicatorTest.java b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java new file mode 100644 index 0000000..b0319f1 --- /dev/null +++ b/src/test/java/org/apache/connect/mongo/ReplicatorTest.java @@ -0,0 +1,33 @@ +package org.apache.connect.mongo; + +import org.apache.connect.mongo.replicator.MongoReplicator; +import org.junit.Before; +import org.junit.Test; + +public class ReplicatorTest { + + private MongoReplicatorConfig config; + + @Before + public void before() { + config = new MongoReplicatorConfig(); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoPort() { + config.setMongoAddr("127.0.0.1"); + MongoReplicator mongoReplicator = new MongoReplicator(config); + mongoReplicator.start(); + } + + + @Test + public void testNoPort1() { + config.setMongoAddr("127.0.0.1"); + config.setMongoPort(27012); + MongoReplicator mongoReplicator = new MongoReplicator(config); + mongoReplicator.start(); + } + + +}
