http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
new file mode 100644
index 0000000..88f9030
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
@@ -0,0 +1,305 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.Tool;
+import org.openrdf.rio.RDFFormat;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base class for MapReduce tools that interact with Accumulo-backed Rya. Holds
+ * a {@link Configuration} to keep track of connection parameters.
+ * <p>
+ * Can be configured to read input from Rya, either as
+ * {@link RyaStatementWritable}s or as Accumulo rows, or to read statements from
+ * RDF files.
+ * <p>
+ * Can be configured to send output either by inserting RyaStatementWritables to
+ * a Rya instance, or by writing arbitrary
+ * {@link org.apache.accumulo.core.data.Mutation}s directly to Accumulo tables.
+ */
+public abstract class AbstractAccumuloMRTool implements Tool {
+    static int DEFAULT_IO_SORT_MB = 256;
+
+    protected Configuration conf;
+
+    // Connection parameters
+    protected String zk;
+    protected String instance;
+    protected String userName;
+    protected String pwd;
+    protected Authorizations authorizations;
+    protected boolean mock = false;
+    protected boolean hdfsInput = false;
+    protected String ttl;
+    protected String tablePrefix;
+    protected TABLE_LAYOUT rdfTableLayout;
+
+    /**
+     * Gets the Configuration containing any relevant options.
+     * @return This Tool's Configuration object.
+     */
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    /**
+     * Set this Tool's Configuration.
+     */
+    @Override
+    public void setConf(Configuration configuration) {
+        this.conf = configuration;
+    }
+
+    /**
+     * Initializes configuration parameters, checking that required parameters
+     * are found and ensuring that options corresponding to multiple property
+     * names are set consistently. Requires at least that the username,
+     * password, and instance name are all configured. Zookeeper hosts must be
+     * configured if not using a mock instance. Table prefix, if not provided,
+     * will be set to {@link RdfCloudTripleStoreConstants#TBL_PRFX_DEF}. Should
+     * be called before configuring input/output. See {@link MRUtils} for
+     * configuration properties.
+     */
+    protected void init() {
+        // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        ttl = MRUtils.getACTtl(conf);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.OSP);
+        hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, false);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and require zookeepers
+        if (!mock) {
+            Preconditions.checkNotNull(zk, "Zookeeper hosts not set (" + MRUtils.AC_ZK_PROP + ")");
+            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
+            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+            if (conf.get(MRUtils.HADOOP_IO_SORT_MB) == null) {
+                conf.setInt(MRUtils.HADOOP_IO_SORT_MB, DEFAULT_IO_SORT_MB);
+            }
+            conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+        }
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+    }
+
+    /**
+     * Sets up Accumulo input for a job: the job receives
+     * ({@link org.apache.accumulo.core.data.Key},
+     * {@link org.apache.accumulo.core.data.Value}) pairs from the table
+     * specified by the configuration (using
+     * {@link MRUtils#TABLE_PREFIX_PROPERTY} and
+     * {@link MRUtils#TABLE_LAYOUT_PROP}).
+     * @param   job     MapReduce Job to configure
+     * @throws  AccumuloSecurityException if connecting to Accumulo with the
+     *          given username and password fails.
+     */
+    protected void setupAccumuloInput(Job job) throws AccumuloSecurityException {
+        // set up accumulo input
+        if (!hdfsInput) {
+            job.setInputFormatClass(AccumuloInputFormat.class);
+        } else {
+            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
+        }
+        AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix);
+        AccumuloInputFormat.setInputTableName(job, tableName);
+        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+        if (mock) {
+            AccumuloInputFormat.setMockInstance(job, instance);
+        } else {
+            ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
+                    .withInstance(instance).withZkHosts(zk);
+            AccumuloInputFormat.setZooKeeperInstance(job, clientConfig);
+        }
+        if (ttl != null) {
+            IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class.getName());
+            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
+            AccumuloInputFormat.addIterator(job, setting);
+        }
+    }
+
+    /**
+     * Sets up Rya input for a job: the job receives
+     * ({@link org.apache.hadoop.io.LongWritable}, {@link RyaStatementWritable})
+     * pairs from a Rya instance. Uses the same configuration properties to
+     * connect as direct Accumulo input, but returns statement data instead of
+     * row data.
+     * @param   job     Job to configure
+     * @throws  AccumuloSecurityException if connecting to Accumulo with the
+     *          given username and password fails.
+     */
+    protected void setupRyaInput(Job job) throws AccumuloSecurityException {
+        setupAccumuloInput(job);
+        job.setInputFormatClass(RyaInputFormat.class);
+    }
+
+    /**
+     * Sets up RDF file input for a job: the job receives
+     * ({@link org.apache.hadoop.io.LongWritable}, {@link RyaStatementWritable})
+     * pairs from RDF file(s) found at the specified path.
+     * @param   job   Job to configure
+     * @param   inputPath     File or directory name
+     * @param   defaultFormat  Default RDF serialization format, can be
+     *                         overridden by {@link MRUtils#FORMAT_PROP}
+     * @throws  IOException if there's an error interacting with the
+     *          {@link org.apache.hadoop.fs.FileSystem}.
+     */
+    protected void setupFileInput(Job job, String inputPath, RDFFormat defaultFormat) throws IOException {
+        RDFFormat format = MRUtils.getRDFFormat(conf);
+        if (format == null) {
+            format = defaultFormat;
+        }
+        RdfFileInputFormat.addInputPath(job, new Path(inputPath));
+        RdfFileInputFormat.setRDFFormat(job, format);
+        job.setInputFormatClass(RdfFileInputFormat.class);
+    }
+
+    /**
+     * Sets up Accumulo output for a job: allows the job to write (String,
+     * Mutation) pairs, where the Mutation will be written to the table named by
+     * the String.
+     * @param   job Job to configure
+     * @param   outputTable Default table to send output to
+     * @throws  AccumuloSecurityException if connecting to Accumulo with the
+     *          given username and password fails
+     */
+    protected void setupAccumuloOutput(Job job, String outputTable) throws AccumuloSecurityException {
+        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        AccumuloOutputFormat.setCreateTables(job, true);
+        AccumuloOutputFormat.setDefaultTableName(job, outputTable);
+        if (mock) {
+            AccumuloOutputFormat.setMockInstance(job, instance);
+        } else {
+            ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
+                    .withInstance(instance).withZkHosts(zk);
+            AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
+        }
+        job.setOutputFormatClass(AccumuloOutputFormat.class);
+    }
+
+    /**
+     * Sets up Rya output for a job: allows the job to write
+     * {@link RyaStatementWritable} data, which will in turn be input into the
+     * configured Rya instance. To perform secondary indexing, use the
+     * configuration variables in {@link ConfigUtils}.
+     * @param   job Job to configure
+     * @throws  AccumuloSecurityException if connecting to Accumulo with the
+     *          given username and password fails
+     */
+    protected void setupRyaOutput(Job job) throws AccumuloSecurityException {
+        job.setOutputFormatClass(RyaOutputFormat.class);
+        job.setOutputValueClass(RyaStatementWritable.class);
+        // Specify default visibility of output rows, if given
+        RyaOutputFormat.setDefaultVisibility(job, conf.get(MRUtils.AC_CV_PROP));
+        // Specify named graph, if given
+        RyaOutputFormat.setDefaultContext(job, conf.get(MRUtils.NAMED_GRAPH_PROP));
+        // Set the output prefix
+        RyaOutputFormat.setTablePrefix(job, tablePrefix);
+        // Determine which indexers to use based on the config
+        RyaOutputFormat.setFreeTextEnabled(job,  ConfigUtils.getUseFreeText(conf));
+        RyaOutputFormat.setGeoEnabled(job,  ConfigUtils.getUseGeo(conf));
+        RyaOutputFormat.setTemporalEnabled(job,  ConfigUtils.getUseTemporal(conf));
+        RyaOutputFormat.setEntityEnabled(job,  ConfigUtils.getUseEntity(conf));
+        // Configure the Accumulo connection
+        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        AccumuloOutputFormat.setCreateTables(job, true);
+        AccumuloOutputFormat.setDefaultTableName(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+        if (mock) {
+            RyaOutputFormat.setMockInstance(job, instance);
+        } else {
+            ClientConfiguration clientConfig = ClientConfiguration.loadDefault()
+                    .withInstance(instance).withZkHosts(zk);
+            AccumuloOutputFormat.setZooKeeperInstance(job, clientConfig);
+        }
+    }
+
+    /**
+     * Connects to Accumulo, using the stored connection parameters.
+     * @return  A Connector to an Accumulo instance, which could be a mock
+     *          instance.
+     * @throws AccumuloException if connecting to Accumulo fails.
+     * @throws AccumuloSecurityException if authenticating with Accumulo fails.
+     */
+    protected Connector getConnector() throws AccumuloSecurityException, AccumuloException {
+        Instance zooKeeperInstance;
+        if (mock) {
+            zooKeeperInstance = new MockInstance(instance);
+        }
+        else {
+            zooKeeperInstance = new ZooKeeperInstance(instance, zk);
+        }
+        return zooKeeperInstance.getConnector(userName, new PasswordToken(pwd));
+    }
+}
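
Sketch, not part of this patch: a hypothetical tool built on AbstractAccumuloMRTool that loads N-Triples files into Rya, showing how init(), setupFileInput() and setupRyaOutput() fit together. The RdfFileLoadTool name and job wiring are illustrative assumptions; connection parameters are expected to arrive as the -Dac.*/-Drdf.* properties that init() validates.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.ToolRunner;
    import org.openrdf.rio.RDFFormat;

    import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;

    public class RdfFileLoadTool extends AbstractAccumuloMRTool {
        @Override
        public int run(String[] args) throws Exception {
            init(); // checks ac.username, ac.pwd, ac.instance, ac.zk and defaults rdf.tablePrefix
            Job job = Job.getInstance(conf, "Load RDF files into Rya");
            job.setJarByClass(RdfFileLoadTool.class);
            setupFileInput(job, args[0], RDFFormat.NTRIPLES); // rdf.format, if set, overrides the default
            setupRyaOutput(job);
            // The default identity Mapper forwards (LongWritable, RyaStatementWritable)
            // pairs from RdfFileInputFormat straight to RyaOutputFormat.
            job.setNumReduceTasks(0);
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new RdfFileLoadTool(), args));
        }
    }

Invoked, for example, as: hadoop jar <tool jar> RdfFileLoadTool -Dac.instance=accumulo -Dac.zk=zoo1:2181 -Dac.username=root -Dac.pwd=secret -Drdf.tablePrefix=rya_ /data/triples.nt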

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java
new file mode 100644
index 0000000..90461d1
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/AccumuloHDFSFileInputFormat.java
@@ -0,0 +1,161 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.ArgumentChecker;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * {@link FileInputFormat} that finds the Accumulo tablet files on HDFS
+ * and uses them as the input for MapReduce jobs.
+ */
+public class AccumuloHDFSFileInputFormat extends FileInputFormat<Key, Value> {
+
+    public static final Range ALLRANGE = new Range(new Text("\u0000"), new Text("\uFFFD"));
+
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
+        //read the params from AccumuloInputFormat
+        Configuration conf = jobContext.getConfiguration();
+        Instance instance = MRUtils.AccumuloProps.getInstance(jobContext);
+        String user = MRUtils.AccumuloProps.getUsername(jobContext);
+        AuthenticationToken password = MRUtils.AccumuloProps.getPassword(jobContext);
+        String table = MRUtils.AccumuloProps.getTablename(jobContext);
+        ArgumentChecker.notNull(instance);
+        ArgumentChecker.notNull(table);
+
+        //find the files necessary
+        try {
+            Connector connector = instance.getConnector(user, password);
+            TableOperations tos = connector.tableOperations();
+            String tableId = tos.tableIdMap().get(table);
+            Scanner scanner = connector.createScanner("accumulo.metadata", Authorizations.EMPTY); //TODO: auths?
+            scanner.setRange(new Range(new Text(tableId + "\u0000"), new Text(tableId + "\uFFFD")));
+            scanner.fetchColumnFamily(new Text("file"));
+            List<String> files = new ArrayList<String>();
+            List<InputSplit> fileSplits = new ArrayList<InputSplit>();
+            for (Map.Entry<Key, Value> entry : scanner) {
+                String file = entry.getKey().getColumnQualifier().toString();
+                Path path = new Path(file);
+                FileSystem fs = path.getFileSystem(conf);
+                FileStatus fileStatus = fs.getFileStatus(path);
+                long len = fileStatus.getLen();
+                BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, len);
+                files.add(file);
+                fileSplits.add(new FileSplit(path, 0, len, fileBlockLocations[0].getHosts()));
+            }
+            System.out.println(files);
+            return fileSplits;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    public RecordReader<Key, Value> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+        return new RecordReader<Key, Value>() {
+
+            private FileSKVIterator fileSKVIterator;
+            private boolean started = false;
+
+            @Override
+            public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+                FileSplit split = (FileSplit) inputSplit;
+                Configuration job = taskAttemptContext.getConfiguration();
+                Path file = split.getPath();
+                FileSystem fs = file.getFileSystem(job);
+                Instance instance = MRUtils.AccumuloProps.getInstance(taskAttemptContext);
+
+                fileSKVIterator = RFileOperations.getInstance().openReader(file.toString(), ALLRANGE,
+                        new HashSet<ByteSequence>(), false, fs, job, instance.getConfiguration());
+            }
+
+            @Override
+            public boolean nextKeyValue() throws IOException, InterruptedException {
+                if (started) {
+                    fileSKVIterator.next();
+                }
+                else {
+                    started = true; // don't move past the first record yet
+                }
+                return fileSKVIterator.hasTop();
+            }
+
+            @Override
+            public Key getCurrentKey() throws IOException, InterruptedException {
+                return fileSKVIterator.getTopKey();
+            }
+
+            @Override
+            public Value getCurrentValue() throws IOException, InterruptedException {
+                return fileSKVIterator.getTopValue();
+            }
+
+            @Override
+            public float getProgress() throws IOException, InterruptedException {
+                return 0;
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+        };
+    }
+
+    /**
+     * Mapper that has no effect.
+     */
+    @SuppressWarnings("rawtypes")
+    public static class NullMapper extends Mapper {
+        @Override
+        protected void map(Object key, Object value, Context context) throws IOException, InterruptedException {
+        }
+    }
+}
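
Sketch, not part of this patch: configuring a job to use this input format by hand, mirroring what AbstractAccumuloMRTool.setupAccumuloInput() does when ac.hdfsinput=true. Connection details still travel through the standard AccumuloInputFormat properties (getSplits() reads them back via MRUtils.AccumuloProps) even though key/value pairs come from tablet files on HDFS. All literal values below are placeholders.

    import org.apache.accumulo.core.client.ClientConfiguration;
    import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import mvm.rya.accumulo.mr.AccumuloHDFSFileInputFormat;

    public class HdfsScanSetup {
        public static Job configure(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "scan rya_spo rfiles");
            AccumuloInputFormat.setConnectorInfo(job, "root", new PasswordToken("secret"));
            AccumuloInputFormat.setInputTableName(job, "rya_spo");
            AccumuloInputFormat.setZooKeeperInstance(job,
                    ClientConfiguration.loadDefault().withInstance("accumulo").withZkHosts("zoo1:2181"));
            // Swap in the HDFS-backed format; mappers still receive (Key, Value) pairs.
            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
            return job;
        }
    }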

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java
new file mode 100644
index 0000000..409c978
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/MRUtils.java
@@ -0,0 +1,317 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.rio.RDFFormat;
+
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+
+/**
+ * Contains constants and static methods for interacting with a
+ * {@link Configuration} and handling options likely to be relevant to Rya
+ * MapReduce jobs. Defines constant property names associated with Accumulo and
+ * Rya options, and some convenience methods to get and set these properties
+ * with respect to a given Configuration.
+ */
+public class MRUtils {
+    /**
+     * Property name for the name of a MapReduce job.
+     */
+    public static final String JOB_NAME_PROP = "mapred.job.name";
+
+    /**
+     * Property name for the Accumulo username.
+     */
+    public static final String AC_USERNAME_PROP = "ac.username";
+    /**
+     * Property name for the Accumulo password.
+     */
+    public static final String AC_PWD_PROP = "ac.pwd";
+
+    /**
+     * Property name for the list of zookeepers.
+     */
+    public static final String AC_ZK_PROP = "ac.zk";
+    /**
+     * Property name for the Accumulo instance name.
+     */
+    public static final String AC_INSTANCE_PROP = "ac.instance";
+    /**
+     * Property name for whether to run against a mock Accumulo instance.
+     */
+    public static final String AC_MOCK_PROP = "ac.mock";
+
+    /**
+     * Property name for TTL; allows using an age-off filter on Accumulo input.
+     */
+    public static final String AC_TTL_PROP = "ac.ttl";
+
+    /**
+     * Property name for scan authorizations when reading data from Accumulo.
+     */
+    public static final String AC_AUTH_PROP = "ac.auth";
+    /**
+     * Property name for default visibility when writing data to Accumulo.
+     */
+    public static final String AC_CV_PROP = "ac.cv";
+
+    /**
+     * Property name for whether to read Accumulo data directly from HDFS
+     * as opposed to through Accumulo itself.
+     */
+    public static final String AC_HDFS_INPUT_PROP = "ac.hdfsinput";
+    /**
+     * Property name for the table layout to use when reading data from Rya.
+     */
+    public static final String TABLE_LAYOUT_PROP = "rdf.tablelayout";
+
+    /**
+     * Property name for the Rya table prefix, identifying the Rya
+     * instance to work with.
+     */
+    public static final String TABLE_PREFIX_PROPERTY = "rdf.tablePrefix";
+    /**
+     * Property name for the RDF serialization format to use, when using RDF
+     * files.
+     */
+    public static final String FORMAT_PROP = "rdf.format";
+    /**
+     * Property name for a file input path, if using file input.
+     */
+    public static final String INPUT_PATH = "input";
+    /**
+     * Property name for specifying a default named graph to use when writing
+     * new statements.
+     */
+    public static final String NAMED_GRAPH_PROP = "rdf.graph";
+
+    public static final String AC_TABLE_PROP = "ac.table";
+    public static final String HADOOP_IO_SORT_MB = "io.sort.mb";
+    public static final ValueFactory vf = new ValueFactoryImpl();
+
+    /**
+     * Gets the TTL from a given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @return  The TTL that will be applied as an age-off filter for Accumulo
+     *          input data, or null if not set.
+     */
+    public static String getACTtl(Configuration conf) {
+        return conf.get(AC_TTL_PROP);
+    }
+
+    /**
+     * Gets the username from a given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @return  The configured Accumulo username, or null if not set.
+     */
+    public static String getACUserName(Configuration conf) {
+        return conf.get(AC_USERNAME_PROP);
+    }
+
+    /**
+     * Gets the password from a given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @return  The configured Accumulo password, or null if not set.
+     */
+    public static String getACPwd(Configuration conf) {
+        return conf.get(AC_PWD_PROP);
+    }
+
+    /**
+     * Gets the zookeepers from a given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @return  The configured zookeeper list, or null if not set.
+     */
+    public static String getACZK(Configuration conf) {
+        return conf.get(AC_ZK_PROP);
+    }
+
+    /**
+     * Gets the instance name from a given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @return  The configured Accumulo instance name, or null if not set.
+     */
+    public static String getACInstance(Configuration conf) {
+        return conf.get(AC_INSTANCE_PROP);
+    }
+
+    /**
+     * Gets whether to use a mock instance from a given Configuration.
+     * @param conf          Configuration containing MapReduce tool options.
+     * @param defaultValue  Default choice if the mock property hasn't been
+     *                      explicitly set in the Configuration.
+     * @return  True if a mock instance should be used, false to connect to
+     *          a running Accumulo.
+     */
+    public static boolean getACMock(Configuration conf, boolean defaultValue) {
+        return conf.getBoolean(AC_MOCK_PROP, defaultValue);
+    }
+
+    /**
+     * Gets the table prefix from a given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @return  The configured Rya table prefix, or null if not set.
+     */
+    public static String getTablePrefix(Configuration conf) {
+        return conf.get(TABLE_PREFIX_PROPERTY);
+    }
+
+    /**
+     * Gets the table layout that determines which Rya table to scan for input.
+     * @param   conf            Configuration containing MapReduce tool options.
+     * @param   defaultLayout   The layout to use if the Configuration doesn't
+     *                          specify any layout.
+     * @return  The configured layout to use for reading statements from Rya.
+     */
+    public static TABLE_LAYOUT getTableLayout(Configuration conf, TABLE_LAYOUT defaultLayout) {
+        return TABLE_LAYOUT.valueOf(conf.get(TABLE_LAYOUT_PROP, defaultLayout.toString()));
+    }
+
+    /**
+     * Gets the RDF serialization format to use for parsing RDF files.
+     * @param   conf    Configuration containing MapReduce tool options.
+     * @return  The configured RDFFormat, or null if not set.
+     */
+    public static RDFFormat getRDFFormat(Configuration conf) {
+        return RDFFormat.valueOf(conf.get(FORMAT_PROP));
+    }
+
+    /**
+     * Sets the username in the given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @param str   Accumulo username, used for input and/or output.
+     */
+    public static void setACUserName(Configuration conf, String str) {
+        conf.set(AC_USERNAME_PROP, str);
+    }
+
+    /**
+     * Sets the password in the given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @param str   Accumulo password string, used for input and/or output.
+     */
+    public static void setACPwd(Configuration conf, String str) {
+        conf.set(AC_PWD_PROP, str);
+    }
+
+    /**
+     * Sets the zookeepers in the given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @param str   List of zookeepers to use to connect to Accumulo.
+     */
+    public static void setACZK(Configuration conf, String str) {
+        conf.set(AC_ZK_PROP, str);
+    }
+
+    /**
+     * Sets the instance in the given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @param str   Accumulo instance name, for input and/or output.
+     */
+    public static void setACInstance(Configuration conf, String str) {
+        conf.set(AC_INSTANCE_PROP, str);
+    }
+
+    /**
+     * Sets the TTL in the given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @param str   TTL for Accumulo data. Rows older than this won't be scanned
+     *              as input.
+     */
+    public static void setACTtl(Configuration conf, String str) {
+        conf.set(AC_TTL_PROP, str);
+    }
+
+    /**
+     * Sets whether to connect to a mock Accumulo instance.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @param mock  true to use a mock instance, false to attempt to connect
+     *              to a running Accumulo instance.
+     */
+    public static void setACMock(Configuration conf, boolean mock) {
+        conf.setBoolean(AC_MOCK_PROP, mock);
+    }
+
+    /**
+     * Sets the Rya table prefix in the given Configuration.
+     * @param conf      Configuration containing MapReduce tool options.
+     * @param prefix    Prefix of the Rya tables to use for input and/or output.
+     */
+    public static void setTablePrefix(Configuration conf, String prefix) {
+        conf.set(TABLE_PREFIX_PROPERTY, prefix);
+    }
+
+    /**
+     * Sets the table layout in the given Configuration.
+     * @param conf  Configuration containing MapReduce tool options.
+     * @param layout    The Rya core table to scan when using Rya for input.
+     */
+    public static void setTableLayout(Configuration conf, TABLE_LAYOUT layout) {
+        conf.set(TABLE_LAYOUT_PROP, layout.toString());
+    }
+
+    /**
+     * Sets the RDF serialization format in the given Configuration.
+     * @param conf      Configuration containing MapReduce tool options.
+     * @param format    The expected format of any RDF text data.
+     */
+    public static void setRDFFormat(Configuration conf, RDFFormat format) {
+        conf.set(FORMAT_PROP, format.getName());
+    }
+
+    /**
+     * Static class for accessing properties associated with Accumulo input
+     * formats. Can allow input formats that don't extend
+     * {@link InputFormatBase} to still use the same Accumulo input
+     * configuration options.
+     */
+    @SuppressWarnings("rawtypes")
+    public static class AccumuloProps extends InputFormatBase {
+        /**
+         * @throws UnsupportedOperationException always. This class should only be used to access properties.
+         */
+        @Override
+        public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) {
+            throw new UnsupportedOperationException("Accumulo Props just holds properties");
+        }
+        public static Instance getInstance(JobContext  conf) {
+            return InputFormatBase.getInstance(conf);
+        }
+        public static AuthenticationToken getPassword(JobContext  conf) {
+            return InputFormatBase.getAuthenticationToken(conf);
+        }
+        public static String getUsername(JobContext conf) {
+            return InputFormatBase.getPrincipal(conf);
+        }
+        public static String getTablename(JobContext conf) {
+            return InputFormatBase.getInputTableName(conf);
+        }
+    }
+}
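
Sketch, not part of this patch: seeding a Configuration through these helpers before handing it to a tool, which has the same effect as passing the corresponding -D options on the command line. The values are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.openrdf.rio.RDFFormat;

    import mvm.rya.accumulo.mr.MRUtils;
    import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;

    public class JobConfigExample {
        public static Configuration buildConf() {
            Configuration conf = new Configuration();
            MRUtils.setACInstance(conf, "accumulo");         // ac.instance
            MRUtils.setACZK(conf, "zoo1:2181,zoo2:2181");    // ac.zk
            MRUtils.setACUserName(conf, "root");             // ac.username
            MRUtils.setACPwd(conf, "secret");                // ac.pwd
            MRUtils.setTablePrefix(conf, "rya_");            // rdf.tablePrefix
            MRUtils.setTableLayout(conf, TABLE_LAYOUT.SPO);  // rdf.tablelayout
            MRUtils.setRDFFormat(conf, RDFFormat.NTRIPLES);  // rdf.format
            return conf;
        }
    }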

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java
new file mode 100644
index 0000000..208045e
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RdfFileInputFormat.java
@@ -0,0 +1,443 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.PipedReader;
+import java.io.PipedWriter;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Statement;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandler;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFParseException;
+import org.openrdf.rio.RDFParser;
+import org.openrdf.rio.Rio;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaTripleContext;
+
+/**
+ * {@link FileInputFormat} that can read multiple RDF files and convert them
+ * into statements.
+ * <p>
+ * Expects all files to use the same RDF serialization format, which must be
+ * provided.
+ * <p>
+ * Reading and parsing is done asynchronously, so entire files need not be
+ * loaded into memory at once. Reading will block when a character buffer is
+ * full, waiting for the parser to consume more data. The parser will block when
+ * a statement buffer is full, waiting for the client to consume the statements
+ * generated so far. This makes it possible to process large files, and is
+ * particularly useful for the N-Triples and N-Quads formats, which can be
+ * parsed one line at a time. The size of each
+ * buffer can be configured. An error will be thrown if the parser takes too
+ * long to respond, and this timeout can be configured.
+ * <p>
+ * Only N-Triples and N-Quads files may be split into multiple
+ * {@link InputSplit}s per file, if large enough. Input is read line-by-line,
+ * and each line of an N-Triples or N-Quads file is self-contained, so any
+ * arbitrary split is valid. This means the number of input splits may be
+ * greater than the number of input files if and only if N-Triples or N-Quads is
+ * given as the RDF serialization format.
+ */
+public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
+    private static final Logger logger = Logger.getLogger(RdfFileInputFormat.class);
+    private static final String PREFIX = RdfFileInputFormat.class.getSimpleName();
+    private static final String CHAR_BUFFER_SIZE_PROP = PREFIX + ".char.buffer.size";
+    private static final String STATEMENT_BUFFER_SIZE_PROP = PREFIX + ".statement.buffer.size";
+    private static final String TIMEOUT_PROP = PREFIX + ".timeout";
+    private static final String FORMAT_PROP = PREFIX + ".rdf.format";
+
+    private static final RDFFormat DEFAULT_RDF_FORMAT = RDFFormat.RDFXML;
+    private static final int DEFAULT_CHAR_BUFFER_SIZE = 1024*1024;
+    private static final int DEFAULT_STATEMENT_BUFFER_SIZE = 1024;
+    private static final int DEFAULT_TIMEOUT = 20;
+
+    static final RyaStatementWritable DONE = new RyaStatementWritable(null, null); // signals the end of input
+    static final RyaStatementWritable ERROR = new RyaStatementWritable(null, null); // signals some error
+
+    /**
+     * Set the RDF serialization format to parse. All input files must have the
+     * same format.
+     * @param   job     Job to apply the setting to
+     * @param   format  Format of any and all input files
+     */
+    public static void setRDFFormat(Job job, RDFFormat format) {
+        job.getConfiguration().set(FORMAT_PROP, format.getName());
+    }
+
+    /**
+     * Specify the size, in characters, of the input buffer: hold this many
+     * characters in memory before blocking file input.
+     */
+    public static void setCharBufferSize(Job job, int size) {
+        job.getConfiguration().setInt(CHAR_BUFFER_SIZE_PROP, size);
+    }
+
+    /**
+     * Specify the size, in statements, of the parser output buffer: hold this
+     * many Statements in memory before blocking the parser.
+     */
+    public static void setStatementBufferSize(Job job, int size) {
+        job.getConfiguration().setInt(STATEMENT_BUFFER_SIZE_PROP, size);
+    }
+
+    /**
+     * Specify the timeout, in seconds: how long to wait for the parser thread
+     * to produce the next statement (or signal the end of input) before aborting.
+     */
+    public static void setTimeout(Job job, int seconds) {
+        job.getConfiguration().setInt(TIMEOUT_PROP, seconds);
+    }
+
+    private RDFFormat getRDFFormat(JobContext context) {
+        String name = context.getConfiguration().get(FORMAT_PROP);
+        return RDFFormat.valueOf(name);
+    }
+
+    /**
+     * Determine whether an input file can be split. If the input format is
+     * configured to be anything other than N-Triples or N-Quads, then the
+     * structure of the file is important and it cannot be split arbitrarily.
+     * Otherwise, default to the superclass logic to determine whether splitting
+     * is appropriate.
+     * @return  true if configured to use a line-based input format and the
+     *          superclass implementation returns true.
+     */
+    @Override
+    protected boolean isSplitable(JobContext context, Path filename) {
+        RDFFormat rdfFormat = getRDFFormat(context);
+        if (RDFFormat.NTRIPLES.equals(rdfFormat) || RDFFormat.NQUADS.equals(rdfFormat)) {
+            return super.isSplitable(context, filename);
+        }
+        return false;
+    }
+
+    /**
+     * Instantiate a RecordReader for a given task attempt.
+     * @param   inputSplit  Input split to handle, may refer to part or all of
+     *                      an RDF file
+     * @param   taskAttemptContext  Contains configuration options.
+     * @return  A RecordReader that reads and parses RDF text.
+     */
+    @Override
+    public RecordReader<LongWritable, RyaStatementWritable> createRecordReader(InputSplit inputSplit,
+            TaskAttemptContext taskAttemptContext) {
+        Configuration conf = taskAttemptContext.getConfiguration();
+        RDFFormat format = getRDFFormat(taskAttemptContext);
+        if (format == null) {
+            format = DEFAULT_RDF_FORMAT;
+        }
+        int charBufferSize = conf.getInt(CHAR_BUFFER_SIZE_PROP, DEFAULT_CHAR_BUFFER_SIZE);
+        int statementBufferSize = conf.getInt(STATEMENT_BUFFER_SIZE_PROP, DEFAULT_STATEMENT_BUFFER_SIZE);
+        int timeoutSeconds = conf.getInt(TIMEOUT_PROP, DEFAULT_TIMEOUT);
+        return new RdfFileRecordReader(format, charBufferSize, statementBufferSize, timeoutSeconds);
+    }
+
+    /**
+     * Reads RDF files and generates RyaStatementWritables. Reads and parses
+     * data in parallel, so the entire file need not be loaded at once.
+     */
+    class RdfFileRecordReader extends RecordReader<LongWritable, RyaStatementWritable> implements RDFHandler {
+        private RecordReader<Text, Text> lineReader;
+        private final PipedWriter pipeOut;
+        private final PipedReader pipeIn;
+        private final RDFParser rdfParser;
+        final BlockingQueue<RyaStatementWritable> statementCache;
+
+        private long lineCount = 0;
+        private long statementCount = 0;
+        private RyaTripleContext tripleContext;
+        private RyaStatementWritable nextStatement = null;
+        private int timeoutSeconds;
+        private boolean noMoreStatements = false;
+
+        Thread readerThread;
+        Thread parserThread;
+        private Exception threadException;
+
+        /**
+         * Instantiates the RecordReader.
+         * @param format    RDF serialization format to parse.
+         * @param charBufferSize    Number of input characters to hold in
+         *                          memory; if exceeded, wait until the parser
+         *                          thread consumes some text before proceeding
+         *                          with reading input.
+         * @param statementBufferSize   Number of output statements to hold in
+         *                              memory; if exceeded, wait until the
+         *                              client consumes data before proceeding
+         *                              with parsing.
+         * @param timeoutSeconds    Number of seconds to wait for the parser
+         *                          thread to provide the next statement (or
+         *                          state that there are none). If exceeded,
+         *                          abort.
+         */
+        RdfFileRecordReader(RDFFormat format, int charBufferSize, int statementBufferSize, int timeoutSeconds) {
+            rdfParser = Rio.createParser(format);
+            rdfParser.setRDFHandler(this);
+            statementCache = new LinkedBlockingQueue<RyaStatementWritable>(statementBufferSize);
+            pipeOut = new PipedWriter();
+            pipeIn = new PipedReader(charBufferSize);
+            this.timeoutSeconds = timeoutSeconds;
+            logger.info("Initializing RecordReader with parameters:");
+            logger.info("\tRDF serialization format = " + format.getName());
+            logger.info("\tinput buffer size = " + charBufferSize + " 
characters");
+            logger.info("\tstatement cache size = " + statementBufferSize);
+            logger.info("\tparser timeout = " + timeoutSeconds + " seconds");
+        }
+
+        /**
+         * Starts up one thread for reading text data (via an internal line
+         * based RecordReader) and one thread for receiving and parsing that
+         * data, each blocking when their respective buffers are full.
+         * @param   inputSplit          The section of data to read
+         * @param   taskAttemptContext  Contains configuration variables
+         * @throws  IOException if an error is encountered initializing the line
+         *          RecordReader or piping its output to the parser thread.
+         * @throws  InterruptedException if an error is encountered initializing
+         *          the line RecordReader
+         */
+        @Override
+        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+            Configuration conf = taskAttemptContext.getConfiguration();
+            lineReader = new KeyValueLineRecordReader(conf);
+            lineReader.initialize(inputSplit, taskAttemptContext);
+            tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
+            pipeIn.connect(pipeOut);
+
+            readerThread = new Thread(Thread.currentThread().getName() + " -- reader thread") {
+                @Override
+                public void run() {
+                    try {
+                        logger.info("Starting file reader");
+                        while (lineReader.nextKeyValue()) {
+                            Text key = lineReader.getCurrentKey();
+                            Text value = lineReader.getCurrentValue();
+                            pipeOut.write(key.toString());
+                            if (value.getLength() > 0) {
+                                pipeOut.write(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR);
+                                pipeOut.write(value.toString());
+                            }
+                            pipeOut.write('\n');
+                            lineCount++;
+                        }
+                        logger.info("Reached end of input text; read " + 
lineCount + " lines in total");
+                    } catch (IOException | InterruptedException e) {
+                        logger.error("Error processing line " + (lineCount+1) 
+ " of input", e);
+                        fail(e, this);
+                        throw new RuntimeException(e.getMessage(), e);
+                    }
+                    finally {
+                        try { lineReader.close(); } catch (IOException e) { logger.warn(e); }
+                        try { pipeOut.close(); } catch (IOException e) { logger.warn(e); }
+                    }
+                }
+            };
+
+            parserThread = new Thread(Thread.currentThread().getName() + " -- parser thread") {
+                @Override
+                public void run() {
+                    try {
+                        logger.info("Starting parser");
+                        rdfParser.parse(pipeIn, "");
+                    }
+                    catch (RDFHandlerException | RDFParseException | IOException e) {
+                        logger.error(e.getMessage(), e);
+                        fail(e, this);
+                        throw new RuntimeException(e.getMessage(), e);
+                    }
+                    finally {
+                        try { pipeIn.close(); } catch (IOException e) { logger.warn(e); }
+                    }
+                }
+            };
+            readerThread.start();
+            parserThread.start();
+        }
+
+        private void fail(Exception e, Thread source) {
+            // Notify the main RecordReader of the error
+            statementCache.offer(ERROR);
+            threadException = e;
+            // Kill the reader thread if necessary
+            if (source != readerThread && readerThread.isAlive()) {
+                readerThread.interrupt();
+            }
+            // Kill the parser thread if necessary
+            if (source != parserThread && parserThread.isAlive()) {
+                parserThread.interrupt();
+            }
+        }
+
+        /**
+         * Loads the next statement, if there is one, and returns whether there
+         * is one. Receives statements from the parser thread via a blocking
+         * queue.
+         * @throws  InterruptedException if interrupted while waiting for a
+         *          statement to show up in the queue.
+         * @throws  IOException if the parser thread doesn't respond after the
+         *          configured timeout, or if any thread reports an error.
+         * @return  true if a valid statement was loaded, or false if there are
+         *          no more statements in this input split.
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (noMoreStatements) {
+                return false;
+            }
+            nextStatement = statementCache.poll(timeoutSeconds, TimeUnit.SECONDS);
+            if (nextStatement == null) {
+                throw new IOException("Parser neither sending results nor signaling end of data after "
+                        + timeoutSeconds + " seconds.");
+            }
+            else if (nextStatement == DONE) {
+                logger.info("Reached end of parsed RDF; read " +  
statementCount + " statements in total.");
+                nextStatement = null;
+                noMoreStatements = true;
+                return false;
+            }
+            else if (nextStatement == ERROR) {
+                nextStatement = null;
+                noMoreStatements = true;
+                throw new IOException("Error detected processing input.", 
threadException);
+            }
+            statementCount++;
+            return true;
+        }
+
+        /**
+         * Gets the current key.
+         * @return  the number of statements read so far, or null if all input
+         *          has been read.
+         */
+        @Override
+        public LongWritable getCurrentKey() {
+            if (noMoreStatements) {
+                return null;
+            }
+            return new LongWritable(statementCount);
+        }
+
+        /**
+         * Gets the current value.
+         * @return  a RyaStatementWritable loaded from RDF data, or null if all
+         *          input has been read.
+         */
+        @Override
+        public RyaStatementWritable getCurrentValue() {
+            return nextStatement;
+        }
+
+        /**
+         * Gets the progress of the underlying line-based Record Reader. Does
+         * not include any information about the progress of the parser.
+         * @return  The proportion of text input that has been read.
+         * @throws  IOException if thrown by the internal RecordReader.
+         * @throws  InterruptedException if thrown by the internal RecordReader.
+         */
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return lineReader.getProgress();
+        }
+
+        /**
+         * Closes all the underlying resources.
+         */
+        @Override
+        public void close() {
+            if (parserThread.isAlive()) {
+                parserThread.interrupt();
+            }
+            if (readerThread.isAlive()) {
+                readerThread.interrupt();
+            }
+            try { lineReader.close(); } catch (IOException e) { logger.warn(e); }
+            try { pipeOut.close(); } catch (IOException e) { logger.warn(e); }
+            try { pipeIn.close(); } catch (IOException e) { logger.warn(e); }
+        }
+
+        /**
+         * Has no effect.
+         */
+        @Override
+        public void startRDF() throws RDFHandlerException {
+        }
+
+        /**
+         * Add a dummy item to the queue to signal that there will be no more
+         * statements.
+         * @throws  RDFHandlerException     if interrupted while waiting for
+         *          the blocking queue to be ready to accept the done signal.
+         */
+        @Override
+        public void endRDF() throws RDFHandlerException {
+            logger.info("Finished parsing RDF");
+            try {
+                statementCache.put(DONE);
+            } catch (InterruptedException e) {
+                throw new RDFHandlerException("Interrupted while waiting to 
add done signal to statement queue", e);
+            }
+        }
+
+        /**
+         * Has no effect.
+         */
+        @Override
+        public void handleNamespace(String s, String s1) throws RDFHandlerException {
+        }
+
+        /**
+         * Convert the {@link Statement} to a {@link RyaStatement}, wrap it in a
+         * {@link RyaStatementWritable}, and add it to the queue.
+         * @throws  RDFHandlerException     if interrupted while waiting for the
+         *          blocking queue to be ready to accept statement data.
+         */
+        @Override
+        public void handleStatement(Statement statement) throws RDFHandlerException {
+            try {
+                statementCache.put(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement), tripleContext));
+            } catch (InterruptedException e) {
+                throw new RDFHandlerException("Interrupted while waiting to 
add parsed statement to the statement queue", e);
+            }
+        }
+
+        /**
+         * Has no effect.
+         */
+        @Override
+        public void handleComment(String s) {
+        }
+    }
+}
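
Sketch, not part of this patch: applying the buffer and timeout knobs above to a job that parses N-Triples from HDFS. The path and sizes are illustrative only.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.openrdf.rio.RDFFormat;

    import mvm.rya.accumulo.mr.RdfFileInputFormat;

    public class RdfParseSetup {
        public static Job configure(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "parse rdf");
            RdfFileInputFormat.addInputPath(job, new Path("/data/triples.nt"));
            RdfFileInputFormat.setRDFFormat(job, RDFFormat.NTRIPLES);  // all inputs must share one format
            RdfFileInputFormat.setCharBufferSize(job, 1024 * 1024);    // characters held before reading blocks
            RdfFileInputFormat.setStatementBufferSize(job, 1024);      // statements held before parsing blocks
            RdfFileInputFormat.setTimeout(job, 60);                    // seconds to wait on the parser thread
            job.setInputFormatClass(RdfFileInputFormat.class);
            return job;
        }
    }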

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java
new file mode 100644
index 0000000..b6545b6
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaInputFormat.java
@@ -0,0 +1,130 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+/**
+ * Subclass of {@link AbstractInputFormat} for reading
+ * {@link RyaStatementWritable}s directly from a running Rya instance.
+ */
+public class RyaInputFormat extends AbstractInputFormat<Text, RyaStatementWritable> {
+    /**
+     * Instantiates a RecordReader for this InputFormat and a given task and
+     * input split.
+     * @param   split   Defines the portion of the input this RecordReader is
+     *                  responsible for.
+     * @param   context     The context of the task.
+     * @return A RecordReader that can be used to fetch RyaStatementWritables.
+     */
+    @Override
+    public RecordReader<Text, RyaStatementWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
+        return new RyaStatementRecordReader();
+    }
+
+    /**
+     * Sets the table layout to use.
+     * @param conf  Job whose configuration the layout will be set in.
+     * @param layout    Statements will be read from the Rya table associated
+     *                  with this layout.
+     */
+    public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
+        conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
+    }
+
+    /**
+     * Retrieves RyaStatementWritable objects from Accumulo tables.
+     */
+    public class RyaStatementRecordReader extends AbstractRecordReader<Text, RyaStatementWritable> {
+        private RyaTripleContext ryaContext;
+        private TABLE_LAYOUT tableLayout;
+
+        @Override
+        protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName,
+                RangeInputSplit split) {
+        }
+
+        /**
+         * Initializes the RecordReader.
+         * @param   inSplit Defines the portion of data to read.
+         * @param   attempt Context for this task attempt.
+         * @throws IOException if thrown by the superclass's initialize method.
+         */
+        @Override
+        public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
+            super.initialize(inSplit, attempt);
+            this.tableLayout = MRUtils.getTableLayout(attempt.getConfiguration(), TABLE_LAYOUT.OSP);
+            //TODO verify that this is correct
+            this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(attempt.getConfiguration()));
+        }
+
+        /**
+         * Load the next statement by converting the next Accumulo row to a
+         * statement, and make the new (key,value) pair available for retrieval.
+         * @return true if another (key,value) pair was fetched and is ready to
+         *          be retrieved, false if there was none.
+         * @throws  IOException if a row was loaded but could not be converted
+         *          to a statement.
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException {
+            if (!scannerIterator.hasNext())
+                return false;
+            Entry<Key, Value> entry = scannerIterator.next();
+            ++numKeysRead;
+            currentKey = entry.getKey();
+            try {
+                currentK = currentKey.getRow();
+                RyaStatement stmt = this.ryaContext.deserializeTriple(this.tableLayout,
+                        new TripleRow(entry.getKey().getRow().getBytes(),
+                                entry.getKey().getColumnFamily().getBytes(),
+                                entry.getKey().getColumnQualifier().getBytes(),
+                                entry.getKey().getTimestamp(),
+                                entry.getKey().getColumnVisibility().getBytes(),
+                                entry.getValue().get()));
+                RyaStatementWritable writable = new RyaStatementWritable();
+                writable.setRyaStatement(stmt);
+                currentV = writable;
+            } catch(TripleRowResolverException e) {
+                throw new IOException(e);
+            }
+            return true;
+        }
+    }
+}
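A sketch of how a driver could wire RyaInputFormat into a map-only job. It uses only what this patch defines (setTableLayout and the Text/RyaStatementWritable key/value types) and assumes the Accumulo connection parameters (instance, zookeepers, credentials, input table) are configured the way AbstractAccumuloMRTool does; the driver and mapper class names are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

    import mvm.rya.accumulo.mr.RyaInputFormat;
    import mvm.rya.accumulo.mr.RyaStatementWritable;
    import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
    import mvm.rya.api.domain.RyaStatement;

    public class RyaInputExample {
        // Consumes the (row, statement) pairs emitted by RyaStatementRecordReader.
        public static class StatementMapper extends Mapper<Text, RyaStatementWritable, Text, NullWritable> {
            @Override
            protected void map(Text row, RyaStatementWritable value, Context context)
                    throws IOException, InterruptedException {
                RyaStatement stmt = value.getRyaStatement();
                context.write(new Text(stmt.getSubject().getData()), NullWritable.get());
            }
        }

        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "rya-input-example");
            job.setJarByClass(RyaInputExample.class);

            job.setInputFormatClass(RyaInputFormat.class);
            // Read from the SPO layout; Accumulo connection and table settings are assumed
            // to be applied elsewhere, as AbstractAccumuloMRTool does in this patch.
            RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);

            job.setMapperClass(StatementMapper.class);
            job.setNumReduceTasks(0);
            job.setOutputFormatClass(NullOutputFormat.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

The record reader above emits the Accumulo row as the Text key and the deserialized statement as the RyaStatementWritable value, so the mapper signature mirrors that.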

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java 
b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java
new file mode 100644
index 0000000..c9f7ffe
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java
@@ -0,0 +1,597 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MultiTableBatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Statement;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.indexing.FreeTextIndexer;
+import mvm.rya.indexing.GeoIndexer;
+import mvm.rya.indexing.TemporalIndexer;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+
+/**
+ * {@link OutputFormat} that uses Rya, the {@link GeoIndexer}, the
+ * {@link FreeTextIndexer}, the {@link TemporalIndexer}, and the
+ * {@link EntityCentricIndex} as the sink of triple data. This
+ * OutputFormat ignores the Keys and only writes the Values to Rya.
+ * <p>
+ * The user must specify connection parameters for Rya, {@link GeoIndexer},
+ * {@link FreeTextIndexer}, {@link TemporalIndexer}, and
+ * {@link EntityCentricIndex}, if secondary indexing is desired.
+ */
+public class RyaOutputFormat extends OutputFormat<Writable, RyaStatementWritable> {
+    private static final Logger logger = Logger.getLogger(RyaOutputFormat.class);
+
+    private static final String PREFIX = RyaOutputFormat.class.getSimpleName();
+    private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + ".maxmemory";
+    private static final String ENABLE_FREETEXT = PREFIX + ".freetext.enable";
+    private static final String ENABLE_GEO = PREFIX + ".geo.enable";
+    private static final String ENABLE_TEMPORAL = PREFIX + ".temporal.enable";
+    private static final String ENABLE_ENTITY = PREFIX + ".entity.enable";
+    private static final String ENABLE_CORE = PREFIX + ".coretables.enable";
+    private static final String OUTPUT_PREFIX_PROPERTY = PREFIX + ".tablePrefix";
+    private static final String CV_PROPERTY = PREFIX + ".cv.default";
+    private static final String CONTEXT_PROPERTY = PREFIX + ".context";
+
+    /**
+     * Set the default visibility of output: any statement whose visibility is
+     * null will be written with this visibility instead. If not specified, an
+     * empty column visibility is used.
+     * @param job Job to apply the setting to.
+     * @param visibility A comma-separated list of authorizations.
+     */
+    public static void setDefaultVisibility(Job job, String visibility) {
+        if (visibility != null) {
+            job.getConfiguration().set(CV_PROPERTY, visibility);
+        }
+    }
+
+    /**
+     * Set the default context (named graph) for any output: any statement whose
+     * context is null will be written with this context instead. If not
+     * specified, don't write any context.
+     * @param job Job to apply the setting to.
+     * @param context A context string, should be a syntactically valid URI.
+     */
+    public static void setDefaultContext(Job job, String context) {
+        if (context != null) {
+            job.getConfiguration().set(CONTEXT_PROPERTY, context);
+        }
+    }
+
+    /**
+     * Set the table prefix for output.
+     * @param job Job to apply the setting to.
+     * @param prefix The common prefix to all rya tables that output will be written to.
+     */
+    public static void setTablePrefix(Job job, String prefix) {
+        job.getConfiguration().set(OUTPUT_PREFIX_PROPERTY, prefix);
+    }
+
+    /**
+     * Set whether the free text index is enabled. Defaults to true.
+     * @param job Job to apply the setting to.
+     * @param enable Whether this job should add its output statements to the free text index.
+     */
+    public static void setFreeTextEnabled(Job job, boolean enable) {
+        job.getConfiguration().setBoolean(ENABLE_FREETEXT, enable);
+    }
+
+    /**
+     * Set whether the geo index is enabled. Defaults to true.
+     * @param job Job to apply the setting to.
+     * @param enable Whether this job should add its output statements to the geo index.
+     */
+    public static void setGeoEnabled(Job job, boolean enable) {
+        job.getConfiguration().setBoolean(ENABLE_GEO, enable);
+    }
+
+    /**
+     * Set whether the temporal index is enabled. Defaults to true.
+     * @param job Job to apply the setting to.
+     * @param enable Whether this job should add its output statements to the temporal index.
+     */
+    public static void setTemporalEnabled(Job job, boolean enable) {
+        job.getConfiguration().setBoolean(ENABLE_TEMPORAL, enable);
+    }
+
+    /**
+     * Set whether the entity-centric index is enabled. Defaults to true.
+     * @param job Job to apply the setting to.
+     * @param enable Whether this job should add its output statements to the entity-centric index.
+     */
+    public static void setEntityEnabled(Job job, boolean enable) {
+        job.getConfiguration().setBoolean(ENABLE_ENTITY, enable);
+    }
+
+    /**
+     * Set whether to insert to the core Rya tables (spo, osp, po). Defaults to true.
+     * @param job Job to apply the setting to.
+     * @param enable Whether this job should output to the core tables.
+     */
+    public static void setCoreTablesEnabled(Job job, boolean enable) {
+        job.getConfiguration().setBoolean(ENABLE_CORE, enable);
+    }
+
+    /**
+     * Configure a job to use a mock Accumulo instance.
+     * @param job Job to configure
+     * @param instance Name of the mock instance
+     */
+    public static void setMockInstance(Job job, String instance) {
+        AccumuloOutputFormat.setMockInstance(job, instance);
+        job.getConfiguration().setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true);
+        job.getConfiguration().setBoolean(MRUtils.AC_MOCK_PROP, true);
+    }
+
+    /**
+     * Verify that all of the enabled indexers can be initialized.
+     * @param   jobContext  Context containing configuration
+     * @throws  IOException if initializing the core Rya indexer fails.
+     */
+    @Override
+    public void checkOutputSpecs(JobContext jobContext) throws IOException {
+        Configuration conf = jobContext.getConfiguration();
+        // make sure that all of the indexers can connect
+        getGeoIndexer(conf);
+        getFreeTextIndexer(conf);
+        getTemporalIndexer(conf);
+        getRyaIndexer(conf);
+    }
+
+    /**
+     * Get the OutputCommitter for this OutputFormat.
+     * @param   context Context of the MapReduce task
+     * @return  A committer whose method implementations are empty.
+     */
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+        // copied from AccumuloOutputFormat
+        return new NullOutputFormat<Text, Mutation>().getOutputCommitter(context);
+    }
+
+    /**
+     * Get this OutputFormat's RecordWriter.
+     * @param   context     Context of the MapReduce task
+     * @return  A RecordWriter that writes statements to Rya tables.
+     * @throws  IOException if any enabled indexers can't be initialized
+     */
+    @Override
+    public RecordWriter<Writable, RyaStatementWritable> getRecordWriter(TaskAttemptContext context) throws IOException {
+        return new RyaRecordWriter(context);
+    }
+
+    private static GeoIndexer getGeoIndexer(Configuration conf) {
+        if (!conf.getBoolean(ENABLE_GEO, true)) {
+            return null;
+        }
+        GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer();
+        geo.setConf(conf);
+        return geo;
+    }
+
+    private static FreeTextIndexer getFreeTextIndexer(Configuration conf) {
+        if (!conf.getBoolean(ENABLE_FREETEXT, true)) {
+            return null;
+        }
+        AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
+        freeText.setConf(conf);
+        return freeText;
+    }
+
+    private static TemporalIndexer getTemporalIndexer(Configuration conf) {
+        if (!conf.getBoolean(ENABLE_TEMPORAL, true)) {
+            return null;
+        }
+        AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
+        temporal.setConf(conf);
+        return temporal;
+    }
+
+    private static EntityCentricIndex getEntityIndexer(Configuration conf) {
+        if (!conf.getBoolean(ENABLE_ENTITY, true)) {
+            return null;
+        }
+        EntityCentricIndex entity = new EntityCentricIndex();
+        entity.setConf(conf);
+        return entity;
+    }
+
+    private static AccumuloRyaDAO getRyaIndexer(Configuration conf) throws IOException {
+        try {
+            if (!conf.getBoolean(ENABLE_CORE, true)) {
+                return null;
+            }
+            AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO();
+            Connector conn = ConfigUtils.getConnector(conf);
+            ryaIndexer.setConnector(conn);
+
+            AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+
+            String tablePrefix = conf.get(OUTPUT_PREFIX_PROPERTY, null);
+            if (tablePrefix != null) {
+                ryaConf.setTablePrefix(tablePrefix);
+            }
+            ryaConf.setDisplayQueryPlan(false);
+            ryaIndexer.setConf(ryaConf);
+            ryaIndexer.init();
+            return ryaIndexer;
+        } catch (AccumuloException e) {
+            logger.error("Cannot create RyaIndexer", e);
+            throw new IOException(e);
+        } catch (AccumuloSecurityException e) {
+            logger.error("Cannot create RyaIndexer", e);
+            throw new IOException(e);
+        } catch (RyaDAOException e) {
+            logger.error("Cannot create RyaIndexer", e);
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * RecordWriter that takes in {@link RyaStatementWritable}s and writes them
+     * to Rya tables.
+     */
+    public static class RyaRecordWriter extends RecordWriter<Writable, RyaStatementWritable>
+            implements Closeable, Flushable {
+        private static final Logger logger = Logger.getLogger(RyaRecordWriter.class);
+
+        private FreeTextIndexer freeTextIndexer;
+        private GeoIndexer geoIndexer;
+        private TemporalIndexer temporalIndexer;
+        private EntityCentricIndex entityIndexer;
+        private AccumuloRyaDAO ryaIndexer;
+        private RyaTripleContext tripleContext;
+        private MultiTableBatchWriter writer;
+        private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression();
+        private RyaURI defaultContext = null;
+
+        private static final long ONE_MEGABYTE = 1024L * 1024L;
+        private static final long AVE_STATEMENT_SIZE = 100L;
+
+        private long bufferSizeLimit;
+        private long bufferCurrentSize = 0;
+
+        private ArrayList<RyaStatement> buffer;
+
+        /**
+         * Constructor.
+         * @param context Context for MapReduce task
+         * @throws  IOException if the core Rya indexer or entity indexer can't
+         *          be initialized
+         */
+        public RyaRecordWriter(TaskAttemptContext context) throws IOException {
+            this(context.getConfiguration());
+        }
+
+        /**
+         * Constructor.
+         * @param conf Configuration containing any relevant options.
+         * @throws  IOException if the core Rya indexer or entity indexer can't
+         *          be initialized
+         */
+        public RyaRecordWriter(Configuration conf) throws IOException {
+            // set the visibility
+            String visibility = conf.get(CV_PROPERTY);
+            if (visibility != null) {
+                cv = visibility.getBytes();
+            }
+            // set the default context
+            String context = conf.get(CONTEXT_PROPERTY, "");
+            if (context != null && !context.isEmpty()) {
+                defaultContext = new RyaURI(context);
+            }
+
+            // set up the buffer
+            bufferSizeLimit = conf.getLong(MAX_MUTATION_BUFFER_SIZE, ONE_MEGABYTE);
+            int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE);
+            buffer = new ArrayList<RyaStatement>(bufferCapacity);
+
+            // set up the indexers
+            freeTextIndexer = getFreeTextIndexer(conf);
+            geoIndexer = getGeoIndexer(conf);
+            temporalIndexer = getTemporalIndexer(conf);
+            entityIndexer = getEntityIndexer(conf);
+            ryaIndexer = getRyaIndexer(conf);
+
+            // The entity index needs a batch writer -- typically it uses the DAO's, but decoupling
+            // them lets it be used with or without the core tables, like the other indexers.
+            if (entityIndexer != null) {
+                Connector conn;
+                try {
+                    conn = ConfigUtils.getConnector(conf);
+                } catch (AccumuloException | AccumuloSecurityException e) {
+                    throw new IOException("Error connecting to Accumulo for entity index output", e);
+                }
+                BatchWriterConfig batchWriterConfig = new BatchWriterConfig();
+                batchWriterConfig.setMaxMemory(RdfCloudTripleStoreConstants.MAX_MEMORY);
+                batchWriterConfig.setTimeout(RdfCloudTripleStoreConstants.MAX_TIME, TimeUnit.MILLISECONDS);
+                batchWriterConfig.setMaxWriteThreads(RdfCloudTripleStoreConstants.NUM_THREADS);
+                writer = conn.createMultiTableBatchWriter(batchWriterConfig);
+                entityIndexer.setMultiTableBatchWriter(writer);
+            }
+
+            // update fields used for metrics
+            startTime = System.currentTimeMillis();
+            lastCommitFinishTime = startTime;
+
+            // set up the triple context
+            tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
+        }
+
+        /**
+         * Write any buffered statements to Accumulo.
+         * @throws IOException if any indexer can't be flushed.
+         */
+        @Override
+        public void flush() throws IOException {
+            flushBuffer();
+        }
+
+        /**
+         * Close all indexers.
+         */
+        @Override
+        public void close() {
+            close(null);
+        }
+
+        /**
+         * Close all indexers.
+         * @param   paramTaskAttemptContext     Unused.
+         */
+        @Override
+        public void close(TaskAttemptContext paramTaskAttemptContext) {
+            // close everything. log errors
+            try {
+                flush();
+            } catch (IOException e) {
+                logger.error("Error flushing the buffer on RyaOutputFormat 
Close", e);
+            }
+            try {
+                if (geoIndexer != null)
+                    geoIndexer.close();
+            } catch (IOException e) {
+                logger.error("Error closing the geoIndexer on RyaOutputFormat 
Close", e);
+            }
+            try {
+                if (freeTextIndexer != null)
+                    freeTextIndexer.close();
+            } catch (IOException e) {
+                logger.error("Error closing the freetextIndexer on 
RyaOutputFormat Close", e);
+            }
+            try {
+                if (temporalIndexer != null)
+                    temporalIndexer.close();
+            } catch (IOException e) {
+                logger.error("Error closing the temporalIndexer on 
RyaOutputFormat Close", e);
+            }
+            try {
+                if (entityIndexer != null)
+                    entityIndexer.close();
+            } catch (IOException e) {
+                logger.error("Error closing the entityIndexer on 
RyaOutputFormat Close", e);
+            }
+            try {
+                if (ryaIndexer != null)
+                    ryaIndexer.destroy();
+            } catch (RyaDAOException e) {
+                logger.error("Error closing RyaDAO on RyaOutputFormat Close", 
e);
+            }
+            if (writer != null) {
+                try {
+                    writer.close();
+                } catch (MutationsRejectedException e) {
+                    logger.error("Error closing MultiTableBatchWriter on 
RyaOutputFormat Close", e);
+                }
+            }
+        }
+
+        /**
+         * Write a {@link Statement} to Rya. Adds the statement to a buffer, and
+         * flushes the statement buffer to the database if full.
+         * @param   statement   Statement to insert to Rya.
+         * @throws  IOException if writing to Accumulo fails.
+         */
+        public void write(Statement statement) throws IOException {
+            write(RdfToRyaConversions.convertStatement(statement));
+        }
+
+        /**
+         * Writes a RyaStatement to Rya. Adds the statement to a buffer, and
+         * flushes the statement buffer to the database if full.
+         * @param   ryaStatement   Statement to insert to Rya.
+         * @throws  IOException if writing to Accumulo fails.
+         */
+        public void write(RyaStatement ryaStatement) throws IOException {
+            write(NullWritable.get(), new RyaStatementWritable(ryaStatement, tripleContext));
+        }
+
+        /**
+         * Writes a (key,value) pair to Rya. Adds the statement to a buffer, and
+         * flushes the statement buffer to the database if full.
+         * @param   key     Arbitrary Writable, not used.
+         * @param   value   Contains statement to insert to Rya.
+         * @throws  IOException if writing to Accumulo fails.
+         */
+        @Override
+        public void write(Writable key, RyaStatementWritable value) throws IOException {
+            RyaStatement ryaStatement = value.getRyaStatement();
+            if (ryaStatement.getColumnVisibility() == null) {
+                ryaStatement.setColumnVisibility(cv);
+            }
+            if (ryaStatement.getContext() == null) {
+                ryaStatement.setContext(defaultContext);
+            }
+            buffer.add(ryaStatement);
+            bufferCurrentSize += statementSize(ryaStatement);
+            if (bufferCurrentSize >= bufferSizeLimit) {
+                flushBuffer();
+            }
+        }
+
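+        // Rough per-statement size estimate (subject, predicate, object, optional datatype
+        // and context); used only to decide when the buffer should be flushed.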
+        private int statementSize(RyaStatement ryaStatement) {
+            RyaURI subject = ryaStatement.getSubject();
+            RyaURI predicate = ryaStatement.getPredicate();
+            RyaType object = ryaStatement.getObject();
+            RyaURI context = ryaStatement.getContext();
+            int size = 3 + subject.getData().length() + predicate.getData().length() + object.getData().length();
+            if (!XMLSchema.ANYURI.equals(object.getDataType())) {
+                size += 2 + object.getDataType().toString().length();
+            }
+            if (context != null) {
+                size += context.getData().length();
+            }
+            return size;
+        }
+
+        // fields for storing metrics
+        private long startTime = 0;
+        private long lastCommitFinishTime = 0;
+        private long totalCommitRecords = 0;
+
+        private double totalReadDuration = 0;
+        private double totalWriteDuration = 0;
+
+        private long commitCount = 0;
+
+        private void flushBuffer() throws IOException {
+            totalCommitRecords += buffer.size();
+            commitCount++;
+
+            long startCommitTime = System.currentTimeMillis();
+
+            logger.info(String.format("(C-%d) Flushing buffer with %,d objects 
and %,d bytes", commitCount, buffer.size(),
+                    bufferCurrentSize));
+
+            double readingDuration = (startCommitTime - lastCommitFinishTime) 
/ 1000.;
+            totalReadDuration += readingDuration;
+            double currentReadRate = buffer.size() / readingDuration;
+            double totalReadRate = totalCommitRecords / totalReadDuration;
+
+            // Print "reading" metrics
+            logger.info(String.format("(C-%d) (Reading) Duration, Current 
Rate, Total Rate: %.2f %.2f %.2f ", commitCount, readingDuration,
+                    currentReadRate, totalReadRate));
+
+            // write to geo
+            if (geoIndexer != null) {
+                geoIndexer.storeStatements(buffer);
+                geoIndexer.flush();
+            }
+
+            // write to free text
+            if (freeTextIndexer != null) {
+                freeTextIndexer.storeStatements(buffer);
+                freeTextIndexer.flush();
+            }
+
+            // write to temporal
+            if (temporalIndexer != null) {
+                temporalIndexer.storeStatements(buffer);
+                temporalIndexer.flush();
+            }
+
+            // write to entity
+            if (entityIndexer != null && writer != null) {
+                entityIndexer.storeStatements(buffer);
+                try {
+                    writer.flush();
+                } catch (MutationsRejectedException e) {
+                    throw new IOException("Error flushing data to Accumulo for 
entity indexing", e);
+                }
+            }
+
+            // write to rya
+            try {
+                if (ryaIndexer != null) {
+                    ryaIndexer.add(buffer.iterator());
+                }
+            } catch (RyaDAOException e) {
+                logger.error("Cannot write statement to Rya", e);
+                throw new IOException(e);
+            }
+
+            lastCommitFinishTime = System.currentTimeMillis();
+
+            double writingDuration = (lastCommitFinishTime - startCommitTime) / 1000.;
+            totalWriteDuration += writingDuration;
+            double currentWriteRate = buffer.size() / writingDuration;
+            double totalWriteRate = totalCommitRecords / totalWriteDuration;
+
+            // Print "writing" stats
+            logger.info(String.format("(C-%d) (Writing) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, writingDuration,
+                    currentWriteRate, totalWriteRate));
+
+            double processDuration = writingDuration + readingDuration;
+            double totalProcessDuration = totalWriteDuration + totalReadDuration;
+            double currentProcessRate = buffer.size() / processDuration;
+            double totalProcessRate = totalCommitRecords / (totalProcessDuration);
+
+            // Print "total" stats
+            logger.info(String.format("(C-%d) (Total) Duration, Current Rate, Total Rate: %.2f %.2f %.2f ", commitCount, processDuration,
+                    currentProcessRate, totalProcessRate));
+
+            // clear the buffer
+            buffer.clear();
+            bufferCurrentSize = 0L;
+        }
+    }
+}
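A sketch of configuring a job to write statements through RyaOutputFormat using the static setters defined above. The table prefix, visibility string, and context URI are placeholder values, the Accumulo connection parameters and the input side of the job are omitted, and the driver class name is hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;

    import mvm.rya.accumulo.mr.RyaOutputFormat;
    import mvm.rya.accumulo.mr.RyaStatementWritable;

    public class RyaOutputExample {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "rya-output-example");
            job.setJarByClass(RyaOutputExample.class);

            // Keys are ignored by RyaOutputFormat; only the RyaStatementWritable values are written.
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(RyaStatementWritable.class);
            job.setOutputFormatClass(RyaOutputFormat.class);

            // Output options defined by this patch (the values below are placeholders).
            RyaOutputFormat.setTablePrefix(job, "rya_");
            RyaOutputFormat.setDefaultVisibility(job, "U");
            RyaOutputFormat.setDefaultContext(job, "http://example.org/graph");
            RyaOutputFormat.setCoreTablesEnabled(job, true);   // write the spo/po/osp tables
            RyaOutputFormat.setFreeTextEnabled(job, false);    // skip the secondary indexes here
            RyaOutputFormat.setGeoEnabled(job, false);
            RyaOutputFormat.setTemporalEnabled(job, false);
            RyaOutputFormat.setEntityEnabled(job, false);

            // Accumulo connection parameters (instance, zookeepers, credentials) would be
            // configured the way AbstractAccumuloMRTool does elsewhere in this patch.
            // Input format, mapper, and the rest of the pipeline are omitted.
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }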
