RYA-307 Improved Rya MongoDB ingest of statements ... Closes #181 ...through the Sail Layer and Rya DAO by queueing up multiple inserts at a time so they can be written as a single batch. If the queued statements have not been written after a set time limit, they are flushed out to the datastore. The size of the batch and the time limit are configurable.
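As a rough illustration of the knobs this commit adds, the sketch below tunes the batching behavior through the new configuration keys (rya.mongodb.dao.flusheachupdate, rya.mongodb.dao.batchwriter.size, rya.mongodb.dao.batchwriter.flushtime) and reads them back the same way the DAO and indexers do; the class name and the chosen values are illustrative only, not part of the commit:

    import org.apache.rya.mongodb.MongoDBRdfConfiguration;
    import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
    import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;

    public class BatchIngestConfigExample {
        public static void main(final String[] args) {
            final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration();

            // Queue statements rather than flushing on every add.
            conf.setFlush(false);

            // Batch 10000 statements at a time (default 50000) and flush a
            // partially filled batch after 500 ms without a write (default 100 ms).
            conf.setInt(MongoDbBatchWriterUtils.BATCH_SIZE_TAG, 10000);
            conf.setLong(MongoDbBatchWriterUtils.BATCH_FLUSH_TIME_MS_TAG, 500L);

            // MongoDBRyaDAO and AbstractMongoIndexer resolve the settings with this helper.
            final MongoDbBatchWriterConfig writerConfig =
                    MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);
            System.out.println("batch size = " + writerConfig.getBatchSize()
                    + ", flush time = " + writerConfig.getBatchFlushTimeMs() + " ms");
        }
    }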
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8def4cac Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8def4cac Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8def4cac Branch: refs/heads/master Commit: 8def4cacac7d0bb9ca3cc675c53d849261aa8029 Parents: fa2aad5 Author: eric.white <[email protected]> Authored: Wed Jul 19 09:08:32 2017 -0400 Committer: Aaron Mihalik <[email protected]> Committed: Thu Aug 3 15:45:05 2017 -0400 ---------------------------------------------------------------------- .../java/org/apache/rya/api/persist/RyaDAO.java | 7 + .../org/apache/rya/accumulo/AccumuloRyaDAO.java | 161 ++++++------- .../rya/mongodb/MongoDBRdfConfiguration.java | 24 ++ .../org/apache/rya/mongodb/MongoDBRyaDAO.java | 63 ++++- .../rya/mongodb/batch/MongoDbBatchWriter.java | 238 +++++++++++++++++++ .../mongodb/batch/MongoDbBatchWriterConfig.java | 88 +++++++ .../batch/MongoDbBatchWriterException.java | 59 +++++ .../mongodb/batch/MongoDbBatchWriterUtils.java | 82 +++++++ .../batch/collection/CollectionType.java | 43 ++++ .../batch/collection/DbCollectionType.java | 53 +++++ .../batch/collection/MongoCollectionType.java | 52 ++++ .../indexing/mongodb/AbstractMongoIndexer.java | 60 ++++- .../rya/indexing/mongo/MongoEntityIndexIT.java | 22 +- .../indexing/mongo/MongoIndexerDeleteIT.java | 16 +- .../RdfCloudTripleStoreConnection.java | 195 +++++++-------- 15 files changed, 965 insertions(+), 198 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java index 57aae1b..d83a5e9 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/persist/RyaDAO.java @@ -123,4 +123,11 @@ public interface RyaDAO<C extends RdfCloudTripleStoreConfiguration> extends RyaC public void purge(RdfCloudTripleStoreConfiguration configuration); public void dropAndDestroy() throws RyaDAOException; + + /** + * Flushes any RyaStatements queued for insertion and writes them to the + * datastore. 
+ * @throws RyaDAOException + */ + public void flush() throws RyaDAOException; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java index bd7d2b3..f1f7c03 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRyaDAO.java @@ -59,12 +59,6 @@ import org.apache.accumulo.core.security.Authorizations; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.Text; -import org.openrdf.model.Namespace; - -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; - -import info.aduna.iteration.CloseableIteration; import org.apache.rya.accumulo.experimental.AccumuloIndexer; import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; @@ -76,6 +70,12 @@ import org.apache.rya.api.persist.RyaDAO; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.persist.RyaNamespaceManager; import org.apache.rya.api.resolver.RyaTripleContext; +import org.openrdf.model.Namespace; + +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; + +import info.aduna.iteration.CloseableIteration; public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaNamespaceManager<AccumuloRdfConfiguration> { private static final Log logger = LogFactory.getLog(AccumuloRyaDAO.class); @@ -131,13 +131,13 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName flushEachUpdate = conf.flushEachUpdate(); - TableOperations tableOperations = connector.tableOperations(); + final TableOperations tableOperations = connector.tableOperations(); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getSpo()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getPo()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getOsp()); AccumuloRdfUtils.createTableIfNotExist(tableOperations, tableLayoutStrategy.getNs()); - for (AccumuloIndexer index : secondaryIndexers) { + for (final AccumuloIndexer index : secondaryIndexers) { index.setConf(conf); } @@ -150,7 +150,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName bw_ns = mt_bw.getBatchWriter(tableLayoutStrategy.getNs()); - for (AccumuloIndexer index : secondaryIndexers) { + for (final AccumuloIndexer index : secondaryIndexers) { index.setConnector(connector); index.setMultiTableBatchWriter(mt_bw); index.init(); @@ -161,7 +161,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName checkVersion(); initialized = true; - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } } @@ -169,7 +169,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName @Override public String getVersion() throws RyaDAOException { String version = null; - CloseableIteration<RyaStatement, RyaDAOException> versIter = queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); + final CloseableIteration<RyaStatement, RyaDAOException> versIter = 
queryEngine.query(new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, null), conf); if (versIter.hasNext()) { version = versIter.next().getObject().getData(); } @@ -179,43 +179,43 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName } @Override - public void add(RyaStatement statement) throws RyaDAOException { + public void add(final RyaStatement statement) throws RyaDAOException { commit(Iterators.singletonIterator(statement)); } @Override - public void add(Iterator<RyaStatement> iter) throws RyaDAOException { + public void add(final Iterator<RyaStatement> iter) throws RyaDAOException { commit(iter); } @Override - public void delete(RyaStatement stmt, AccumuloRdfConfiguration aconf) throws RyaDAOException { + public void delete(final RyaStatement stmt, final AccumuloRdfConfiguration aconf) throws RyaDAOException { this.delete(Iterators.singletonIterator(stmt), aconf); } @Override - public void delete(Iterator<RyaStatement> statements, AccumuloRdfConfiguration conf) throws RyaDAOException { + public void delete(final Iterator<RyaStatement> statements, final AccumuloRdfConfiguration conf) throws RyaDAOException { try { while (statements.hasNext()) { - RyaStatement stmt = statements.next(); + final RyaStatement stmt = statements.next(); //query first - CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); + final CloseableIteration<RyaStatement, RyaDAOException> query = this.queryEngine.query(stmt, conf); while (query.hasNext()) { deleteSingleRyaStatement(query.next()); } - for (AccumuloIndexer index : secondaryIndexers) { + for (final AccumuloIndexer index : secondaryIndexers) { index.deleteStatement(stmt); } } if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } } @Override - public void dropGraph(AccumuloRdfConfiguration conf, RyaURI... graphs) throws RyaDAOException { + public void dropGraph(final AccumuloRdfConfiguration conf, final RyaURI... 
graphs) throws RyaDAOException { BatchDeleter bd_spo = null; BatchDeleter bd_po = null; BatchDeleter bd_osp = null; @@ -229,7 +229,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName bd_po.setRanges(Collections.singleton(new Range())); bd_osp.setRanges(Collections.singleton(new Range())); - for (RyaURI graph : graphs){ + for (final RyaURI graph : graphs){ bd_spo.fetchColumnFamily(new Text(graph.getData())); bd_po.fetchColumnFamily(new Text(graph.getData())); bd_osp.fetchColumnFamily(new Text(graph.getData())); @@ -244,7 +244,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName // index.dropGraph(graphs); // } - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } finally { if (bd_spo != null) { @@ -260,34 +260,34 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName } - protected void deleteSingleRyaStatement(RyaStatement stmt) throws IOException, MutationsRejectedException { - Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt); + protected void deleteSingleRyaStatement(final RyaStatement stmt) throws IOException, MutationsRejectedException { + final Map<TABLE_LAYOUT, Collection<Mutation>> map = ryaTableMutationsFactory.serializeDelete(stmt); bw_spo.addMutations(map.get(TABLE_LAYOUT.SPO)); bw_po.addMutations(map.get(TABLE_LAYOUT.PO)); bw_osp.addMutations(map.get(TABLE_LAYOUT.OSP)); } - protected void commit(Iterator<RyaStatement> commitStatements) throws RyaDAOException { + protected void commit(final Iterator<RyaStatement> commitStatements) throws RyaDAOException { try { //TODO: Should have a lock here in case we are adding and committing at the same time while (commitStatements.hasNext()) { - RyaStatement stmt = commitStatements.next(); + final RyaStatement stmt = commitStatements.next(); - Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt); - Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); + final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(stmt); + final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); + final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); + final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); bw_spo.addMutations(spo); bw_po.addMutations(po); bw_osp.addMutations(osp); - for (AccumuloIndexer index : secondaryIndexers) { + for (final AccumuloIndexer index : secondaryIndexers) { index.storeStatement(stmt); } } if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } } @@ -303,57 +303,57 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName mt_bw.flush(); mt_bw.close(); - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } - for(AccumuloIndexer indexer : this.secondaryIndexers) { + for(final AccumuloIndexer indexer : this.secondaryIndexers) { try { indexer.destroy(); - } catch(Exception e) { + } catch(final Exception e) { logger.warn("Failed to destroy indexer", e); } } } @Override - public void addNamespace(String pfx, String namespace) throws RyaDAOException { + public void addNamespace(final String pfx, final String namespace) throws RyaDAOException { try { - Mutation m = new Mutation(new 
Text(pfx)); + final Mutation m = new Mutation(new Text(pfx)); m.put(INFO_NAMESPACE_TXT, EMPTY_TEXT, new Value(namespace.getBytes())); bw_ns.addMutation(m); if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } } @Override - public String getNamespace(String pfx) throws RyaDAOException { + public String getNamespace(final String pfx) throws RyaDAOException { try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), ALL_AUTHORIZATIONS); scanner.fetchColumn(INFO_NAMESPACE_TXT, EMPTY_TEXT); scanner.setRange(new Range(new Text(pfx))); - Iterator<Map.Entry<Key, Value>> iterator = scanner + final Iterator<Map.Entry<Key, Value>> iterator = scanner .iterator(); if (iterator.hasNext()) { return new String(iterator.next().getValue().get()); } - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } return null; } @Override - public void removeNamespace(String pfx) throws RyaDAOException { + public void removeNamespace(final String pfx) throws RyaDAOException { try { - Mutation del = new Mutation(new Text(pfx)); + final Mutation del = new Mutation(new Text(pfx)); del.putDelete(INFO_NAMESPACE_TXT, EMPTY_TEXT); bw_ns.addMutation(del); if (flushEachUpdate) { mt_bw.flush(); } - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } } @@ -362,12 +362,12 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName @Override public CloseableIteration<Namespace, RyaDAOException> iterateNamespace() throws RyaDAOException { try { - Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), + final Scanner scanner = connector.createScanner(tableLayoutStrategy.getNs(), ALL_AUTHORIZATIONS); scanner.fetchColumnFamily(INFO_NAMESPACE_TXT); - Iterator<Map.Entry<Key, Value>> result = scanner.iterator(); + final Iterator<Map.Entry<Key, Value>> result = scanner.iterator(); return new AccumuloNamespaceTableIterator(result); - } catch (Exception e) { + } catch (final Exception e) { throw new RyaDAOException(e); } } @@ -378,21 +378,21 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName } @Override - public void purge(RdfCloudTripleStoreConfiguration configuration) { - for (String tableName : getTables()) { + public void purge(final RdfCloudTripleStoreConfiguration configuration) { + for (final String tableName : getTables()) { try { purge(tableName, configuration.getAuths()); compact(tableName); - } catch (TableNotFoundException e) { + } catch (final TableNotFoundException e) { logger.error(e.getMessage()); - } catch (MutationsRejectedException e) { + } catch (final MutationsRejectedException e) { logger.error(e.getMessage()); } } - for(AccumuloIndexer indexer : this.secondaryIndexers) { + for(final AccumuloIndexer indexer : this.secondaryIndexers) { try { indexer.purge(configuration); - } catch(Exception e) { + } catch(final Exception e) { logger.error("Failed to purge indexer", e); } } @@ -400,24 +400,24 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName @Override public void dropAndDestroy() throws RyaDAOException { - for (String tableName : getTables()) { + for (final String tableName : getTables()) { try { drop(tableName); - } catch (AccumuloSecurityException e) { + } catch (final AccumuloSecurityException e) { logger.error(e.getMessage()); throw new RyaDAOException(e); - } catch 
(AccumuloException e) { + } catch (final AccumuloException e) { logger.error(e.getMessage()); throw new RyaDAOException(e); - } catch (TableNotFoundException e) { + } catch (final TableNotFoundException e) { logger.warn(e.getMessage()); } } destroy(); - for(AccumuloIndexer indexer : this.secondaryIndexers) { + for(final AccumuloIndexer indexer : this.secondaryIndexers) { try { indexer.dropAndDestroy(); - } catch(Exception e) { + } catch(final Exception e) { logger.error("Failed to drop and destroy indexer", e); } } @@ -427,7 +427,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName return connector; } - public void setConnector(Connector connector) { + public void setConnector(final Connector connector) { this.connector = connector; } @@ -435,7 +435,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName return batchWriterConfig; } - public void setBatchWriterConfig(BatchWriterConfig batchWriterConfig) { + public void setBatchWriterConfig(final BatchWriterConfig batchWriterConfig) { this.batchWriterConfig = batchWriterConfig; } @@ -449,7 +449,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName } @Override - public void setConf(AccumuloRdfConfiguration conf) { + public void setConf(final AccumuloRdfConfiguration conf) { this.conf = conf; } @@ -457,7 +457,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName return ryaTableMutationsFactory; } - public void setRyaTableMutationsFactory(RyaTableMutationsFactory ryaTableMutationsFactory) { + public void setRyaTableMutationsFactory(final RyaTableMutationsFactory ryaTableMutationsFactory) { this.ryaTableMutationsFactory = ryaTableMutationsFactory; } @@ -466,21 +466,22 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName return queryEngine; } - public void setQueryEngine(AccumuloRyaQueryEngine queryEngine) { + public void setQueryEngine(final AccumuloRyaQueryEngine queryEngine) { this.queryEngine = queryEngine; } + @Override public void flush() throws RyaDAOException { try { mt_bw.flush(); - } catch (MutationsRejectedException e) { + } catch (final MutationsRejectedException e) { throw new RyaDAOException(e); } } protected String[] getTables() { // core tables - List<String> tableNames = Lists.newArrayList( + final List<String> tableNames = Lists.newArrayList( tableLayoutStrategy.getSpo(), tableLayoutStrategy.getPo(), tableLayoutStrategy.getOsp(), @@ -488,17 +489,17 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName tableLayoutStrategy.getEval()); // Additional Tables - for (AccumuloIndexer index : secondaryIndexers) { + for (final AccumuloIndexer index : secondaryIndexers) { tableNames.add(index.getTableName()); } return tableNames.toArray(new String[]{}); } - private void purge(String tableName, String[] auths) throws TableNotFoundException, MutationsRejectedException { + private void purge(final String tableName, final String[] auths) throws TableNotFoundException, MutationsRejectedException { if (tableExists(tableName)) { logger.info("Purging accumulo table: " + tableName); - BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths)); + final BatchDeleter batchDeleter = createBatchDeleter(tableName, new Authorizations(auths)); try { batchDeleter.setRanges(Collections.singleton(new Range())); batchDeleter.delete(); @@ -508,31 +509,31 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName } } - private 
void compact(String tableName) { + private void compact(final String tableName) { logger.info("Requesting major compaction for table " + tableName); try { connector.tableOperations().compact(tableName, null, null, true, false); - } catch (Exception e) { + } catch (final Exception e) { logger.error(e.getMessage()); } } - private boolean tableExists(String tableName) { + private boolean tableExists(final String tableName) { return getConnector().tableOperations().exists(tableName); } - private BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations) throws TableNotFoundException { + private BatchDeleter createBatchDeleter(final String tableName, final Authorizations authorizations) throws TableNotFoundException { return connector.createBatchDeleter(tableName, authorizations, NUM_THREADS, MAX_MEMORY, MAX_TIME, NUM_THREADS); } private void checkVersion() throws RyaDAOException, IOException, MutationsRejectedException { - String version = getVersion(); + final String version = getVersion(); if (version == null) { //adding to core Rya tables but not Indexes - Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement()); - Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); - Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); - Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); + final Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationsFactory.serialize(getVersionRyaStatement()); + final Collection<Mutation> spo = mutationMap.get(TABLE_LAYOUT.SPO); + final Collection<Mutation> po = mutationMap.get(TABLE_LAYOUT.PO); + final Collection<Mutation> osp = mutationMap.get(TABLE_LAYOUT.OSP); bw_spo.addMutations(spo); bw_po.addMutations(po); bw_osp.addMutations(osp); @@ -544,7 +545,7 @@ public class AccumuloRyaDAO implements RyaDAO<AccumuloRdfConfiguration>, RyaName return new RyaStatement(RTS_SUBJECT_RYA, RTS_VERSION_PREDICATE_RYA, VERSION_RYA); } - private void drop(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + private void drop(final String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException { logger.info("Dropping cloudbase table: " + tableName); connector.tableOperations().delete(tableName); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java index 067b682..418a155 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java @@ -40,6 +40,8 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration { public static final String MONGO_USER_PASSWORD = "mongo.db.userpassword"; public static final String CONF_ADDITIONAL_INDEXERS = "ac.additional.indexers"; public static final String USE_MOCK_MONGO = ".useMockInstance"; + public static final String CONF_FLUSH_EACH_UPDATE = "rya.mongodb.dao.flusheachupdate"; + private MongoClient mongoClient; public MongoDBRdfConfiguration() { @@ -99,6 +101,28 @@ public class MongoDBRdfConfiguration extends RdfCloudTripleStoreConfiguration { } /** + * @return {@code true} if each 
statement added to the batch writer should + * be flushed and written right away to the datastore. {@code false} if the + * statements should be queued and written to the datastore when the queue + * is full or after enough time has passed without a write.<p> + * Defaults to {@code true} if nothing is specified. + */ + public boolean flushEachUpdate(){ + return getBoolean(CONF_FLUSH_EACH_UPDATE, true); + } + + /** + * Sets the {@link #CONF_FLUSH_EACH_UPDATE} property of the configuration. + * @param flush {@code true} if each statement added to the batch writer + * should be flushed and written right away to the datastore. {@code false} + * if the statements should be queued and written to the datastore when the + * queue is full or after enough time has passed without a write. + */ + public void setFlush(final boolean flush){ + setBoolean(CONF_FLUSH_EACH_UPDATE, flush); + } + + /** * @return name of Mongo Collection containing Rya triples */ public String getTriplesCollectionName() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java index daa8a67..a32651d 100644 --- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java @@ -37,6 +37,11 @@ import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.persist.RyaNamespaceManager; import org.apache.rya.api.persist.index.RyaSecondaryIndexer; import org.apache.rya.api.persist.query.RyaQueryEngine; +import org.apache.rya.mongodb.batch.MongoDbBatchWriter; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterException; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils; +import org.apache.rya.mongodb.batch.collection.DbCollectionType; import org.apache.rya.mongodb.dao.MongoDBNamespaceManager; import org.apache.rya.mongodb.dao.MongoDBStorageStrategy; import org.apache.rya.mongodb.dao.SimpleMongoDBNamespaceManager; @@ -47,7 +52,6 @@ import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.DuplicateKeyException; -import com.mongodb.InsertOptions; import com.mongodb.MongoClient; /** @@ -56,6 +60,8 @@ import com.mongodb.MongoClient; public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ private static final Logger log = Logger.getLogger(MongoDBRyaDAO.class); + private boolean isInitialized = false; + private boolean flushEachUpdate = true; private MongoDBRdfConfiguration conf; private final MongoClient mongoClient; private DB db; @@ -67,6 +73,8 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ private List<MongoSecondaryIndex> secondaryIndexers; private Authorizations auths; + private MongoDbBatchWriter<DBObject> mongoDbBatchWriter; + /** * Creates a new instance of {@link MongoDBRyaDAO}. * @param conf the {@link MongoDBRdfConfiguration}. 
@@ -87,6 +95,7 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ this.mongoClient = mongoClient; conf.setMongoClient(mongoClient); auths = conf.getAuthorizations(); + flushEachUpdate = conf.flushEachUpdate(); init(); } @@ -116,6 +125,9 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ @Override public void init() throws RyaDAOException { + if (isInitialized) { + return; + } secondaryIndexers = conf.getAdditionalIndexers(); for(final MongoSecondaryIndex index: secondaryIndexers) { index.setConf(conf); @@ -131,15 +143,34 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ for(final MongoSecondaryIndex index: secondaryIndexers) { index.init(); } + + final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf); + mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(coll), mongoDbBatchWriterConfig); + try { + mongoDbBatchWriter.start(); + } catch (final MongoDbBatchWriterException e) { + throw new RyaDAOException("Error starting MongoDB batch writer", e); + } + isInitialized = true; } @Override public boolean isInitialized() throws RyaDAOException { - return true; + return isInitialized; } @Override public void destroy() throws RyaDAOException { + if (!isInitialized) { + return; + } + isInitialized = false; + flush(); + try { + mongoDbBatchWriter.shutdown(); + } catch (final MongoDbBatchWriterException e) { + throw new RyaDAOException("Error shutting down MongoDB batch writer", e); + } if (mongoClient != null) { mongoClient.close(); } @@ -153,7 +184,15 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ try { final boolean canAdd = DocumentVisibilityUtil.doesUserHaveDocumentAccess(auths, statement.getColumnVisibility()); if (canAdd) { - coll.insert(storageStrategy.serialize(statement)); + final DBObject obj = storageStrategy.serialize(statement); + try { + mongoDbBatchWriter.addObjectToQueue(obj); + if (flushEachUpdate) { + flush(); + } + } catch (final MongoDbBatchWriterException e) { + throw new RyaDAOException("Error adding statement", e); + } for(final RyaSecondaryIndexer index: secondaryIndexers) { index.storeStatement(statement); } @@ -190,7 +229,14 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ throw new RyaDAOException("User does not have the required authorizations to add statement"); } } - coll.insert(dbInserts, new InsertOptions().continueOnError(true)); + try { + mongoDbBatchWriter.addObjectsToQueue(dbInserts); + if (flushEachUpdate) { + flush(); + } + } catch (final MongoDbBatchWriterException e) { + throw new RyaDAOException("Error adding statements", e); + } } @Override @@ -263,4 +309,13 @@ public final class MongoDBRyaDAO implements RyaDAO<MongoDBRdfConfiguration>{ public void dropAndDestroy() throws RyaDAOException { db.dropDatabase(); // this is dangerous! 
} + + @Override + public void flush() throws RyaDAOException { + try { + mongoDbBatchWriter.flush(); + } catch (final MongoDbBatchWriterException e) { + throw new RyaDAOException("Error flushing data.", e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java new file mode 100644 index 0000000..2f52b5c --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriter.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.batch; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.rya.mongodb.batch.collection.CollectionType; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Handles batch writing MongoDB statement objects to the repository. It takes + * in a configurable batch size and flush time. If the number of objects placed + * in the queue reaches the batch size then the objects are bulk written to the + * datastore. Or if the queue has not filled up after the batch time duration + * has passed then the statements are flushed out and written to the datastore. + * @param <T> the type of object that the batch writer's internal collection + * type uses. + */ +public class MongoDbBatchWriter<T> { + private static final Logger log = Logger.getLogger(MongoDbBatchWriter.class); + + private static final int CHECK_QUEUE_INTERVAL_MS = 10; + + private final CollectionType<T> collectionType; + private final long batchFlushTimeMs; + + private final ArrayBlockingQueue<T> statementInsertionQueue; + private final ScheduledThreadPoolExecutor scheduledExecutor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(0); + private ScheduledFuture<?> flushBatchFuture; + private final Runnable flushBatchTask; + private Thread queueFullCheckerThread; + + private final AtomicBoolean isInit = new AtomicBoolean(); + + /** + * Creates a new instance of {@link MongoDbBatchWriter}. 
+ * @param collectionType the {@link CollectionType}. (not {@code null}) + * @param mongoDbBatchWriterConfig the {@link MongoDbBatchWriterConfig}. + * (not {@code null}) + */ + public MongoDbBatchWriter(final CollectionType<T> collectionType, final MongoDbBatchWriterConfig mongoDbBatchWriterConfig) { + this.collectionType = checkNotNull(collectionType); + this.batchFlushTimeMs = checkNotNull(mongoDbBatchWriterConfig).getBatchFlushTimeMs(); + + statementInsertionQueue = new ArrayBlockingQueue<>(mongoDbBatchWriterConfig.getBatchSize()); + flushBatchTask = new BatchFlusher(); + } + + /** + * Task used to flush statements if enough time has passed without an + * insertion while there are objects enqueued. + */ + private class BatchFlusher implements Runnable { + @Override + public void run() { + try { + if (!statementInsertionQueue.isEmpty()) { + log.trace("Running statement insertion flush task. Too much time has passed without any object insertions so all queued data is being flushed."); + flush(); + } + } catch (final Exception e) { + log.error("Error flushing out the statements", e); + } + } + } + + private static final ThreadFactory QUEUE_THREAD_FACTORY = new ThreadFactoryBuilder() + .setNameFormat("Queue Full Checker Thread - %d") + .setDaemon(true) + .build(); + + /** + * Checks the queue for statements to insert if the queue is full. + */ + private class QueueFullChecker implements Runnable { + @Override + public void run() { + try { + while (isInit.get()) { + // Check if the queue is full and if it is then insert the + // statements. Otherwise reset the insertion timer. + if (statementInsertionQueue.remainingCapacity() == 0) { + log.trace("Statement queue is FULL -> going to empty it"); + try { + flush(); + } catch (final MongoDbBatchWriterException e) { + log.error("Error emptying queue", e); + } + } + Thread.sleep(CHECK_QUEUE_INTERVAL_MS); + } + } catch (final InterruptedException e) { + log.error("Encountered an unexpected error while checking the batch queue.", e); + } + } + } + + /** + * Starts the batch writer queue and processes. + */ + public void start() throws MongoDbBatchWriterException { + if (!isInit.get()) { + if (flushBatchFuture == null) { + flushBatchFuture = startFlushTimer(); + } + if (queueFullCheckerThread == null) { + queueFullCheckerThread = QUEUE_THREAD_FACTORY.newThread(new QueueFullChecker()); + } + isInit.set(true); + queueFullCheckerThread.start(); + } + } + + /** + * Stops the batch writer processes. + */ + public void shutdown() throws MongoDbBatchWriterException { + isInit.set(false); + if (flushBatchFuture != null) { + flushBatchFuture.cancel(true); + flushBatchFuture = null; + } + if (queueFullCheckerThread != null) { + if (queueFullCheckerThread.isAlive()) { + try { + queueFullCheckerThread.join(2 * CHECK_QUEUE_INTERVAL_MS); + } catch (final InterruptedException e) { + log.error("Error waiting for thread to finish", e); + } + queueFullCheckerThread = null; + } + } + } + + /** + * Adds a MongoDB object to the queue which will not be written until one of + * the following occurs:<br> + * <ul> + * <li>The queue fills up</li> + * <li>The flush time has been reached</li> + * <li>A direct call to the {@link MongoDbBatchWriter#flush()} method + * has been made</li> + * </ul> + * @param object the object to add to the queue.
+ * @throws MongoDbBatchWriterException + */ + public void addObjectToQueue(final T object) throws MongoDbBatchWriterException { + if (object != null) { + try { + // Place in the queue which will bulk write after the specified + // "batchSize" number of items have filled the queue or if more + // than "batchFlushTimeMs" milliseconds have passed since the + // last insertion. + resetFlushTimer(); + statementInsertionQueue.put(object); + } catch (final Exception e) { + throw new MongoDbBatchWriterException("Error adding object to batch queue.", e); + } + } + } + + /** + * Adds a list of MongoDB objects to the queue which will not be written + * until one of the following occurs:<br> + * <ul> + * <li>The queue fills up</li> + * <li>The flush time has been reached</li> + * <li>A direct call to the {@link MongoDbBatchWriter#flush()} method + * has been made</li> + * </ul> + * @param objects a {@link List} of objects to add to the queue. + * @throws MongoDbBatchWriterException + */ + public void addObjectsToQueue(final List<T> objects) throws MongoDbBatchWriterException { + if (objects != null) { + for (final T object : objects) { + addObjectToQueue(object); + } + } + } + + /** + * Flushes out statements that are in the queue. + */ + public void flush() throws MongoDbBatchWriterException { + final List<T> batch = new ArrayList<>(); + try { + statementInsertionQueue.drainTo(batch); + if (!batch.isEmpty()) { + collectionType.insertMany(batch); + } + } catch (final Exception e) { + throw new MongoDbBatchWriterException("Error flushing statements", e); + } + } + + private void resetFlushTimer() throws MongoDbBatchWriterException { + flushBatchFuture.cancel(false); + flushBatchFuture = startFlushTimer(); + } + + private ScheduledFuture<?> startFlushTimer() throws MongoDbBatchWriterException { + try { + return scheduledExecutor.schedule(flushBatchTask, batchFlushTimeMs, TimeUnit.MILLISECONDS); + } catch (final Exception e) { + throw new MongoDbBatchWriterException("Error starting batch flusher", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java new file mode 100644 index 0000000..cec8b9a --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterConfig.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.mongodb.batch; + +import com.google.common.base.Preconditions; + +/** + * Configuration for the MongoDB Batch Writer. + */ +public class MongoDbBatchWriterConfig { + /** + * The default number of statements to batch write at a time. + */ + public static final int DEFAULT_BATCH_SIZE = 50000; + private Integer batchSize = null; + + /** + * The default time to wait in milliseconds to flush all statements out that + * are queued for insertion if the queue has not filled up to its capacity + * of {@link #DEFAULT_BATCH_SIZE} or the user configured buffer size. + */ + public static final long DEFAULT_BATCH_FLUSH_TIME_MS = 100L; + private Long batchFlushTimeMs = null; + + /** + * Creates a new instance of {@link MongoDbBatchWriterConfig}. + */ + public MongoDbBatchWriterConfig() { + } + + /** + * Gets the configured number of statements to batch write at a time. + * @return the configured value or the default value. + */ + public int getBatchSize() { + return batchSize != null ? batchSize : DEFAULT_BATCH_SIZE; + } + + /** + * Sets the number of statements to batch write at a time. + * @param batchSize the number of statements in each batch. + * @return the {@link MongoDbBatchWriterConfig}. + */ + public MongoDbBatchWriterConfig setBatchSize(final int batchSize) { + Preconditions.checkArgument(batchSize > 0, "Batch size must be positive."); + this.batchSize = batchSize; + return this; + } + + /** + * Gets the configured time to wait in milliseconds to flush all statements + * out that are queued for insertion if the queue has not filled up to its + * capacity. + * @return the configured value or the default value. + */ + public long getBatchFlushTimeMs() { + return batchFlushTimeMs != null ? batchFlushTimeMs : DEFAULT_BATCH_FLUSH_TIME_MS; + } + + /** + * Sets the time to wait in milliseconds to flush all statements out that + * are queued for insertion if the queue has not filled up to its capacity. + * @param batchFlushTimeMs the time to wait before flushing all queued + * statements that have not been written. + * @return the {@link MongoDbBatchWriterConfig}. + */ + public MongoDbBatchWriterConfig setBatchFlushTimeMs(final long batchFlushTimeMs) { + Preconditions.checkArgument(batchFlushTimeMs >= 0, "Batch flush time must be non-negative."); + this.batchFlushTimeMs = batchFlushTimeMs; + return this; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java new file mode 100644 index 0000000..d4de156 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterException.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.batch; + +/** + * An exception to be used when there is a problem running the MongoDB Batch + * Writer. + */ +public class MongoDbBatchWriterException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Creates a new instance of {@link MongoDbBatchWriterException}. + */ + public MongoDbBatchWriterException() { + super(); + } + + /** + * Creates a new instance of {@link MongoDbBatchWriterException}. + * @param message the detail message. + */ + public MongoDbBatchWriterException(final String message) { + super(message); + } + + /** + * Creates a new instance of {@link MongoDbBatchWriterException}. + * @param message the detail message. + * @param source the {@link Throwable} source. + */ + public MongoDbBatchWriterException(final String message, final Throwable source) { + super(message, source); + } + + /** + * Creates a new instance of {@link MongoDbBatchWriterException}. + * @param source the {@link Throwable} source. + */ + public MongoDbBatchWriterException(final Throwable source) { + super(source); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java new file mode 100644 index 0000000..99e8992 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/MongoDbBatchWriterUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.batch; + +import org.apache.hadoop.conf.Configuration; + +/** + * Constants and utility methods related to batch writing statements in a MongoDB + * Rya repository. + */ +public final class MongoDbBatchWriterUtils { + /** + * Config tag used to specify the number of statements to batch write at a + * time.
+ */ + public static final String BATCH_SIZE_TAG = "rya.mongodb.dao.batchwriter.size"; + + /** + * Config tag used to specify the time to wait in milliseconds to flush all + * statements out that are queued for insertion if the queue has not filled + * up to its capacity. + */ + public static final String BATCH_FLUSH_TIME_MS_TAG = "rya.mongodb.dao.batchwriter.flushtime"; + + /** + * Private constructor to prevent instantiation. + */ + private MongoDbBatchWriterUtils() { + } + + /** + * The number of statements to batch write at a time. + * @param conf the {@link Configuration} to check. + * @return the configured value or the default value. + */ + public static int getConfigBatchSize(final Configuration conf) { + return conf.getInt(BATCH_SIZE_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_SIZE); + } + + /** + * The time to wait in milliseconds to flush all statements out that are + * queued for insertion if the queue has not filled up to its capacity. + * @param conf the {@link Configuration} to check. + * @return the configured value or the default value. + */ + public static long getConfigBatchFlushTimeMs(final Configuration conf) { + return conf.getLong(BATCH_FLUSH_TIME_MS_TAG, MongoDbBatchWriterConfig.DEFAULT_BATCH_FLUSH_TIME_MS); + } + + /** + * Reads the specified config to create and initialize a + * {@link MongoDbBatchWriterConfig}. If no values are found then the default + * values are used. + * @param conf the {@link Configuration} to check. + * @return the {@link MongoDbBatchWriterConfig} populated with configured + * values for the specified {@code conf}. + */ + public static MongoDbBatchWriterConfig getMongoDbBatchWriterConfig(final Configuration conf) { + final int batchSize = getConfigBatchSize(conf); + final long batchFlushTimeMs = getConfigBatchFlushTimeMs(conf); + final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = new MongoDbBatchWriterConfig(); + mongoDbBatchWriterConfig.setBatchSize(batchSize); + mongoDbBatchWriterConfig.setBatchFlushTimeMs(batchFlushTimeMs); + return mongoDbBatchWriterConfig; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java new file mode 100644 index 0000000..9e6d6fb --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/CollectionType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.mongodb.batch.collection; + +import java.util.List; + +/** + * Wrapper for interacting with the new and legacy MongoDB collection types + * ({@link com.mongodb.client.MongoCollection} and + * {@link com.mongodb.DBCollection} respectively) + * in order to handle inserts from both types and the object types they + * utilize. + * @param <T> the type of object the collection type inserts. + */ +public interface CollectionType<T> { + /** + * Insert one item. + * @param item the item to insert. + */ + public void insertOne(final T item); + + /** + * Insert a list of items. + * @param items the {@link List} of items. + */ + public void insertMany(final List<T> items); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java new file mode 100644 index 0000000..ea00693 --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/DbCollectionType.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.batch.collection; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; + +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.InsertOptions; +import com.mongodb.WriteConcern; + +/** + * Provides access to the {@link DBCollection} type. + */ +public class DbCollectionType implements CollectionType<DBObject> { + private final DBCollection collection; + + /** + * Creates a new instance of {@link DbCollectionType}. + * @param collection the {@link DBCollection}. 
(not {@code null}) + */ + public DbCollectionType(final DBCollection collection) { + this.collection = checkNotNull(collection); + } + + @Override + public void insertOne(final DBObject item) { + collection.insert(item, WriteConcern.ACKNOWLEDGED); + } + + @Override + public void insertMany(final List<DBObject> items) { + collection.insert(items, new InsertOptions().continueOnError(true)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java new file mode 100644 index 0000000..8fb796a --- /dev/null +++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/batch/collection/MongoCollectionType.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.mongodb.batch.collection; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.List; + +import org.bson.Document; + +import com.mongodb.client.MongoCollection; + +/** + * Provides access to the {@link MongoCollection} type. + */ +public class MongoCollectionType implements CollectionType<Document> { + private final MongoCollection<Document> collection; + + /** + * Creates a new instance of {@link MongoCollectionType}. + * @param collection the {@link MongoCollection}. 
(not {@code null}) + */ + public MongoCollectionType(final MongoCollection<Document> collection) { + this.collection = checkNotNull(collection); + } + + @Override + public void insertOne(final Document item) { + collection.insertOne(item); + } + + @Override + public void insertMany(final List<Document> items) { + collection.insertMany(items); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java index f5372d1..69ca274 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/mongodb/AbstractMongoIndexer.java @@ -33,6 +33,11 @@ import org.apache.rya.mongodb.MongoConnectorFactory; import org.apache.rya.mongodb.MongoDBRdfConfiguration; import org.apache.rya.mongodb.MongoDBRyaDAO; import org.apache.rya.mongodb.MongoSecondaryIndex; +import org.apache.rya.mongodb.batch.MongoDbBatchWriter; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterException; +import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils; +import org.apache.rya.mongodb.batch.collection.DbCollectionType; import org.openrdf.model.Literal; import org.openrdf.model.Statement; import org.openrdf.model.URI; @@ -46,7 +51,6 @@ import com.mongodb.DBObject; import com.mongodb.MongoClient; import com.mongodb.QueryBuilder; import com.mongodb.ServerAddress; -import com.mongodb.WriteConcern; import info.aduna.iteration.CloseableIteration; @@ -58,6 +62,7 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat private static final Logger LOG = Logger.getLogger(AbstractMongoIndexer.class); private boolean isInit = false; + private boolean flushEachUpdate = true; protected Configuration conf; protected MongoDBRyaDAO dao; protected MongoClient mongoClient; @@ -68,15 +73,28 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat protected T storageStrategy; + private MongoDbBatchWriter<DBObject> mongoDbBatchWriter; + protected void initCore() { dbName = conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME); db = this.mongoClient.getDB(dbName); - collection = db.getCollection(conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName()); + final String collectionName = conf.get(MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX, "rya") + getCollectionName(); + collection = db.getCollection(collectionName); + + flushEachUpdate = ((MongoDBRdfConfiguration)conf).flushEachUpdate(); + + final MongoDbBatchWriterConfig mongoDbBatchWriterConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf); + mongoDbBatchWriter = new MongoDbBatchWriter<DBObject>(new DbCollectionType(collection), mongoDbBatchWriterConfig); + try { + mongoDbBatchWriter.start(); + } catch (final MongoDbBatchWriterException e) { + LOG.error("Error starting MongoDB batch writer", e); + } } @Override public void setClient(final MongoClient client){ - this.mongoClient = client; + this.mongoClient = client; } @VisibleForTesting @@ -96,8 +114,8 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat public void setConf(final
Configuration conf) { this.conf = conf; if (!isInit){ - setClient(MongoConnectorFactory.getMongoClient(conf)); - init(); + setClient(MongoConnectorFactory.getMongoClient(conf)); + init(); } } @@ -108,6 +126,11 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat @Override public void flush() throws IOException { + try { + mongoDbBatchWriter.flush(); + } catch (final MongoDbBatchWriterException e) { + throw new IOException("Error flushing batch writer", e); + } } @Override @@ -135,24 +158,43 @@ public abstract class AbstractMongoIndexer<T extends IndexingMongoDBStorageStrat public void storeStatements(final Collection<RyaStatement> ryaStatements) throws IOException { for (final RyaStatement ryaStatement : ryaStatements){ - storeStatement(ryaStatement); + storeStatement(ryaStatement, false); + } + if (flushEachUpdate) { + flush(); } } @Override public void storeStatement(final RyaStatement ryaStatement) throws IOException { + storeStatement(ryaStatement, flushEachUpdate); + } + + private void storeStatement(final RyaStatement ryaStatement, final boolean flush) throws IOException { + final DBObject obj = prepareStatementForStorage(ryaStatement); + try { + mongoDbBatchWriter.addObjectToQueue(obj); + if (flush) { + flush(); + } + } catch (final MongoDbBatchWriterException e) { + throw new IOException("Error storing statement", e); + } + } + + private DBObject prepareStatementForStorage(final RyaStatement ryaStatement) { try { final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); final boolean isValidPredicate = predicates.isEmpty() || predicates.contains(statement.getPredicate()); if (isValidPredicate && (statement.getObject() instanceof Literal)) { final DBObject obj = storageStrategy.serialize(ryaStatement); - if (obj != null) { - collection.insert(obj, WriteConcern.ACKNOWLEDGED); - } + return obj; } } catch (final IllegalArgumentException e) { LOG.error("Unable to parse the statement: " + ryaStatement.toString(), e); } + + return null; } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java index 4ddb2a5..6fac386 100644 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoEntityIndexIT.java @@ -21,9 +21,7 @@ package org.apache.rya.indexing.mongo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.util.ArrayList; import java.util.HashSet; -import java.util.List; import java.util.Optional; import java.util.Set; @@ -36,12 +34,13 @@ import org.apache.rya.indexing.entity.storage.EntityStorage; import org.apache.rya.indexing.entity.storage.TypeStorage; import org.apache.rya.indexing.entity.update.mongo.MongoEntityIndexer; import org.apache.rya.mongodb.MockMongoFactory; +import org.apache.rya.mongodb.MongoConnectorFactory; import org.apache.rya.mongodb.MongoDBRdfConfiguration; import org.apache.rya.sail.config.RyaSailFactory; import org.bson.Document; +import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.openrdf.model.Statement; import org.openrdf.model.URI; import org.openrdf.model.Value; import 
org.openrdf.model.ValueFactory; @@ -70,7 +69,7 @@ public class MongoEntityIndexIT { private MongoClient mongoClient; @Before - public void setUp() throws Exception{ + public void setUp() throws Exception { mongoClient = MockMongoFactory.with(Version.Main.PRODUCTION).newMongoClient(); conf = new MongoDBRdfConfiguration(); conf.set(MongoDBRdfConfiguration.MONGO_DB_NAME, "test"); @@ -91,6 +90,19 @@ public class MongoEntityIndexIT { indexer.init(); } + @After + public void tearDown() throws Exception { + if (mongoClient != null) { + MongoConnectorFactory.closeMongoClient(); + } + if (conn != null) { + conn.clear(); + } + if (indexer != null) { + indexer.close(); + } + } + @Test public void ensureInEntityStore_Test() throws Exception { setupTypes(); @@ -202,8 +214,6 @@ public class MongoEntityIndexIT { } private void addStatements() throws Exception { - final List<Statement> stmnts = new ArrayList<>(); - //alice URI subject = VF.createURI("urn:alice"); URI predicate = VF.createURI("urn:name"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8def4cac/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java index b533d42..4b66b5b 100644 --- a/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java +++ b/extras/rya.geoindexing/src/test/java/org/apache/rya/indexing/mongo/MongoIndexerDeleteIT.java @@ -31,6 +31,8 @@ import org.apache.rya.indexing.TemporalInstantRfc3339; import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration; import org.apache.rya.mongodb.MockMongoFactory; +import org.apache.rya.mongodb.MongoConnectorFactory; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.openrdf.model.Resource; @@ -81,6 +83,16 @@ public class MongoIndexerDeleteIT { conn.begin(); } + @After + public void after() throws Exception { + if (client != null) { + MongoConnectorFactory.closeMongoClient(); + } + if (conn != null) { + conn.clear(); + } + } + @Test public void deleteTest() throws Exception { populateRya(); @@ -150,14 +162,10 @@ public class MongoIndexerDeleteIT { uuid = "urn:people"; conn.add(VF.createURI(uuid), RDF.TYPE, person); conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Alice Palace Hose", VF.createURI("http://www.w3.org/2001/XMLSchema#string"))); - - uuid = "urn:people"; - conn.add(VF.createURI(uuid), RDF.TYPE, person); conn.add(VF.createURI(uuid), RDFS.LABEL, VF.createLiteral("Bob Snob Hose", "en")); // temporal final TemporalInstant instant = new TemporalInstantRfc3339(1, 2, 3, 4, 5, 6); - final URI time = VF.createURI("Property:atTime"); conn.add(VF.createURI("foo:time"), VF.createURI("Property:atTime"), VF.createLiteral(instant.toString())); }
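
Taken together, the AbstractMongoIndexer changes above reduce to one small pattern: derive a MongoDbBatchWriterConfig from the Hadoop Configuration, wrap the target collection in a CollectionType adapter, start the writer, queue documents, and flush whenever a write must become visible immediately. The sketch below restates that pattern outside the indexer, using only calls that appear in this diff (getMongoDbBatchWriterConfig, start, addObjectToQueue, flush); the class name, method name, and collection handling are invented for illustration, and writer shutdown is omitted because no shutdown call is visible in this section.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
import org.apache.rya.mongodb.batch.collection.DbCollectionType;

import com.mongodb.DBCollection;
import com.mongodb.DBObject;

// Hypothetical driver class; not part of this commit.
public class BatchWriterSketch {
    /**
     * Queues the supplied documents and then forces them out as one batch,
     * mirroring what AbstractMongoIndexer.initCore() and storeStatements() do.
     */
    public static void writeBatch(final Configuration conf,
            final DBCollection collection,
            final List<DBObject> docs) throws MongoDbBatchWriterException {
        // Batch size and flush time limit are read from the configuration.
        final MongoDbBatchWriterConfig config =
                MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(conf);

        // DbCollectionType adapts the legacy DBCollection API; the commit's
        // MongoCollectionType plays the same role for MongoCollection<Document>.
        final MongoDbBatchWriter<DBObject> writer =
                new MongoDbBatchWriter<DBObject>(new DbCollectionType(collection), config);

        writer.start();
        for (final DBObject doc : docs) {
            writer.addObjectToQueue(doc); // queued only; written in batches
        }
        writer.flush(); // push the pending batch to the datastore now
    }
}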

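Because both adapters above implement the same two operations, the CollectionType seam is also a convenient place to stand in for MongoDB in tests. Assuming CollectionType<T> declares only the insertOne and insertMany methods seen in DbCollectionType and MongoCollectionType (the interface body itself falls outside this section), a minimal in-memory implementation might look like the hypothetical class below. It is suitable for single-threaded tests only, since the real batch writer can also flush from its timed background task.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.rya.mongodb.batch.collection.CollectionType;

/**
 * Hypothetical in-memory CollectionType (not part of this commit): records
 * every "inserted" item so a test can assert on batch contents without a
 * running MongoDB instance.
 */
public class InMemoryCollectionType<T> implements CollectionType<T> {
    private final List<T> written = new ArrayList<>();

    @Override
    public void insertOne(final T item) {
        written.add(item);
    }

    @Override
    public void insertMany(final List<T> items) {
        written.addAll(items);
    }

    /**
     * @return an unmodifiable view of everything written so far.
     */
    public List<T> getWritten() {
        return Collections.unmodifiableList(written);
    }
}

A MongoDbBatchWriter<T> constructed over this type would exercise the same queueing, batch-size, and flush-timing logic as the production adapters while keeping assertions cheap.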