Repository: incubator-rya
Updated Branches:
  refs/heads/master 86c866eda -> 7949baa67 (forced update)


RYA-266 Added init() and other calls where ever the Accumulo temporal and 
freeText indexers are created and used for storing. Closes #149


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/7949baa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/7949baa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/7949baa6

Branch: refs/heads/master
Commit: 7949baa67c97f9432f8cac88e1533eb34d530801
Parents: c03c8bb
Author: David W. Lotts <[email protected]>
Authored: Fri Mar 31 14:05:07 2017 -0400
Committer: David Lotts <[email protected]>
Committed: Mon Sep 25 15:21:26 2017 -0400

----------------------------------------------------------------------
 .../freetext/AccumuloFreeTextIndexer.java       |  65 ++++++++---
 .../temporal/AccumuloTemporalIndexer.java       | 117 +++++++++++++------
 .../freetext/AccumuloFreeTextIndexerTest.java   |   8 ++
 .../temporal/AccumuloTemporalIndexerTest.java   |  32 +++--
 .../apache/rya/accumulo/mr/RyaOutputFormat.java |  30 ++++-
 .../rya/accumulo/mr/RyaOutputFormatTest.java    |  25 ++--
 6 files changed, 202 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
index 8c07a8c..9078015 100644
--- 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexer.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -213,7 +214,17 @@ public class AccumuloFreeTextIndexer extends 
AbstractAccumuloIndexer implements
 
     private boolean isInit = false;
 
-
+    /**
+     * Called by setConf to initialize query only.  
+     * Use this alone if usage does not require writing.
+     * For a writeable (store and delete) version of this, 
+     * call setconf() and then setMultiTableBatchWriter(), then call init()
+     * that is what the DAO does.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     * @throws TableNotFoundException
+     * @throws TableExistsException
+     */
     private void initInternal() throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException,
             TableExistsException {
         final String doctable = getFreeTextDocTablename(conf);
@@ -260,19 +271,25 @@ public class AccumuloFreeTextIndexer extends 
AbstractAccumuloIndexer implements
             tableOps.setProperty(doctable, "table.bloom.enabled", 
Boolean.TRUE.toString());
         }
 
-        mtbw = ConfigUtils.createMultitableBatchWriter(conf);
-
-        docTableBw = mtbw.getBatchWriter(doctable);
-        termTableBw = mtbw.getBatchWriter(termtable);
-
+        // Set mtbw by calling setMultiTableBatchWriter().  The DAO does this 
and manages flushing.
+        // If you create it here, tests work, but a real Accumulo may lose 
writes due to unmanaged flushing.
+        if (mtbw != null) {
+               docTableBw = mtbw.getBatchWriter(doctable);
+               termTableBw = mtbw.getBatchWriter(termtable);
+        }
         tokenizer = ConfigUtils.getFreeTextTokenizer(conf);
         validPredicates = ConfigUtils.getFreeTextPredicates(conf);
 
         queryTermLimit = ConfigUtils.getFreeTextTermLimit(conf);
     }
 
-
-  //initialization occurs in setConf because index is created using reflection
+    /**
+     * setConf sets the configuration and then initializes for query only.  
+     * Use this alone if usage does not require writing.
+     * For a writeable (store and delete) version of this, 
+     * call this and then setMultiTableBatchWriter(), then call init()
+     * that is what the DAO does.
+     */
     @Override
     public void setConf(final Configuration conf) {
         this.conf = conf;
@@ -294,6 +311,8 @@ public class AccumuloFreeTextIndexer extends 
AbstractAccumuloIndexer implements
 
 
     private void storeStatement(final Statement statement) throws IOException {
+        Objects.requireNonNull(mtbw, "Freetext indexer attempting to store, 
but setMultiTableBatchWriter() was not set.");
+
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
         final boolean isValidPredicate = validPredicates.isEmpty() || 
validPredicates.contains(statement.getPredicate());
@@ -393,7 +412,8 @@ public class AccumuloFreeTextIndexer extends 
AbstractAccumuloIndexer implements
     @Override
     public void close() throws IOException {
         try {
-            mtbw.close();
+               if (mtbw!=null)
+                       mtbw.close();
         } catch (final MutationsRejectedException e) {
             logger.error("error closing the batch writer", e);
             throw new IOException(e);
@@ -636,7 +656,13 @@ public class AccumuloFreeTextIndexer extends 
AbstractAccumuloIndexer implements
         return makeFreeTextDocTablename( ConfigUtils.getTablePrefix(conf) );
     }
 
-    /**
+    @Override
+       public void setMultiTableBatchWriter(MultiTableBatchWriter writer) 
throws IOException {
+        mtbw = writer;
+       }
+
+
+       /**
      * Get the Term index's table name.
      *
      * @param conf - The Rya configuration that specifies which instance of Rya
@@ -684,6 +710,8 @@ public class AccumuloFreeTextIndexer extends 
AbstractAccumuloIndexer implements
     }
 
     private void deleteStatement(final Statement statement) throws IOException 
{
+        Objects.requireNonNull(mtbw, "Freetext indexer attempting to delete, 
but setMultiTableBatchWriter() was not set.");
+
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
         final boolean isValidPredicate = validPredicates.isEmpty() || 
validPredicates.contains(statement.getPredicate());
@@ -795,10 +823,21 @@ public class AccumuloFreeTextIndexer extends 
AbstractAccumuloIndexer implements
     }
 
 
-       @Override
+       /** 
+        * called by the DAO after setting the mtbw.
+        * The rest of the initilization is done by setConf()
+        */
+    @Override
        public void init() {
-               // TODO Auto-generated method stub
-
+        Objects.requireNonNull(mtbw, "Freetext indexer failed to initialize 
temporal index, setMultiTableBatchWriter() was not set.");
+        Objects.requireNonNull(conf, "Freetext indexer failed to initialize 
temporal index, setConf() was not set.");
+        try {
+                       docTableBw = 
mtbw.getBatchWriter(getFreeTextDocTablename(conf));
+                       termTableBw = 
mtbw.getBatchWriter(getFreeTextTermTablename(conf));
+               } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
+                       logger.error("Unable to initialize index.  Throwing 
Runtime Exception. ", e);
+            throw new RuntimeException(e);             
+        }
        }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
index fcc1c58..6a78680 100644
--- 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexer.java
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -56,6 +57,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer;
 import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.client.RyaClientException;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.api.resolver.RyaToRdfConversions;
 import org.apache.rya.indexing.KeyParts;
@@ -102,43 +104,70 @@ public class AccumuloTemporalIndexer extends 
AbstractAccumuloIndexer implements
     private boolean isInit = false;
 
 
-
-    private void initInternal() throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException,
-            TableExistsException {
-        temporalIndexTableName = getTableName();
-        // Create one index table on first run.
-        ConfigUtils.createTableIfNotExists(conf, temporalIndexTableName);
-
-        mtbw = ConfigUtils.createMultitableBatchWriter(conf);
-
-        temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName);
-
-        validPredicates = ConfigUtils.getTemporalPredicates(conf);
-    }
-
-    //initialization occurs in setConf because index is created using 
reflection
+    /**
+     * intilize the temporal index.
+     * This is dependent on a few set method calls before init:
+     * >  Connector = ConfigUtils.getConnector(conf);
+     * >  MultiTableBatchWriter mtbw = 
connector.createMultiTableBatchWriter(new BatchWriterConfig());
+     * >  // optional: temporal.setConnector(connector);
+     * >  temporal.setMultiTableBatchWriter(mtbw);
+     * >  temporal.init();
+     */
     @Override
-    public void setConf(final Configuration conf) {
-        this.conf = conf;
+    public void init() {
         if (!isInit) {
             try {
-                initInternal();
+                initReadWrite();
                 isInit = true;
-            } catch (final AccumuloException e) {
-                logger.warn("Unable to initialize index.  Throwing Runtime 
Exception. ", e);
-                throw new RuntimeException(e);
-            } catch (final AccumuloSecurityException e) {
-                logger.warn("Unable to initialize index.  Throwing Runtime 
Exception. ", e);
-                throw new RuntimeException(e);
-            } catch (final TableNotFoundException e) {
-                logger.warn("Unable to initialize index.  Throwing Runtime 
Exception. ", e);
-                throw new RuntimeException(e);
-            } catch (final TableExistsException e) {
-                logger.warn("Unable to initialize index.  Throwing Runtime 
Exception. ", e);
+            } catch (final AccumuloException | AccumuloSecurityException | 
TableNotFoundException | TableExistsException | RyaClientException e) {
+                logger.error("Unable to initialize index.  Throwing Runtime 
Exception. ", e);
                 throw new RuntimeException(e);
             }
         }
     }
+    /**
+     * Initialize for writable use.  
+     * This is called from the DAO, perhaps others.
+     */
+    private void initReadWrite() throws AccumuloException, 
AccumuloSecurityException, TableNotFoundException,
+                    TableExistsException, RyaClientException {
+        if (mtbw == null)
+            throw new RyaClientException("Failed to initialize temporal index, 
setMultiTableBatchWriter() was not set.");
+        if (conf == null)
+            throw new RyaClientException("Failed to initialize temporal index, 
setConf() was not set.");
+        if (temporalIndexTableName==null)
+            throw new RyaClientException("Failed to set 
temporalIndexTableName==null.");
+
+        // Now do all the writable setup, read should already be complete.
+        // Create one index table on first run.
+        Boolean isCreated = ConfigUtils.createTableIfNotExists(conf, 
temporalIndexTableName);
+        if (isCreated) {
+            logger.info("First run, created temporal index table: " + 
temporalIndexTableName);
+        }
+        temporalIndexBatchWriter = mtbw.getBatchWriter(temporalIndexTableName);
+    }
+
+    /**
+     * Initialize everything for a query-only use.  
+     * This is called from setConf, since that must be called by anyone.
+     * The DAO will also call setMultiTableBatchWriter() and init().
+     */
+       private void initReadOnly()  {
+               if (conf == null)
+                       throw new Error("Failed to initialize temporal index, 
setConf() was not set.");
+               temporalIndexTableName = getTableName();
+               validPredicates = ConfigUtils.getTemporalPredicates(conf);
+       }
+
+    /**
+     * Set the configuration, then initialize for read (query) use only.
+     * Readonly initialization occurs in setConf because it does not require 
setting a multitablebatchwriter (mtbw).
+     */
+    @Override
+    public void setConf(final Configuration conf) {
+        this.conf = conf;
+                       initReadOnly();
+    }
 
     @Override
     public Configuration getConf() {
@@ -156,18 +185,23 @@ public class AccumuloTemporalIndexer extends 
AbstractAccumuloIndexer implements
      * T O D O parse an interval using multiple predicates for same subject -- 
ontology dependent.
      */
     private void storeStatement(final Statement statement) throws IOException, 
IllegalArgumentException {
+       Objects.requireNonNull(temporalIndexBatchWriter,"This is not 
initialized for writing.  Must call setMultiTableBatchWriter() and init().");
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
-        final boolean isValidPredicate = validPredicates.isEmpty() || 
validPredicates.contains(statement.getPredicate());
+        final boolean isValidPredicate = validPredicates == null || 
validPredicates.isEmpty() || validPredicates.contains(statement.getPredicate());
         if (!isValidPredicate || !(statement.getObject() instanceof Literal)) {
             return;
         }
+
         final DateTime[] indexDateTimes = new DateTime[2]; // 0 begin, 1 end 
of interval
         extractDateTime(statement, indexDateTimes);
         if (indexDateTimes[0]==null) {
             return;
         }
 
+        if (!this.isInit)
+            throw new RuntimeException("Method .init() was not called (or 
failed) before attempting to store statements.");
+
         // Add this as an instant, or interval.
         try {
             if (indexDateTimes[1] != null) {
@@ -865,8 +899,9 @@ public class AccumuloTemporalIndexer extends 
AbstractAccumuloIndexer implements
     @Override
     public void close() throws IOException {
         try {
-
-            mtbw.close();
+            if (mtbw != null) {
+                mtbw.close();
+            }
 
         } catch (final MutationsRejectedException e) {
             final String msg = "Error while closing the batch writer.";
@@ -894,6 +929,8 @@ public class AccumuloTemporalIndexer extends 
AbstractAccumuloIndexer implements
     }
 
     private void deleteStatement(final Statement statement) throws 
IOException, IllegalArgumentException {
+       Objects.requireNonNull(temporalIndexBatchWriter,"This is not 
initialized for writing.  Must call setMultiTableBatchWriter() and init().");
+
         // if the predicate list is empty, accept all predicates.
         // Otherwise, make sure the predicate is on the "valid" list
         final boolean isValidPredicate = validPredicates.isEmpty() || 
validPredicates.contains(statement.getPredicate());
@@ -926,12 +963,6 @@ public class AccumuloTemporalIndexer extends 
AbstractAccumuloIndexer implements
     }
 
     @Override
-    public void init() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
     public void setConnector(final Connector connector) {
         // TODO Auto-generated method stub
 
@@ -954,4 +985,14 @@ public class AccumuloTemporalIndexer extends 
AbstractAccumuloIndexer implements
         // TODO Auto-generated method stub
 
     }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see 
org.apache.rya.accumulo.experimental.AbstractAccumuloIndexer#setMultiTableBatchWriter(org.apache.accumulo.core.client.MultiTableBatchWriter)
+     */
+    @Override
+    public void setMultiTableBatchWriter(MultiTableBatchWriter writer) throws 
IOException {
+        mtbw = writer;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
 
b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
index cba550c..531085d 100644
--- 
a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
+++ 
b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/freetext/AccumuloFreeTextIndexerTest.java
@@ -93,6 +93,8 @@ public class AccumuloFreeTextIndexerTest {
     public void testSearch() throws Exception {
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            
f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             ValueFactory vf = new ValueFactoryImpl();
 
@@ -134,6 +136,8 @@ public class AccumuloFreeTextIndexerTest {
     public void testDelete() throws Exception {
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            
f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             ValueFactory vf = new ValueFactoryImpl();
 
@@ -187,6 +191,8 @@ public class AccumuloFreeTextIndexerTest {
 
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            
f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             // These should not be stored because they are not in the 
predicate list
             f.storeStatement(new RyaStatement(new RyaURI("foo:subj1"), new 
RyaURI(RDFS.LABEL.toString()), new RyaType("invalid")));
@@ -222,6 +228,8 @@ public class AccumuloFreeTextIndexerTest {
     public void testContextSearch() throws Exception {
         try (AccumuloFreeTextIndexer f = new AccumuloFreeTextIndexer()) {
             f.setConf(conf);
+            
f.setMultiTableBatchWriter(ConfigUtils.createMultitableBatchWriter(conf));
+            f.init();
 
             ValueFactory vf = new ValueFactoryImpl();
             URI subject = new URIImpl("foo:subj");

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
 
b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
index 80824b8..839ef6a 100644
--- 
a/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
+++ 
b/extras/indexing/src/test/java/org/apache/rya/indexing/accumulo/temporal/AccumuloTemporalIndexerTest.java
@@ -21,8 +21,10 @@ package org.apache.rya.indexing.accumulo.temporal;
 
 
 import static org.apache.rya.api.resolver.RdfToRyaConversions.convertStatement;
-import static org.junit.Assert.*; 
-import org.junit.Assert; 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.io.PrintStream;
 import java.security.NoSuchAlgorithmException;
@@ -35,8 +37,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -48,8 +54,17 @@ import org.apache.commons.codec.binary.StringUtils;
 import org.apache.commons.io.output.NullOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.indexing.StatementConstraints;
+import org.apache.rya.indexing.StatementSerializer;
+import org.apache.rya.indexing.TemporalInstant;
+import org.apache.rya.indexing.TemporalInstantRfc3339;
+import org.apache.rya.indexing.TemporalInterval;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -64,14 +79,6 @@ import org.openrdf.query.QueryEvaluationException;
 import com.beust.jcommander.internal.Lists;
 
 import info.aduna.iteration.CloseableIteration;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.indexing.StatementConstraints;
-import org.apache.rya.indexing.StatementSerializer;
-import org.apache.rya.indexing.TemporalInstant;
-import org.apache.rya.indexing.TemporalInstantRfc3339;
-import org.apache.rya.indexing.TemporalInterval;
-import org.apache.rya.indexing.accumulo.ConfigUtils;
 
 /**
  * JUnit tests for TemporalIndexer and it's implementation 
AccumuloTemporalIndexer
@@ -230,6 +237,11 @@ public final class AccumuloTemporalIndexerTest {
 
         tIndexer = new AccumuloTemporalIndexer();
         tIndexer.setConf(conf);
+        Connector connector = ConfigUtils.getConnector(conf);
+        MultiTableBatchWriter mt_bw = 
connector.createMultiTableBatchWriter(new BatchWriterConfig());
+        tIndexer.setConnector(connector);
+        tIndexer.setMultiTableBatchWriter(mt_bw);
+        tIndexer.init();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java 
b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
index 155d694..5332260 100644
--- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaOutputFormat.java
@@ -45,9 +45,6 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.log4j.Logger;
-import org.openrdf.model.Statement;
-import org.openrdf.model.vocabulary.XMLSchema;
-
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.AccumuloRdfConstants;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
@@ -64,6 +61,8 @@ import org.apache.rya.indexing.accumulo.ConfigUtils;
 import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
 import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
 import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.openrdf.model.Statement;
+import org.openrdf.model.vocabulary.XMLSchema;
 
 /**
  * {@link OutputFormat} that uses Rya, the {@link GeoIndexer}, the
@@ -207,21 +206,42 @@ public class RyaOutputFormat extends 
OutputFormat<Writable, RyaStatementWritable
     }
 
 
-    private static FreeTextIndexer getFreeTextIndexer(Configuration conf) {
+    private static FreeTextIndexer getFreeTextIndexer(Configuration conf) 
throws IOException {
         if (!conf.getBoolean(ENABLE_FREETEXT, true)) {
             return null;
         }
         AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
         freeText.setConf(conf);
+        Connector connector;
+        try {
+            connector = ConfigUtils.getConnector(conf);
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new IOException("Error when attempting to create a 
connection for writing the freeText index.", e);
+        }
+        MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new 
BatchWriterConfig());
+        freeText.setConnector(connector);
+        freeText.setMultiTableBatchWriter(mtbw);
+        freeText.init();
+
         return freeText;
     }
 
-    private static TemporalIndexer getTemporalIndexer(Configuration conf) {
+    private static TemporalIndexer getTemporalIndexer(Configuration conf) 
throws IOException {
         if (!conf.getBoolean(ENABLE_TEMPORAL, true)) {
             return null;
         }
         AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
         temporal.setConf(conf);
+        Connector connector;
+        try {
+            connector = ConfigUtils.getConnector(conf);
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new IOException("Error when attempting to create a 
connection for writing the temporal index.", e);
+        }
+        MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new 
BatchWriterConfig());
+        temporal.setConnector(connector);
+        temporal.setMultiTableBatchWriter(mtbw);
+        temporal.init();
         return temporal;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/7949baa6/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java 
b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
index 0b44a92..96f57f6 100644
--- 
a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
+++ 
b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaOutputFormatTest.java
@@ -5,7 +5,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.client.mock.MockInstance;
@@ -17,15 +19,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.openrdf.model.Statement;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-import org.openrdf.model.vocabulary.XMLSchema;
-
-import info.aduna.iteration.CloseableIteration;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.AccumuloRyaDAO;
 import org.apache.rya.api.domain.RyaStatement;
@@ -43,6 +36,15 @@ import 
org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
 import org.apache.rya.indexing.accumulo.freetext.SimpleTokenizer;
 import org.apache.rya.indexing.accumulo.freetext.Tokenizer;
 import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import info.aduna.iteration.CloseableIteration;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -235,6 +237,11 @@ public class RyaOutputFormatTest {
         }
         final AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
         temporal.setConf(conf);
+        Connector connector = ConfigUtils.getConnector(conf);
+        MultiTableBatchWriter mtbw = connector.createMultiTableBatchWriter(new 
BatchWriterConfig());
+        temporal.setConnector(connector);
+        temporal.setMultiTableBatchWriter(mtbw);
+        temporal.init();
         final Set<Statement> empty = new HashSet<>();
         final Set<Statement> head = new HashSet<>();
         final Set<Statement> tail = new HashSet<>();

Reply via email to