adopts docker testing for streams-persist-mongo
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/60139e59 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/60139e59 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/60139e59 Branch: refs/heads/master Commit: 60139e59c6f33f91281d3316d65ef8e788e6d85c Parents: cdb8d6b Author: Steve Blackmon @steveblackmon <[email protected]> Authored: Fri Oct 7 14:32:43 2016 -0500 Committer: Steve Blackmon @steveblackmon <[email protected]> Committed: Fri Oct 7 14:32:43 2016 -0500 ---------------------------------------------------------------------- .../elasticsearch.properties | 6 - streams-contrib/streams-persist-mongo/pom.xml | 107 ++++++++++++---- .../streams/mongo/MongoPersistReader.java | 40 +++--- .../streams/mongo/MongoPersistWriter.java | 38 +++--- .../streams/mongo/test/MongoPersistIT.java | 122 +++++++++++++++++++ .../streams/mongo/test/TestMongoPersist.java | 118 ------------------ .../src/test/resources/MongoPersistIT.conf | 6 + 7 files changed, 257 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/60139e59/streams-contrib/streams-persist-elasticsearch/elasticsearch.properties ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/elasticsearch.properties b/streams-contrib/streams-persist-elasticsearch/elasticsearch.properties deleted file mode 100644 index 7df2e97..0000000 --- a/streams-contrib/streams-persist-elasticsearch/elasticsearch.properties +++ /dev/null @@ -1,6 +0,0 @@ -#Docker ports -#Tue Oct 04 23:03:11 CDT 2016 -es.http.host=192.168.99.100 -es.tcp.host=192.168.99.100 -es.http.port=32769 -es.tcp.port=32768 http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/60139e59/streams-contrib/streams-persist-mongo/pom.xml ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/pom.xml b/streams-contrib/streams-persist-mongo/pom.xml index 4a5d6dc..788d70a 100644 --- a/streams-contrib/streams-persist-mongo/pom.xml +++ b/streams-contrib/streams-persist-mongo/pom.xml @@ -30,7 +30,7 @@ <description>Mongo Module</description> <properties> - <mongo-driver.version>2.13.0</mongo-driver.version> + <mongo-driver.version>3.3.0</mongo-driver.version> </properties> <dependencies> @@ -74,31 +74,11 @@ <optional>true</optional> </dependency> <dependency> - <groupId>com.github.fakemongo</groupId> - <artifactId>fongo</artifactId> - <version>1.6.0</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> - <version>1.9.5</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-module-junit4</artifactId> - </dependency> - <dependency> - <groupId>org.powermock</groupId> - <artifactId>powermock-api-mockito</artifactId> - </dependency> - <dependency> <groupId>org.apache.streams</groupId> <artifactId>streams-testing</artifactId> <version>${project.version}</version> @@ -167,6 +147,91 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-failsafe-plugin</artifactId> + <version>${failsafe.plugin.version}</version> + <configuration> + <!-- Run integration test suite rather than individual tests. --> + <excludes> + <exclude>**/*Test.java</exclude> + <exclude>**/*Tests.java</exclude> + </excludes> + <includes> + <include>**/*IT.java</include> + </includes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <includes> + <include>**/*.conf</include> + <include>**/*.json</include> + <include>**/*.class</include> + </includes> + </configuration> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> </plugins> </build> + + <profiles> + <profile> + <id>dockerITs</id> + <activation> + <activeByDefault>false</activeByDefault> + <property> + <name>skipITs</name> + <value>false</value> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + <version>${docker.plugin.version}</version> + <configuration combine.self="override"> + <watchInterval>500</watchInterval> + <logDate>default</logDate> + <verbose>true</verbose> + <autoPull>on</autoPull> + <images> + <image> + <name>mongo:3.2.0</name> + <alias>mongo</alias> + <run> + <namingStrategy>none</namingStrategy> + <ports> + <port>${mongo.tcp.host}:${mongo.tcp.port}:27017</port> + </ports> + <portPropertyFile>mongo.properties</portPropertyFile> + <log> + <enabled>true</enabled> + <date>default</date> + <color>cyan</color> + </log> + </run> + <watch> + <mode>none</mode> + </watch> + </image> + </images> + </configuration> + + </plugin> + + </plugins> + </build> + + </profile> + </profiles> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/60139e59/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java index ab03f22..9772f95 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java @@ -61,7 +61,8 @@ public class MongoPersistReader implements StreamsPersistReader { private MongoConfiguration config; private MongoPersistReaderTask readerTask; - protected DB client; + protected MongoClient client; + protected DB db; protected DBCollection collection; protected DBCursor cursor; @@ -95,13 +96,13 @@ public class MongoPersistReader implements StreamsPersistReader { public void stop() { - try { - client.cleanCursors(true); - client.requestDone(); - } catch (Exception e) { - } finally { - client.requestDone(); - } +// try { +// client.st +// client.requestDone(); +// } catch (Exception e) { +// } finally { +// client.requestDone(); +// } } @Override @@ -155,22 +156,23 @@ public class MongoPersistReader implements StreamsPersistReader { private synchronized void connectToMongo() { - try { - client = new MongoClient(config.getHost(), config.getPort().intValue()).getDB(config.getDb()); - } catch (UnknownHostException e) { - e.printStackTrace(); - return; + ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue()); + + if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { + MongoCredential credential = + MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); + client = new MongoClient(serverAddress, Lists.<MongoCredential>newArrayList(credential)); + } else { + client = new MongoClient(serverAddress); } - if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) - client.authenticate(config.getUser(), config.getPassword().toCharArray()); + db = client.getDB(config.getDb()); - if (!client.collectionExists(config.getCollection())) { - client.createCollection(config.getCollection(), null); + if (!db.collectionExists(config.getCollection())) { + db.createCollection(config.getCollection(), null); } - ; - collection = client.getCollection(config.getCollection()); + collection = db.getCollection(config.getCollection()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/60139e59/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java index edd8ce5..4938dcc 100644 --- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java +++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java @@ -27,18 +27,22 @@ import com.mongodb.DBAddress; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; import com.mongodb.util.JSON; import com.typesafe.config.Config; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; +import org.bson.types.ObjectId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Queue; import java.util.Random; @@ -63,8 +67,8 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { private MongoConfiguration config; - protected DB client; - protected DBAddress dbaddress; + protected MongoClient client; + protected DB db; protected DBCollection collection; protected List<DBObject> insertBatch = Lists.newArrayList(); @@ -116,8 +120,8 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { } public synchronized void close() throws IOException { - client.cleanCursors(true); - backgroundFlushTask.shutdownNow(); +// client.cleanCursors(true); +// backgroundFlushTask.shutdownNow(); } public void start() { @@ -223,7 +227,6 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { ObjectNode node = mapper.valueToTree(streamsDatum.getDocument()); dbObject = (DBObject) JSON.parse(node.toString()); } catch (Exception e) { - e.printStackTrace(); LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), e); } } @@ -232,21 +235,24 @@ public class MongoPersistWriter implements StreamsPersistWriter, Runnable { private synchronized void connectToMongo() { - try { - client = new MongoClient(config.getHost(), config.getPort().intValue()).getDB(config.getDb()); - } catch (UnknownHostException e) { - e.printStackTrace(); - return; + ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue()); + + if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) { + MongoCredential credential = + MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray()); + client = new MongoClient(serverAddress, Lists.<MongoCredential>newArrayList(credential)); + } else { + client = new MongoClient(serverAddress); } - if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) - client.authenticate(config.getUser(), config.getPassword().toCharArray()); + db = client.getDB(config.getDb()); - if (!client.collectionExists(config.getCollection())) { - client.createCollection(config.getCollection(), null); + if (!db.collectionExists(config.getCollection())) { + db.createCollection(config.getCollection(), null); } - ; - collection = client.getCollection(config.getCollection()); + collection = db.getCollection(config.getCollection()); } + + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/60139e59/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java new file mode 100644 index 0000000..6860b1a --- /dev/null +++ b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.streams.mongo.test; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.mongodb.MongoClient; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; +import org.apache.streams.config.ComponentConfigurator; +import org.apache.streams.config.StreamsConfiguration; +import org.apache.streams.config.StreamsConfigurator; +import org.apache.streams.core.StreamsDatum; +import org.apache.streams.core.StreamsResultSet; +import org.apache.streams.jackson.StreamsJacksonMapper; +import org.apache.streams.mongo.MongoConfiguration; +import org.apache.streams.mongo.MongoPersistReader; +import org.apache.streams.mongo.MongoPersistWriter; +import org.apache.streams.pojo.json.Activity; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +/** + * Test writing documents + */ +public class MongoPersistIT { + + private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistIT.class); + + ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); + + MongoConfiguration testConfiguration; + + int count = 0; + + @Before + public void setup() throws Exception { + + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/MongoPersistIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Properties mongo_properties = new Properties(); + InputStream mongo_stream = new FileInputStream("mongo.properties"); + mongo_properties.load(mongo_stream); + Config mongoProps = ConfigFactory.parseProperties(mongo_properties); + Config typesafe = testResourceConfig.withFallback(mongoProps).withFallback(reference).resolve(); + StreamsConfiguration streams = StreamsConfigurator.detectConfiguration(typesafe); + testConfiguration = new ComponentConfigurator<>(MongoConfiguration.class).detectConfiguration(typesafe, "mongo"); + + } + + @Test + public void testMongoPersist() throws Exception { + + MongoPersistWriter writer = new MongoPersistWriter(testConfiguration); + + writer.prepare(null); + + InputStream testActivityFolderStream = MongoPersistIT.class.getClassLoader() + .getResourceAsStream("activities"); + List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); + + for( String file : files) { + LOGGER.info("File: " + file ); + InputStream testActivityFileStream = MongoPersistIT.class.getClassLoader() + .getResourceAsStream("activities/" + file); + Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); + activity.getAdditionalProperties().remove("$license"); + StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); + writer.write( datum ); + LOGGER.info("Wrote: " + activity.getVerb() ); + count++; + } + + LOGGER.info("Total Written: {}", count ); + + assertEquals( 89, count ); + + writer.cleanUp(); + + MongoPersistReader reader = new MongoPersistReader(testConfiguration); + + reader.prepare(null); + + StreamsResultSet resultSet = reader.readAll(); + + LOGGER.info("Total Read: {}", resultSet.size() ); + + assertEquals( 89, resultSet.size() ); + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/60139e59/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/TestMongoPersist.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/TestMongoPersist.java b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/TestMongoPersist.java deleted file mode 100644 index d0733e8..0000000 --- a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/TestMongoPersist.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.streams.mongo.test; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.github.fakemongo.Fongo; -import com.mongodb.DB; -import com.mongodb.DBAddress; -import com.mongodb.MongoClient; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.IOUtils; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsResultSet; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.apache.streams.mongo.MongoConfiguration; -import org.apache.streams.mongo.MongoPersistReader; -import org.apache.streams.mongo.MongoPersistWriter; -import org.apache.streams.pojo.json.Activity; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; -import java.util.List; - -/** - * Test copying documents between two indexes on same cluster - */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({MongoPersistReader.class, MongoPersistWriter.class, MongoClient.class, DB.class}) -public class TestMongoPersist { - - private final static Logger LOGGER = LoggerFactory.getLogger(TestMongoPersist.class); - - ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); - - MongoClient mockClient; - - Fongo fongo; - DB mockDB; - - int count = 0; - - @Before - public void setup() { - fongo = new Fongo("testmongo"); - mockDB = fongo.getDB("test"); - - this.mockClient = PowerMockito.mock(MongoClient.class); - - PowerMockito.when(mockClient.getDB(Mockito.anyString())) - .thenReturn(mockDB); - - try { - PowerMockito.whenNew(MongoClient.class).withAnyArguments().thenReturn(mockClient); - } catch (Exception e) {} - - } - - @Test - public void testMongoPersist() throws Exception { - - MongoConfiguration mongoConfiguration = new MongoConfiguration().withHost("localhost").withDb("test").withPort(37017l).withCollection("activities"); - - MongoPersistWriter writer = new MongoPersistWriter(mongoConfiguration); - - writer.prepare(null); - - InputStream testActivityFolderStream = TestMongoPersist.class.getClassLoader() - .getResourceAsStream("activities"); - List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8); - - for( String file : files) { - LOGGER.info("File: " + file ); - InputStream testActivityFileStream = TestMongoPersist.class.getClassLoader() - .getResourceAsStream("activities/" + file); - Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class); - activity.getAdditionalProperties().remove("$license"); - StreamsDatum datum = new StreamsDatum(activity, activity.getVerb()); - writer.write( datum ); - LOGGER.info("Wrote: " + activity.getVerb() ); - count++; - } - - writer.cleanUp(); - - MongoPersistReader reader = new MongoPersistReader(mongoConfiguration); - - reader.prepare(null); - - StreamsResultSet resultSet = reader.readAll(); - - assert( resultSet.size() == count); - - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/60139e59/streams-contrib/streams-persist-mongo/src/test/resources/MongoPersistIT.conf ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-mongo/src/test/resources/MongoPersistIT.conf b/streams-contrib/streams-persist-mongo/src/test/resources/MongoPersistIT.conf new file mode 100644 index 0000000..5754f35 --- /dev/null +++ b/streams-contrib/streams-persist-mongo/src/test/resources/MongoPersistIT.conf @@ -0,0 +1,6 @@ +mongo { + host = ${mongo.tcp.host} + port = ${mongo.tcp.port} + db = "mongo_persist_it" + collection = "activity" +} \ No newline at end of file
