This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 06e911d Add a Pulsar IO MongoDB (#3561)
06e911d is described below
commit 06e911d4054daabca84556b80976b7e6ffa4c815
Author: Bruno Bonnin <[email protected]>
AuthorDate: Wed Feb 13 02:56:20 2019 +0100
Add a Pulsar IO MongoDB (#3561)
### Motivation
Provides a builtin MongoDB Connector, in order to ease the storage of JSON
formated message in MongoDB. It's a sink connector.
### Modifications
Add a new sub-module in the `pulsar-io`module.
### Verifying this change
This change added tests and can be verified as follows:
* deploy the connector with configuration file containing the following
fields:
```
configs:
mongoUri: mongodb://hostname:port
database: pulsar
collection: messages
```
* start a mongodb instance
* send messages in the topic declared when deploying the connector
* check in MongoDB if the messages have been stored in the collection
`messages`
---
pulsar-io/mongo/pom.xml | 82 +++++++++
.../org/apache/pulsar/io/mongodb/MongoConfig.java | 109 ++++++++++++
.../org/apache/pulsar/io/mongodb/MongoSink.java | 182 +++++++++++++++++++
.../org/apache/pulsar/io/mongodb/package-info.java | 19 ++
.../resources/META-INF/services/pulsar-io.yaml | 3 +
.../apache/pulsar/io/mongodb/MongoConfigTest.java | 87 +++++++++
.../apache/pulsar/io/mongodb/MongoSinkTest.java | 198 +++++++++++++++++++++
.../org/apache/pulsar/io/mongodb/TestHelper.java | 55 ++++++
.../mongo/src/test/resources/mongoSinkConfig.yaml | 26 +++
pulsar-io/pom.xml | 1 +
10 files changed, 762 insertions(+)
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
new file mode 100644
index 0000000..8968c6a
--- /dev/null
+++ b/pulsar-io/mongo/pom.xml
@@ -0,0 +1,82 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.3.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>pulsar-io-mongo</artifactId>
+ <name>Pulsar IO :: MongoDB</name>
+
+ <properties>
+ <mongo-driver.version>3.8.2</mongo-driver.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.mongodb</groupId>
+ <artifactId>mongodb-driver-async</artifactId>
+ <version>${mongo-driver.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>buildtools</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
new file mode 100644
index 0000000..602c105
--- /dev/null
+++
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.*;
+import lombok.experimental.Accessors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Configuration class for the MongoDB Sink Connector.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class MongoConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final int DEFAULT_BATCH_SIZE = 100;
+
+ public static final long DEFAULT_BATCH_TIME_MS = 1000;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The uri of mongodb that the connector connects to" +
+ " (see:
https://docs.mongodb.com/manual/reference/connection-string/)"
+ )
+ private String mongoUri;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The name of the database to which the collection belongs to"
+ )
+ private String database;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The collection name that the connector writes messages to"
+ )
+ private String collection;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "" + DEFAULT_BATCH_SIZE,
+ help = "The batch size of write to the collection"
+ )
+ private int batchSize = DEFAULT_BATCH_SIZE;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "" + DEFAULT_BATCH_TIME_MS,
+ help = "The batch operation interval in milliseconds")
+ private long batchTimeMs = DEFAULT_BATCH_TIME_MS;
+
+
+ public static MongoConfig load(String yamlFile) throws IOException {
+ final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ final MongoConfig cfg = mapper.readValue(new File(yamlFile),
MongoConfig.class);
+
+ return cfg;
+ }
+
+ public static MongoConfig load(Map<String, Object> map) throws IOException
{
+ final ObjectMapper mapper = new ObjectMapper();
+ final MongoConfig cfg = mapper.readValue(new
ObjectMapper().writeValueAsString(map), MongoConfig.class);
+
+ return cfg;
+ }
+
+ public void validate() {
+ if (StringUtils.isEmpty(mongoUri) || StringUtils.isEmpty(database) ||
StringUtils.isEmpty(collection)) {
+ throw new IllegalArgumentException("Required property not set.");
+ }
+
+ Preconditions.checkArgument(batchSize > 0, "batchSize must be a
positive integer.");
+ Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a
positive long.");
+ }
+}
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
new file mode 100644
index 0000000..3eb6f6e
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import com.google.common.collect.Lists;
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.async.client.MongoClient;
+import com.mongodb.async.client.MongoClients;
+import com.mongodb.async.client.MongoCollection;
+import com.mongodb.async.client.MongoDatabase;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * The base class for MongoDB sinks.
+ * Users need to implement extractKeyValue function to use this sink.
+ * This class assumes that the input will be JSON documents.
+ */
+@Connector(
+ name = "mongo",
+ type = IOType.SINK,
+ help = "A sink connector that sends pulsar messages to mongodb",
+ configClass = MongoConfig.class
+)
+@Slf4j
+public class MongoSink implements Sink<byte[]> {
+
+ private MongoConfig mongoConfig;
+
+ private MongoClient mongoClient;
+
+ private MongoCollection<Document> collection;
+
+ private List<Record<byte[]>> incomingList;
+
+ private ScheduledExecutorService flushExecutor;
+
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
+ log.info("Open MongoDB Sink");
+
+ mongoConfig = MongoConfig.load(config);
+ mongoConfig.validate();
+
+ mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+ final MongoDatabase db =
mongoClient.getDatabase(mongoConfig.getDatabase());
+ collection = db.getCollection(mongoConfig.getCollection());
+
+ incomingList = Lists.newArrayList();
+ flushExecutor = Executors.newScheduledThreadPool(1);
+ flushExecutor.scheduleAtFixedRate(() -> flush(),
+ mongoConfig.getBatchTimeMs(), mongoConfig.getBatchTimeMs(),
TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void write(Record<byte[]> record) {
+ final String recordValue = new String(record.getValue(),
Charset.forName("UTF-8"));
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received record: " + recordValue);
+ }
+
+ int currentSize;
+
+ synchronized (this) {
+ incomingList.add(record);
+ currentSize = incomingList.size();
+ }
+
+ if (currentSize == mongoConfig.getBatchSize()) {
+ flushExecutor.submit(() -> flush());
+ }
+ }
+
+ private void flush() {
+ final List<Document> docsToInsert = new ArrayList<>();
+ final List<Record<byte[]>> recordsToInsert;
+
+ synchronized (this) {
+ if (incomingList.isEmpty()) {
+ return;
+ }
+
+ recordsToInsert = incomingList;
+ incomingList = Lists.newArrayList();
+ }
+
+ final Iterator<Record<byte[]>> iter = recordsToInsert.iterator();
+
+ while (iter.hasNext()) {
+ final Record<byte[]> record = iter.next();
+
+ try {
+ final byte[] docAsBytes = record.getValue();
+ final Document doc = Document.parse(new String(docAsBytes,
Charset.forName("UTF-8")));
+ docsToInsert.add(doc);
+ }
+ catch (JsonParseException | BSONException e) {
+ log.error("Bad message", e);
+ record.fail();
+ iter.remove();
+ }
+ }
+
+ if (docsToInsert.size() > 0) {
+
+ collection.insertMany(docsToInsert, (result, t) -> {
+ final List<Integer> idxToAck = IntStream.range(0,
docsToInsert.size()).boxed().collect(toList());
+ final List<Integer> idxToFail = Lists.newArrayList();
+
+ if (t != null) {
+ log.error("MongoDB insertion error", t);
+
+ if (t instanceof MongoBulkWriteException) {
+ // With this exception, we are aware of the items that
have not been inserted.
+ ((MongoBulkWriteException)
t).getWriteErrors().forEach(err -> {
+ idxToFail.add(err.getIndex());
+ });
+ idxToAck.removeAll(idxToFail);
+ } else {
+ idxToFail.addAll(idxToAck);
+ idxToAck.clear();
+ }
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Nb ack={}, nb fail={}", idxToAck.size(),
idxToFail.size());
+ }
+
+ idxToAck.forEach(idx -> recordsToInsert.get(idx).ack());
+ idxToFail.forEach(idx -> recordsToInsert.get(idx).fail());
+ });
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (flushExecutor != null) {
+ flushExecutor.shutdown();
+ }
+
+ if (mongoClient != null) {
+ mongoClient.close();
+ }
+ }
+}
diff --git
a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/package-info.java
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/package-info.java
new file mode 100644
index 0000000..df14171
--- /dev/null
+++
b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
\ No newline at end of file
diff --git
a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..476baed
--- /dev/null
+++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,3 @@
+name: mongo
+description: Writes data into MongoDB
+sinkClass: org.apache.pulsar.io.mongodb.MongoSink
diff --git
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
new file mode 100644
index 0000000..9f98ee4
--- /dev/null
+++
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+public class MongoConfigTest {
+
+ private static File getFile(String fileName) {
+ return new
File(MongoConfigTest.class.getClassLoader().getResource(fileName).getFile());
+ }
+
+ @Test
+ public void testMap() throws IOException {
+ final Map<String, Object> map = TestHelper.createMap(true);
+ final MongoConfig cfg = MongoConfig.load(map);
+
+ assertEquals(cfg.getMongoUri(), TestHelper.URI);
+ assertEquals(cfg.getDatabase(), TestHelper.DB);
+ assertEquals(cfg.getCollection(), TestHelper.COLL);
+ assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Required property not set.")
+ public void testBadMap() throws IOException {
+ final Map<String, Object> map = TestHelper.createMap(false);
+ final MongoConfig cfg = MongoConfig.load(map);
+
+ cfg.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "batchSize must be a positive
integer.")
+ public void testBadBatchSize() throws IOException {
+ final Map<String, Object> map = TestHelper.createMap(true);
+ map.put("batchSize", 0);
+ final MongoConfig cfg = MongoConfig.load(map);
+
+ cfg.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "batchTimeMs must be a positive
long.")
+ public void testBadBatchTime() throws IOException {
+ final Map<String, Object> map = TestHelper.createMap(true);
+ map.put("batchTimeMs", 0);
+ final MongoConfig cfg = MongoConfig.load(map);
+
+ cfg.validate();
+ }
+
+ @Test
+ public void testYaml() throws IOException {
+ final File yaml = getFile("mongoSinkConfig.yaml");
+ final MongoConfig cfg = MongoConfig.load(yaml.getAbsolutePath());
+
+ assertEquals(cfg.getMongoUri(), TestHelper.URI);
+ assertEquals(cfg.getDatabase(), TestHelper.DB);
+ assertEquals(cfg.getCollection(), TestHelper.COLL);
+ assertEquals(cfg.getBatchSize(), TestHelper.BATCH_SIZE);
+ assertEquals(cfg.getBatchTimeMs(), TestHelper.BATCH_TIME);
+ }
+}
diff --git
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
new file mode 100644
index 0000000..c941708
--- /dev/null
+++
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import com.mongodb.MongoBulkWriteException;
+import com.mongodb.async.SingleResultCallback;
+import com.mongodb.async.client.MongoClient;
+import com.mongodb.async.client.MongoClients;
+import com.mongodb.async.client.MongoCollection;
+import com.mongodb.async.client.MongoDatabase;
+import com.mongodb.bulk.BulkWriteError;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.bson.BsonDocument;
+import org.mockito.Mock;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+@PrepareForTest(MongoClients.class)
+@PowerMockIgnore({"org.apache.logging.log4j.*"})
+public class MongoSinkTest {
+
+ @Mock
+ private Record<byte[]> mockRecord;
+
+ @Mock
+ private SinkContext mockSinkContext;
+
+ @Mock
+ private MongoClient mockMongoClient;
+
+ @Mock
+ private MongoDatabase mockMongoDb;
+
+ @Mock
+ private MongoCollection mockMongoColl;
+
+ private MongoSink sink;
+
+ private Map<String, Object> map;
+
+
+ @ObjectFactory
+ public IObjectFactory getObjectFactory() {
+ return new org.powermock.modules.testng.PowerMockObjectFactory();
+ }
+
+ @BeforeMethod
+ public void setUp() {
+ sink = new MongoSink();
+ map = TestHelper.createMap(true);
+
+ mockRecord = mock(Record.class);
+ mockSinkContext = mock(SinkContext.class);
+ mockMongoClient = mock(MongoClient.class);
+ mockMongoDb = mock(MongoDatabase.class);
+ mockMongoColl = mock(MongoCollection.class);
+
+ PowerMockito.mockStatic(MongoClients.class);
+
+ when(MongoClients.create(anyString())).thenReturn(mockMongoClient);
+ when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
+ when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
+ }
+
+ private void initContext(boolean throwBulkError) {
+
when(mockRecord.getValue()).thenReturn("{\"hello\":\"pulsar\"}".getBytes());
+
+ doAnswer((invocation) -> {
+ SingleResultCallback cb = invocation.getArgumentAt(1,
SingleResultCallback.class);
+ MongoBulkWriteException exc = null;
+
+ if (throwBulkError) {
+ List<BulkWriteError > writeErrors = Arrays.asList(
+ new BulkWriteError(0, "error", new BsonDocument(), 1));
+ exc = new MongoBulkWriteException(null, writeErrors, null,
null);
+ }
+
+ cb.onResult(null, exc);
+ return null;
+ }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+ }
+
+ private void initFailContext(String msg) {
+ when(mockRecord.getValue()).thenReturn(msg.getBytes());
+
+ doAnswer((invocation) -> {
+ SingleResultCallback cb = invocation.getArgumentAt(1,
SingleResultCallback.class);
+ cb.onResult(null, new Exception("Oops"));
+ return null;
+ }).when(mockMongoColl).insertMany(anyObject(), anyObject());
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ sink.close();
+ verify(mockMongoClient, times(1)).close();
+ }
+
+ @Test
+ public void testOpen() throws Exception {
+ sink.open(map, mockSinkContext);
+ }
+
+ @Test
+ public void testWriteNullMessage() throws Exception {
+ when(mockRecord.getValue()).thenReturn("".getBytes());
+
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+
+ Thread.sleep(1000);
+
+ verify(mockRecord, times(1)).fail();
+ }
+
+ @Test
+ public void testWriteGoodMessage() throws Exception {
+ initContext(false);
+
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+
+ Thread.sleep(1000);
+
+ verify(mockRecord, times(1)).ack();
+ }
+
+ @Test
+ public void testWriteMultipleMessages() throws Exception {
+ initContext(true);
+
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+ sink.write(mockRecord);
+ sink.write(mockRecord);
+
+ Thread.sleep(1000);
+
+ verify(mockRecord, times(2)).ack();
+ verify(mockRecord, times(1)).fail();
+ }
+
+ @Test
+ public void testWriteWithError() throws Exception {
+ initFailContext("{\"hello\":\"pulsar\"}");
+
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+
+ Thread.sleep(1000);
+
+ verify(mockRecord, times(1)).fail();
+ }
+
+ @Test
+ public void testWriteBadMessage() throws Exception {
+ initFailContext("Oops");
+
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+
+ Thread.sleep(1000);
+
+ verify(mockRecord, times(1)).fail();
+ }
+}
diff --git
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
new file mode 100644
index 0000000..82a0744
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/TestHelper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public final class TestHelper {
+
+ public static final String URI = "mongodb://localhost";
+
+ public static final String DB = "pulsar";
+
+ public static final String COLL = "messages";
+
+ public static final int BATCH_SIZE = 2;
+
+ public static final int BATCH_TIME = 500;
+
+
+ public static Map<String, Object> createMap(boolean full) {
+ final Map<String, Object> map = new HashMap<>();
+ map.put("mongoUri", URI);
+ map.put("database", DB);
+
+ if (full) {
+ map.put("collection", COLL);
+ map.put("batchSize", BATCH_SIZE);
+ map.put("batchTimeMs", BATCH_TIME);
+ }
+
+ return map;
+ }
+
+ private TestHelper() {
+
+ }
+}
diff --git a/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
new file mode 100644
index 0000000..f7a9ea2
--- /dev/null
+++ b/pulsar-io/mongo/src/test/resources/mongoSinkConfig.yaml
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+ "mongoUri": "mongodb://localhost",
+ "database": "pulsar",
+ "collection": "messages",
+ "batchSize": 2,
+ "batchTimeMs": 500
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index fb88341..4b69d59 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -51,6 +51,7 @@
<module>file</module>
<module>netty</module>
<module>hbase</module>
+ <module>mongo</module>
</modules>
</project>