Repository: incubator-rya
Updated Branches:
  refs/heads/master 853e0eea7 -> b372ebcdb
RYA-393 Fixed MongoDB DAO Batch Writer from dropping statements. Closes #236

Fixed secondary indexers so they flush when the DAO flushes.
Indexers now close when DAO closes.
Fixed daemon thread in MongoDB batch writer.

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b372ebcd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b372ebcd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b372ebcd

Branch: refs/heads/master
Commit: b372ebcdb43d19315b7d430ae55fedf45654245c
Parents: 853e0ee
Author: eric.white <[email protected]>
Authored: Wed Oct 4 11:21:06 2017 -0400
Committer: Aaron Mihalik <[email protected]>
Committed: Tue Oct 24 09:39:13 2017 -0400

----------------------------------------------------------------------
 .../org/apache/rya/accumulo/AccumuloRyaDAO.java | 11 ++
 .../org/apache/rya/mongodb/MongoDBRyaDAO.java | 18 ++
 .../rya/mongodb/batch/MongoDbBatchWriter.java | 11 +-
 .../batch/collection/MongoCollectionType.java | 3 +-
 .../rya/mongodb/MongoDBRyaBatchWriterIT.java | 168 +++++++++++++++++++
 .../org/apache/rya/mongodb/MongoDBRyaDAOIT.java | 8 +-
 .../org/apache/rya/mongodb/MongoTestBase.java | 17 ++
 .../indexing/mongodb/AbstractMongoIndexer.java | 6 +
 8 files changed, 239 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
index 8c99e44..a8350d9 100644
--- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
+++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java
@@ -472,11 +472,22 
@@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName public void flush() throws RyaDAOException { try { mt_bw.flush(); + flushIndexers(); } catch (final MutationsRejectedException e) { throw new RyaDAOException(e); } } + private void flushIndexers() throws RyaDAOException { + for (final AccumuloIndexer indexer : secondaryIndexers) { + try { + indexer.flush(); + } catch (final IOException e) { + logger.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e); + } + } + } + protected String[] getTables() { // core tables final List<String> tableNames = Lists.newArrayList( http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java index fe0f6f9..d263b9c 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java @@ -171,6 +171,13 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ } catch (final MongoDbBatchWriterException e) { throw new RyaDAOException("Error shutting down MongoDB batch writer", e); } + for(final MongoSecondaryIndex indexer : secondaryIndexers) { + try { + indexer.close(); + } catch (final IOException e) { + log.error("Error closing indexer: " + indexer.getClass().getSimpleName(), e); + } + } if (mongoClient != null) { mongoClient.close(); } @@ -314,8 +321,19 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ public void flush() throws RyaDAOException { try { mongoDbBatchWriter.flush(); + flushIndexers(); } catch (final MongoDbBatchWriterException e) { throw new RyaDAOException("Error flushing data.", e); } } + + private void 
flushIndexers() throws RyaDAOException { + for (final MongoSecondaryIndex indexer : secondaryIndexers) { + try { + indexer.flush(); + } catch (final IOException e) { + log.error("Error flushing data in indexer: " + indexer.getClass().getSimpleName(), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java index 2f52b5c..b609276 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java @@ -35,6 +35,8 @@ import org.apache.log4j.Logger; import org.apache.rya.mongodb.batch.collection.CollectionType; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.mongodb.DuplicateKeyException; +import com.mongodb.MongoBulkWriteException; /** * Handles batch writing MongoDB statement objects to the repository. It takes @@ -95,7 +97,6 @@ public class MongoDbBatchWriter<T> { private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder() .setNameFormat("Queue Full Checker Thread - %d") - .setDaemon(true) .build(); /** @@ -218,6 +219,14 @@ public class MongoDbBatchWriter<T> { if (!batch.isEmpty()) { collectionType.insertMany(batch); } + } catch (final DuplicateKeyException e) { + log.warn(e); // Suppress the stack trace so log doesn't get flooded. + } catch (final MongoBulkWriteException e) { + if (e.getMessage().contains("duplicate key error")) { + log.warn(e); // Suppress the stack trace so log doesn't get flooded. 
+ } else { + throw new MongoDbBatchWriterException("Error flushing statements", e); + } } catch (final Exception e) { throw new MongoDbBatchWriterException("Error flushing statements", e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java index 8fb796a..11f2dc1 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java @@ -25,6 +25,7 @@ import java.util.List; import org.bson.Document; import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.InsertManyOptions; /** * Provides access to the {@link MongoCollection} type. 
@@ -47,6 +48,6 @@ public class MongoCollectionType implements CollectionType<Document> { @Override public void insertMany(final List<Document> items) { - collection.insertMany(items); + collection.insertMany(items, new InsertManyOptions().ordered(false)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java new file mode 100644 index 0000000..68bbc27 --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaBatchWriterIT.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.mongodb; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.log4j.BasicConfigurator; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.mongodb.batch.MongoDbBatchWriter; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils; +import org.apache.rya.mongodb.batch.collection.DbCollectionType; +import org.apache.rya.mongodb.batch.collection.MongoCollectionType; +import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; +import org.bson.Document; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; +import com.mongodb.DBObject; +import com.mongodb.MongoClient; + +/** + * Integration tests for the {@link MongoDbBatchWriter}. 
+ */ +public class MongoDBRyaBatchWriterIT extends MongoTestBase { + private MongoDBRyaDAO dao; + + private static void setupLogging() { + BasicConfigurator.configure(); + } + + @BeforeClass + public static void setUpClass() throws Exception { + setupLogging(); + } + + @Before + public void setUp() throws Exception { + conf.setBoolean("rya.mongodb.dao.flusheachupdate", false); + conf.setInt("rya.mongodb.dao.batchwriter.size", 50_000); + conf.setLong("rya.mongodb.dao.batchwriter.flushtime", 100L); + + final MongoClient client = super.getMongoClient(); + dao = new MongoDBRyaDAO(conf, client); + } + + @Test + public void testDuplicateKeys() throws Exception { + final List<RyaStatement> statements = new ArrayList<>(); + statements.add(statement(1)); + statements.add(statement(2)); + statements.add(statement(1)); + statements.add(statement(3)); + statements.add(statement(1)); + statements.add(statement(4)); + statements.add(statement(1)); + statements.add(statement(5)); + statements.add(statement(1)); + statements.add(statement(6)); + + dao.add(statements.iterator()); + + dao.flush(); + + Assert.assertEquals(6, getRyaCollection().count()); + } + + @Test + public void testDbCollectionFlush() throws Exception { + final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy(); + + final List<DBObject> objects = Lists.newArrayList( + storageStrategy.serialize(statement(1)), + storageStrategy.serialize(statement(2)), + storageStrategy.serialize(statement(2)), + null, + storageStrategy.serialize(statement(3)), + storageStrategy.serialize(statement(3)), + storageStrategy.serialize(statement(4)) + ); + + final DbCollectionType collectionType = new DbCollectionType(getRyaDbCollection()); + final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf); + final MongoDbBatchWriter<DBObject> mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(collectionType, mongoDbBatchWriterConfig); + + 
mongoDbBatchWriter.start(); + mongoDbBatchWriter.addObjectsToQueue(objects); + mongoDbBatchWriter.flush(); + Thread.sleep(1_000); + mongoDbBatchWriter.addObjectsToQueue(objects); + mongoDbBatchWriter.flush(); + Thread.sleep(1_000); + mongoDbBatchWriter.shutdown(); + Assert.assertEquals(4, getRyaDbCollection().count()); + } + + @Test + public void testMongoCollectionFlush() throws Exception { + final MongoDBStorageStrategy<RyaStatement> storageStrategy = new SimpleMongoDBStorageStrategy(); + + final List<Document> documents = Lists.newArrayList( + toDocument(storageStrategy.serialize(statement(1))), + toDocument(storageStrategy.serialize(statement(2))), + toDocument(storageStrategy.serialize(statement(2))), + null, + toDocument(storageStrategy.serialize(statement(3))), + toDocument(storageStrategy.serialize(statement(3))), + toDocument(storageStrategy.serialize(statement(4))) + ); + + final MongoCollectionType mongoCollectionType = new MongoCollectionType(getRyaCollection()); + final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf); + final MongoDbBatchWriter<Document> mongoDbBatchWriter = new MongoDbBatchWriter<Document>(mongoCollectionType, mongoDbBatchWriterConfig); + + mongoDbBatchWriter.start(); + mongoDbBatchWriter.addObjectsToQueue(documents); + mongoDbBatchWriter.flush(); + Thread.sleep(1_000); + mongoDbBatchWriter.addObjectsToQueue(documents); + mongoDbBatchWriter.flush(); + Thread.sleep(1_000); + mongoDbBatchWriter.shutdown(); + Assert.assertEquals(4, getRyaCollection().count()); + } + + private static Document toDocument(final DBObject dbObject) { + if (dbObject == null) { + return null; + } + final Document document = Document.parse(dbObject.toString()); + return document; + } + + private static RyaURI ryaURI(final int v) { + return new RyaURI("u:" + v); + } + + private static RyaStatement statement(final int v) { + final RyaStatementBuilder builder = new RyaStatementBuilder(); + 
builder.setPredicate(ryaURI(v)); + builder.setSubject(ryaURI(v)); + builder.setObject(ryaURI(v)); + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java index 5f3605e..a014e8f 100644 --- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoDBRyaDAOIT.java @@ -36,6 +36,7 @@ import org.apache.rya.mongodb.document.util.AuthorizationsUtil; import org.apache.rya.mongodb.document.visibility.DocumentVisibility; import org.bson.Document; import org.calrissian.mango.collect.CloseableIterable; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; @@ -46,7 +47,7 @@ import com.mongodb.client.MongoDatabase; public class MongoDBRyaDAOIT extends MongoTestBase { private MongoClient client; - private MongoDBRyaDAO dao; + private static MongoDBRyaDAO dao; @Before public void setUp() throws IOException, RyaDAOException{ @@ -55,6 +56,11 @@ public class MongoDBRyaDAOIT extends MongoTestBase { dao = new MongoDBRyaDAO(conf, client); } + @AfterClass + public static void tearDown() throws RyaDAOException { + dao.destroy(); + } + @Test public void testDeleteWildcard() throws RyaDAOException { final RyaStatementBuilder builder = new RyaStatementBuilder(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java index 
ffd4fd9..e325e82 100644 --- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoTestBase.java @@ -19,10 +19,13 @@ package org.apache.rya.mongodb; import org.apache.hadoop.conf.Configuration; +import org.bson.Document; import org.junit.After; import org.junit.Before; +import com.mongodb.DBCollection; import com.mongodb.MongoClient; +import com.mongodb.client.MongoCollection; /** * A base class that may be used when implementing Mongo DB tests that use the @@ -57,4 +60,18 @@ public class MongoTestBase { public MongoClient getMongoClient() { return mongoClient; } + + /** + * @return The Rya triples {@link MongoCollection}. + */ + public MongoCollection<Document> getRyaCollection() { + return mongoClient.getDatabase(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName()); + } + + /** + * @return The Rya triples {@link DBCollection}. + */ + public DBCollection getRyaDbCollection() { + return mongoClient.getDB(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b372ebcd/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java index f8ab40f..9ce6e22 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java @@ -121,6 +121,12 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat @Override public void close() throws IOException { + flush(); + try { + mongoDbBatchWriter.shutdown(); + } catch (final 
MongoDbBatchWriterException e) { + throw new IOException("Error shutting down MongoDB batch writer", e); + } } @Override
