Repository: incubator-rya Updated Branches: refs/heads/master 86c866eda -> 7949baa67 (forced update)
RYA-266 Added init() and other calls where ever the Accumulo temporal and freeText indexers are created and used for storing. Closes #149 Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/7949baa6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/7949baa6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/7949baa6 Branch: refs/heads/master Commit: 7949baa67c97f9432f8cac88e1533eb34d530801 Parents: c03c8bb Author: David W. Lotts <[email protected]> Authored: Fri Mar 31 14:05:07 2017 -0400 Committer: David Lotts <[email protected]> Committed: Mon Sep 25 15:21:26 2017 -0400 ---------------------------------------------------------------------- .../freetext/AccumuloFreeTextIndexer.java | 65 ++++++++--- .../temporal/AccumuloTemporalIndexer.java | 117 +++++++++++++------ .../freetext/AccumuloFreeTextIndexerTest.java | 8 ++ .../temporal/AccumuloTemporalIndexerTest.java | 32 +++-- .../apache/rya/accumulo/mr/RyaOutputFormat.java | 30 ++++- .../rya/accumulo/mr/RyaOutputFormatTest.java | 25 ++-- 6 files changed, 202 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java index 8c07a8c..9078015 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -213,7 +214,17 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements private boolean isInit = false; - + /** + * Called by setConf to initialize query only. + * Use this alone if usage does not require writing. + * For a writeable (store and delete) version of this, + * call setconf() and then setMultiTableBatchWriter(), then call init() + * that is what the DAO does. + * @throws AccumuloException + * @throws AccumuloSecurityException + * @throws TableNotFoundException + * @throws TableExistsException + */ private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, TableExistsException { final String doctable = getFreeTextDocTablename(conf); @@ -260,19 +271,25 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements tableOps.setProperty(doctable, "table.bloom.enabled", Boolean.TRUE.toString()); } - mtbw = ConfigUtils.createMultitableBatchWriter(conf); - - docTableBw = mtbw.getBatchWriter(doctable); - termTableBw = mtbw.getBatchWriter(termtable); - + // Set mtbw by calling setMultiTableBatchWriter(). The DAO does this and manages flushing. + // If you create it here, tests work, but a real Accumulo may lose writes due to unmanaged flushing. + if (mtbw != null) { + docTableBw = mtbw.getBatchWriter(doctable); + termTableBw = mtbw.getBatchWriter(termtable); + } tokenizer = ConfigUtils.getFreeTextTokenizer(conf); validPredicates = ConfigUtils.getFreeTextPredicates(conf); queryTermLimit = ConfigUtils.getFreeTextTermLimit(conf); } - - //initialization occurs in setConf because index is created using reflection + /** + * setConf sets the configuration and then initializes for query only. + * Use this alone if usage does not require writing. + * For a writeable (store and delete) version of this, + * call this and then setMultiTableBatchWriter(), then call init() + * that is what the DAO does. + */ @Override public void setConf(final Configuration conf) { this.conf = conf; @@ -294,6 +311,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements private void storeStatement(final Statement statement) throws IOException { + Objects.requireNonNull(mtbw, "Freetext indexer attempting to store, but setMultiTableBatchWriter() was not set."); + // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); @@ -393,7 +412,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements @Override public void close() throws IOException { try { - mtbw.close(); + if (mtbw!=null) + mtbw.close(); } catch (final MutationsRejectedException e) { logger.error("error closing the batch writer", e); throw new IOException(e); @@ -636,7 +656,13 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements return makeFreeTextDocTablename( ConfigUtils.getTablePrefix(conf) ); } - /** + @Override + public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { + mtbw = writer; + } + + + /** * Get the Term index's table name. * * @param conf - The Rya configuration that specifies which instance of Rya @@ -684,6 +710,8 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements } private void deleteStatement(final Statement statement) throws IOException { + Objects.requireNonNull(mtbw, "Freetext indexer attempting to delete, but setMultiTableBatchWriter() was not set."); + // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); @@ -795,10 +823,21 @@ public class AccumuloFreeTextIndexer extends AbstractAccumuloIndexer implements } - @Override + /** + * called by the DAO after setting the mtbw. + * The rest of the initilization is done by setConf() + */ + @Override public void init() { - // TODO Auto-generated method stub - + Objects.requireNonNull(mtbw, "Freetext indexer failed to initialize temporal index, setMultiTableBatchWriter() was not set."); + Objects.requireNonNull(conf, "Freetext indexer failed to initialize temporal index, setConf() was not set."); + try { + docTableBw = mtbw.getBatchWriter(getFreeTextDocTablename(conf)); + termTableBw = mtbw.getBatchWriter(getFreeTextTermTablename(conf)); + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + logger.error("Unable to initialize index. Throwing Runtime Exception. ", e); + throw new RuntimeException(e); + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java index fcc1c58..6a78680 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -56,6 +57,7 @@ import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer; import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.client.RyaClientException; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.resolver.RyaToRdfConversions; import org.apache.rya.indexing.KeyParts; @@ -102,43 +104,70 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements private boolean isInit = false; - - private void initInternal() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, - TableExistsException { - temporalIndexTableName = getTableName(); - // Create one index table on first run. - ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName); - - mtbw = ConfigUtils.createMultitableBatchWriter(conf); - - temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName); - - validPredicates = ConfigUtils.getTemporalPredicates(conf); - } - - //initialization occurs in setConf because index is created using reflection + /** + * intilize the temporal index. + * This is dependent on a few set method calls before init: + * > Connector = ConfigUtils.getConnector(conf); + * > MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + * > // optional: temporal.setConnector(connector); + * > temporal.setMultiTableBatchWriter(mtbw); + * > temporal.init(); + */ @Override - public void setConf(final Configuration conf) { - this.conf = conf; + public void init() { if (!isInit) { try { - initInternal(); + initReadWrite(); isInit = true; - } catch (final AccumuloException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final AccumuloSecurityException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final TableNotFoundException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); - throw new RuntimeException(e); - } catch (final TableExistsException e) { - logger.warn("Unable to initialize index. Throwing Runtime Exception. ", e); + } catch (final AccumuloException | AccumuloSecurityException | TableNotFoundException | TableExistsException | RyaClientException e) { + logger.error("Unable to initialize index. Throwing Runtime Exception. ", e); throw new RuntimeException(e); } } } + /** + * Initialize for writable use. + * This is called from the DAO, perhaps others. + */ + private void initReadWrite() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, + TableExistsException, RyaClientException { + if (mtbw == null) + throw new RyaClientException("Failed to initialize temporal index, setMultiTableBatchWriter() was not set."); + if (conf == null) + throw new RyaClientException("Failed to initialize temporal index, setConf() was not set."); + if (temporalIndexTableName==null) + throw new RyaClientException("Failed to set temporalIndexTableName==null."); + + // Now do all the writable setup, read should already be complete. + // Create one index table on first run. + Boolean isCreated = ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName); + if (isCreated) { + logger.info("First run, created temporal index table: " + temporalIndexTableName); + } + temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName); + } + + /** + * Initialize everything for a query-only use. + * This is called from setConf, since that must be called by anyone. + * The DAO will also call setMultiTableBatchWriter() and init(). + */ + private void initReadOnly() { + if (conf == null) + throw new Error("Failed to initialize temporal index, setConf() was not set."); + temporalIndexTableName = getTableName(); + validPredicates = ConfigUtils.getTemporalPredicates(conf); + } + + /** + * Set the configuration, then initialize for read (query) use only. + * Readonly initialization occurs in setConf because it does not require setting a multitablebatchwriter (mtbw). + */ + @Override + public void setConf(final Configuration conf) { + this.conf = conf; + initReadOnly(); + } @Override public Configuration getConf() { @@ -156,18 +185,23 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements * T O D O parse an interval using multiple predicates for same subject -- ontology dependent. */ private void storeStatement(final Statement statement) throws IOException, IllegalArgumentException { + Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing. Must call setMultiTableBatchWriter() and init()."); // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list - final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); + final boolean isValidPredicate = validPredicates == null || validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); if (!isValidPredicate || !(statement.getObject() instanceof Literal)) { return; } + final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end of interval extractDateTime(statement, indexDateTimes); if (indexDateTimes[0]==null) { return; } + if (!this.isInit) + throw new RuntimeException("Method .init() was not called (or failed) before attempting to store statements."); + // Add this as an instant, or interval. try { if (indexDateTimes[1] != null) { @@ -865,8 +899,9 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements @Override public void close() throws IOException { try { - - mtbw.close(); + if (mtbw != null) { + mtbw.close(); + } } catch (final MutationsRejectedException e) { final String msg = "Error while closing the batch writer."; @@ -894,6 +929,8 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } private void deleteStatement(final Statement statement) throws IOException, IllegalArgumentException { + Objects.requireNonNull(temporalIndexBatchWriter,"This is not initialized for writing. Must call setMultiTableBatchWriter() and init()."); + // if the predicate list is empty, accept all predicates. // Otherwise, make sure the predicate is on the "valid" list final boolean isValidPredicate = validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate()); @@ -926,12 +963,6 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements } @Override - public void init() { - // TODO Auto-generated method stub - - } - - @Override public void setConnector(final Connector connector) { // TODO Auto-generated method stub @@ -954,4 +985,14 @@ public class AccumuloTemporalIndexer extends AbstractAccumuloIndexer implements // TODO Auto-generated method stub } + + /* + * (non-Javadoc) + * + * @see org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer#setMultiTableBatchWriter(org.apache.accumulo.core.client.MultiTableBatchWriter) + */ + @Override + public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws IOException { + mtbw = writer; + } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java index cba550c..531085d 100644 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java @@ -93,6 +93,8 @@ public class AccumuloFreeTextIndexerTest { public void testSearch() throws Exception { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); ValueFactory vf = new ValueFactoryImpl(); @@ -134,6 +136,8 @@ public class AccumuloFreeTextIndexerTest { public void testDelete() throws Exception { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); ValueFactory vf = new ValueFactoryImpl(); @@ -187,6 +191,8 @@ public class AccumuloFreeTextIndexerTest { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); // These should not be stored because they are not in the predicate list f.storeStatement(new RyaStatement(new RyaURI("foo:subj1"), new RyaURI(RDFS.LABEL.toString()), new RyaType("invalid"))); @@ -222,6 +228,8 @@ public class AccumuloFreeTextIndexerTest { public void testContextSearch() throws Exception { try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) { f.setConf(conf); + f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf)); + f.init(); ValueFactory vf = new ValueFactoryImpl(); URI subject = new URIImpl("foo:subj"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java index 80824b8..839ef6a 100644 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java @@ -21,8 +21,10 @@ package org.apache.rya.indexing.accumulo.temporal; import static org.apache.rya.api.resolver.RdfToRyaConversions.convertStatement; -import static org.junit.Assert.*; -import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.io.PrintStream; import java.security.NoSuchAlgorithmException; @@ -35,8 +37,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; + import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.TableNotFoundException; @@ -48,8 +54,17 @@ import org.apache.commons.codec.binary.StringUtils; import org.apache.commons.io.output.NullOutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.indexing.StatementConstraints; +import org.apache.rya.indexing.StatementSerializer; +import org.apache.rya.indexing.TemporalInstant; +import org.apache.rya.indexing.TemporalInstantRfc3339; +import org.apache.rya.indexing.TemporalInterval; +import org.apache.rya.indexing.accumulo.ConfigUtils; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -64,14 +79,6 @@ import org.openrdf.query.QueryEvaluationException; import com.beust.jcommander.internal.Lists; import info.aduna.iteration.CloseableIteration; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.indexing.StatementConstraints; -import org.apache.rya.indexing.StatementSerializer; -import org.apache.rya.indexing.TemporalInstant; -import org.apache.rya.indexing.TemporalInstantRfc3339; -import org.apache.rya.indexing.TemporalInterval; -import org.apache.rya.indexing.accumulo.ConfigUtils; /** * JUnit tests for TemporalIndexer and it's implementation AccumuloTemporalIndexer @@ -230,6 +237,11 @@ public final class AccumuloTemporalIndexerTest { tIndexer = new AccumuloTemporalIndexer(); tIndexer.setConf(conf); + Connector connector = ConfigUtils.getConnector(conf); + MultiTableBatchWriter mt_bw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + tIndexer.setConnector(connector); + tIndexer.setMultiTableBatchWriter(mt_bw); + tIndexer.init(); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java index 155d694..5332260 100644 --- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java @@ -45,9 +45,6 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.log4j.Logger; -import org.openrdf.model.Statement; -import org.openrdf.model.vocabulary.XMLSchema; - import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRdfConstants; import org.apache.rya.accumulo.AccumuloRyaDAO; @@ -64,6 +61,8 @@ import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.openrdf.model.Statement; +import org.openrdf.model.vocabulary.XMLSchema; /** * {@link OutputFormat} that uses Rya, the {@link GeoIndexer}, the @@ -207,21 +206,42 @@ public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable } - private static FreeTextIndexer getFreeTextIndexer(Configuration conf) { + private static FreeTextIndexer getFreeTextIndexer(Configuration conf) throws IOException { if (!conf.getBoolean(ENABLE_FREETEXT, true)) { return null; } AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer(); freeText.setConf(conf); + Connector connector; + try { + connector = ConfigUtils.getConnector(conf); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Error when attempting to create a connection for writing the freeText index.", e); + } + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + freeText.setConnector(connector); + freeText.setMultiTableBatchWriter(mtbw); + freeText.init(); + return freeText; } - private static TemporalIndexer getTemporalIndexer(Configuration conf) { + private static TemporalIndexer getTemporalIndexer(Configuration conf) throws IOException { if (!conf.getBoolean(ENABLE_TEMPORAL, true)) { return null; } AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); temporal.setConf(conf); + Connector connector; + try { + connector = ConfigUtils.getConnector(conf); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Error when attempting to create a connection for writing the temporal index.", e); + } + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + temporal.setConnector(connector); + temporal.setMultiTableBatchWriter(mtbw); + temporal.init(); return temporal; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java index 0b44a92..96f57f6 100644 --- a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java +++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java @@ -5,7 +5,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; import org.apache.accumulo.core.client.mock.MockInstance; @@ -17,15 +19,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordWriter; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.openrdf.model.Statement; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.model.vocabulary.XMLSchema; - -import info.aduna.iteration.CloseableIteration; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.api.domain.RyaStatement; @@ -43,6 +36,15 @@ import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; import org.apache.rya.indexing.accumulo.freetext.SimpleTokenizer; import org.apache.rya.indexing.accumulo.freetext.Tokenizer; import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; + +import info.aduna.iteration.CloseableIteration; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -235,6 +237,11 @@ public class RyaOutputFormatTest { } final AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer(); temporal.setConf(conf); + Connector connector = ConfigUtils.getConnector(conf); + MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new BatchWriterConfig()); + temporal.setConnector(connector); + temporal.setMultiTableBatchWriter(mtbw); + temporal.init(); final Set<Statement> empty = new HashSet<>(); final Set<Statement> head = new HashSet<>(); final Set<Statement> tail = new HashSet<>();
