RYA-266 Issue fixed where callers don-t call init on the index.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f7b2fd63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f7b2fd63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f7b2fd63 Branch: refs/heads/master Commit: f7b2fd63167359fd829cd9ee73216c5c9736d1a8 Parents: 7dd5913 Author: David Lotts <[email protected]> Authored: Fri Sep 22 14:48:05 2017 -0400 Committer: David Lotts <[email protected]> Committed: Fri Sep 22 14:48:05 2017 -0400 ---------------------------------------------------------------------- .../freetext/AccumuloFreeTextIndexer.java | 65 ++++++++++++++++---- .../temporal/AccumuloTemporalIndexer.java | 39 +++++++++--- .../freetext/AccumuloFreeTextIndexerTest.java | 8 +++ 3 files changed, 91 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java index 8c07a8c..9078015 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -213,7 +214,17 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements private boolean isInit = false; - + /** + * Called by setConf to initialize query only. + * Use this alone if usage does not require writing. + * For a writeable (store and delete) version of this, + * call setconf() and then setMultiTableBatchWriter(), then call init() + * that is what the DAO does. + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws TableNotFoundException + * @throws TableExistsException + */ private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { final String doctable = getFreeTextDocTablename(conf); @@ -260,19 +271,25 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements tableOps.setProperty(doctable, "table.bloom.enabled", Boolean.TRUE.toString()); } - mtbw = ConfigUtils.createMultitableBatchWriter(conf); - - docTableBw = mtbw.getBatchWriter(doctable); - termTableBw = mtbw.getBatchWriter(termtable); - + // Set mtbw by calling setMultiTableBatchWriter(). The DAO does this and manages flushing. + // If you create it here, tests work, but a real Accumulo may lose writes due to unmanaged flushing. + if (mtbw != null) { + docTableBw = mtbw.getBatchWriter(doctable); + termTableBw = mtbw.getBatchWriter(termtable); + } tokenizer = ConfigUtils.getFreeTextTokenizer(conf); validPredicates = ConfigUtils.getFreeTextPredicates(conf); queryTermLimit = ConfigUtils.getFreeTextTermLimit(conf); } - - //initialization occurs in setConf because index is created using reflection + /** + * setConf sets the configuration and then initializes for query only. + * Use this alone if usage does not require writing. + * For a writeable (store and delete) version of this, + * call this and then setMultiTableBatchWriter(), then call init() + * that is what the DAO does. + */ @Override public void setConf(final Configuration conf) { this.conf = conf; @@ -294,6 +311,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements private void storeStatement(final Statement statement) throws IOException { + Objects.requireNonNull(mtbw, "Freetext indexer attempting to store, but setMultiTableBatchWriter() was not set."); + // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); @@ -393,7 +412,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements @Override public void close() throws IOException { try { - mtbw.close(); + if (mtbw!=null) + mtbw.close(); } catch (final MutationsRejectedException e) { logger.error("error closing the batch writer", e); throw new IOException(e); @@ -636,7 +656,13 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements return makeFreeTextDocTablename( ConfigUtils.getTablePrefix(conf) ); } - /** + @Override + public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { + mtbw = writer; + } + + + /** * Get the Term index's table name. * * @param conf - The Rya configuration that specifies which instance of Rya @@ -684,6 +710,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements } private void deleteStatement(final Statement statement) throws IOException { + Objects.requireNonNull(mtbw, "Freetext indexer attempting to delete, but setMultiTableBatchWriter() was not set."); + // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); @@ -795,10 +823,21 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements } - @Override + /** + * called by the DAO after setting the mtbw. + * The rest of the initilization is done by setConf() + */ + @Override public void init() { - // TODO Auto-generated method stub - + Objects.requireNonNull(mtbw, "Freetext indexer failed to initialize temporal index, setMultiTableBatchWriter() was not set."); + Objects.requireNonNull(conf, "Freetext indexer failed to initialize temporal index, setConf() was not set."); + try { + docTableBw = mtbw.getBatchWriter(getFreeTextDocTablename(conf)); + termTableBw = mtbw.getBatchWriter(getFreeTextTermTablename(conf)); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + logger.error("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java index 042d938..6a78680 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -116,37 +117,56 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements public void init() { if (!isInit) { try { - initInternal(); + initReadWrite(); isInit = true; } catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException | RyaClientException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + logger.error("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); } } } - - private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + /** + * Initialize for writable use. + * This is called from the DAO, perhaps others. + */ + private void initReadWrite() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException, RyaClientException { if (mtbw == null) throw new RyaClientException("Failed to initialize temporal index, setMultiTableBatchWriter() was not set."); if (conf == null) throw new RyaClientException("Failed to initialize temporal index, setConf() was not set."); + if (temporalIndexTableName==null) + throw new RyaClientException("Failed to set temporalIndexTableName==null."); - temporalIndexTableName = getTableName(); + // Now do all the writable setup, read should already be complete. // Create one index table on first run. Boolean isCreated = ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName); if (isCreated) { logger.info("First run, created temporal index table: " + temporalIndexTableName); } temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName); - validPredicates = ConfigUtils.getTemporalPredicates(conf); } - //initialization occurs in setConf because index is created using reflection + /** + * Initialize everything for a query-only use. + * This is called from setConf, since that must be called by anyone. + * The DAO will also call setMultiTableBatchWriter() and init(). + */ + private void initReadOnly() { + if (conf == null) + throw new Error("Failed to initialize temporal index, setConf() was not set."); + temporalIndexTableName = getTableName(); + validPredicates = ConfigUtils.getTemporalPredicates(conf); + } + + /** + * Set the configuration, then initialize for read (query) use only. + * Readonly initialization occurs in setConf because it does not require setting a multitablebatchwriter (mtbw). + */ @Override public void setConf(final Configuration conf) { this.conf = conf; - + initReadOnly(); } @Override @@ -165,6 +185,7 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * T O D O parse an interval using multiple predicates for same subject -- ontology dependent. */ private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException { + Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing. Must call setMultiTableBatchWriter() and init()."); // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list final boolean isValidPredicate = validPredicates == null || validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); @@ -908,6 +929,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } private void deleteStatement(final Statement statement) throws IOException, IllegalArgumentException { + Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing. Must call setMultiTableBatchWriter() and init()."); + // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f7b2fd63/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java index cba550c..531085d 100644 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java @@ -93,6 +93,8 @@ public class AccumuloFreeTextIndexerTest { public void testSearch() throws Exception { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); ValueFactory vf = new ValueFactoryImpl(); @@ -134,6 +136,8 @@ public class AccumuloFreeTextIndexerTest { public void testDelete() throws Exception { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); ValueFactory vf = new ValueFactoryImpl(); @@ -187,6 +191,8 @@ public class AccumuloFreeTextIndexerTest { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); // These should not be stored because they are not in the predicate list f.storeStatement(new RyaStatement(new RyaURI("foo:subj1"), new RyaURI(RDFS.LABEL.toString()), new RyaType("invalid"))); @@ -222,6 +228,8 @@ public class AccumuloFreeTextIndexerTest { public void testContextSearch() throws Exception { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); ValueFactory vf = new ValueFactoryImpl(); URI subject = new URIImpl("foo:subj");
