http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java
new file mode 100644
index 0000000..ca4d0c2
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java
@@ -0,0 +1,260 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.merge.CopyTool;
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaTripleContext;
+import org.apache.log4j.Logger;
+
+/**
+ * The base {@link Mapper} for the copy tool which initializes the mapper for use.  The mapper will take all
+ * keys from the parent table that are after the provided start time and copy them to the child table.
+ */
+public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+    private static final Logger log = Logger.getLogger(BaseCopyToolMapper.class);
+
+    protected String startTimeString;
+    protected Date startTime;
+    protected Date runTime;
+    protected Long timeOffset;
+    protected boolean useCopyFileOutput;
+
+    protected String parentTableName;
+    protected String childTableName;
+    protected String parentTablePrefix;
+    protected String childTablePrefix;
+    protected Text childTableNameText;
+
+    protected Configuration parentConfig;
+    protected Configuration childConfig;
+
+    protected String parentUser;
+    protected String childUser;
+
+    protected Connector parentConnector;
+    protected Connector childConnector;
+
+    protected AccumuloRdfConfiguration parentAccumuloRdfConfiguration;
+    protected AccumuloRdfConfiguration childAccumuloRdfConfiguration;
+
+    protected RyaTripleContext childRyaContext;
+
+    protected AccumuloRyaDAO childDao;
+
+    /**
+     * Creates a new {@link BaseCopyToolMapper}.
+     */
+    public BaseCopyToolMapper() {
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+
+        log.info("Setting up mapper");
+
+        parentConfig = context.getConfiguration();
+        childConfig = MergeToolMapper.getChildConfig(parentConfig);
+
+        startTimeString = parentConfig.get(MergeTool.START_TIME_PROP, null);
+        if (startTimeString != null) {
+            startTime = MergeTool.convertStartTimeStringToDate(startTimeString);
+        }
+
+        String runTimeString = parentConfig.get(CopyTool.COPY_RUN_TIME_PROP, null);
+        if (runTimeString != null) {
+            runTime = MergeTool.convertStartTimeStringToDate(runTimeString);
+        }
+
+        String offsetString = parentConfig.get(CopyTool.PARENT_TIME_OFFSET_PROP, null);
+        if (offsetString != null) {
+            timeOffset = Long.valueOf(offsetString);
+        }
+
+        useCopyFileOutput = parentConfig.getBoolean(CopyTool.USE_COPY_FILE_OUTPUT, false);
+
+        parentTableName = parentConfig.get(MergeTool.TABLE_NAME_PROP, null);
+        parentTablePrefix = parentConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        childTablePrefix = childConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        childTableName = parentTableName.replaceFirst(parentTablePrefix, childTablePrefix);
+        childTableNameText = new Text(childTableName);
+        log.info("Copying data from parent table, \"" + parentTableName + "\", to child table, \"" + childTableName + "\"");
+
+        parentUser = parentConfig.get(MRUtils.AC_USERNAME_PROP, null);
+        childUser = childConfig.get(MRUtils.AC_USERNAME_PROP, null);
+
+        parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(parentConfig);
+        parentAccumuloRdfConfiguration.setTablePrefix(parentTablePrefix);
+        parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration);
+
+        childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig);
+        childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
+        childRyaContext = RyaTripleContext.getInstance(childAccumuloRdfConfiguration);
+
+        if (useCopyFileOutput) {
+            fixSplitsInCachedLocalFiles();
+        } else {
+            childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+            childDao = AccumuloRyaUtils.setupDao(childConnector, childAccumuloRdfConfiguration);
+
+            createTableIfNeeded();
+
+            copyAuthorizations();
+        }
+
+        // Add the run time and split time to the table
+        addMetadataKeys(context);
+
+        log.info("Finished setting up mapper");
+    }
+
+    /**
+     * Fixes the "splits.txt" file path in the "mapreduce.job.cache.local.files" property.  It contains the
+     * {@link URI} "file:" prefix which causes {@link KeyRangePartitioner} to throw a {@code FileNotFoundException}
+     * when it attempts to open it.
+     */
+    private void fixSplitsInCachedLocalFiles() {
+        if (useCopyFileOutput) {
+            // The "mapreduce.job.cache.local.files" property contains a comma-separated
+            // list of cached local file paths.
+            String cachedLocalFiles = parentConfig.get(MRJobConfig.CACHE_LOCALFILES);
+            if (cachedLocalFiles != null) {
+                List<String> cachedLocalFilesList = Lists.newArrayList(Splitter.on(',').split(cachedLocalFiles));
+                List<String> formattedCachedLocalFilesList = new ArrayList<>();
+                for (String cachedLocalFile : cachedLocalFilesList) {
+                    String pathToAdd = cachedLocalFile;
+                    if (cachedLocalFile.endsWith("splits.txt")) {
+                        URI uri = null;
+                        try {
+                            uri = new URI(cachedLocalFile);
+                            pathToAdd = uri.getPath();
+                        } catch (URISyntaxException e) {
+                            log.error("Invalid syntax in local cache file path", e);
+                        }
+                    }
+                    formattedCachedLocalFilesList.add(pathToAdd);
+                }
+                String formattedCachedLocalFiles = Joiner.on(',').join(formattedCachedLocalFilesList);
+                if (!cachedLocalFiles.equals(formattedCachedLocalFiles)) {
+                    parentConfig.set(MRJobConfig.CACHE_LOCALFILES, formattedCachedLocalFiles);
+                }
+            }
+        }
+    }
+
+    protected void addMetadataKeys(Context context) throws IOException {
+        try {
+            if (AccumuloRyaUtils.getCopyToolRunDate(childDao) == null) {
+                log.info("Writing copy tool run time metadata to child table: 
" + runTime);
+                AccumuloRyaUtils.setCopyToolRunDate(runTime, childDao);
+            }
+            if (AccumuloRyaUtils.getCopyToolSplitDate(childDao) == null) {
+                log.info("Writing copy split time metadata to child table: " + 
startTime);
+                AccumuloRyaUtils.setCopyToolSplitDate(startTime, childDao);
+            }
+
+            if (timeOffset != null) {
+                log.info("Writing copy tool time offset metadata to child 
table: " + timeOffset);
+                AccumuloRyaUtils.setTimeOffset(timeOffset, childDao);
+            }
+        } catch (RyaDAOException e) {
+            throw new IOException("Failed to set time metadata key for table: 
" + childTableName, e);
+        }
+    }
+
+    private void createTableIfNeeded() throws IOException {
+        try {
+            if (!childConnector.tableOperations().exists(childTableName)) {
+                log.info("Creating table: " + childTableName);
+                childConnector.tableOperations().create(childTableName);
+                log.info("Created table: " + childTableName);
+                log.info("Granting authorizations to table: " + 
childTableName);
+                
childConnector.securityOperations().grantTablePermission(childUser, 
childTableName, TablePermission.WRITE);
+                log.info("Granted authorizations to table: " + childTableName);
+            }
+        } catch (TableExistsException | AccumuloException | AccumuloSecurityException e) {
+            throw new IOException(e);
+        }
+    }
+
+    protected void copyAuthorizations() throws IOException {
+        try {
+            SecurityOperations parentSecOps = parentConnector.securityOperations();
+            SecurityOperations childSecOps = childConnector.securityOperations();
+
+            Authorizations parentAuths = parentSecOps.getUserAuthorizations(parentUser);
+            Authorizations childAuths = childSecOps.getUserAuthorizations(childUser);
+            // Add any parent authorizations that the child doesn't have.
+            if (!childAuths.equals(parentAuths)) {
+                log.info("Adding the authorization, \"" + parentAuths.toString() + "\", to the child user, \"" + childUser + "\"");
+                Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, parentAuths);
+                childSecOps.changeUserAuthorizations(childUser, newChildAuths);
+            }
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+        log.info("Cleaning up mapper...");
+        try {
+            if (childDao != null) {
+                childDao.destroy();
+            }
+        } catch (RyaDAOException e) {
+            log.error("Error destroying child DAO", e);
+        }
+        log.info("Cleaned up mapper");
+    }
+}
\ No newline at end of file
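The fixSplitsInCachedLocalFiles() workaround above relies on java.net.URI stripping the "file:" scheme from a cached path. A minimal standalone illustration of the behavior it depends on (hypothetical class, not part of the patch):

    import java.net.URI;

    public class SplitsPathDemo {
        public static void main(String[] args) throws Exception {
            // KeyRangePartitioner expects a bare filesystem path, but the
            // "mapreduce.job.cache.local.files" entry can carry a URI.
            URI uri = new URI("file:/tmp/splits.txt");
            System.out.println(uri.getPath()); // prints /tmp/splits.txt
        }
    }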

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java
new file mode 100644
index 0000000..a0abe0d
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java
@@ -0,0 +1,222 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Or;
+import org.openrdf.query.algebra.ValueExpr;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.merge.util.AccumuloQueryRuleset;
+import mvm.rya.accumulo.mr.merge.util.CopyRule;
+import mvm.rya.accumulo.mr.merge.util.QueryRuleset.QueryRulesetException;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaToRdfConversions;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolver;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import mvm.rya.rdftriplestore.evaluation.ParallelEvaluationStrategyImpl;
+
+/**
+ * Take in rows from a table and range defined by query-based rules, convert the rows to
+ * statements based on the table name, and process those statements that match the rule(s).
+ */
+public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMapper<Key, Value, KEYOUT, VALUEOUT> {
+    /**
+     * Hadoop counters for tracking the number of statements and/or raw rows that have been processed.
+     */
+    public static enum Counters { STATEMENTS_COPIED, DIRECT_ROWS_COPIED };
+
+    private static final Logger log = Logger.getLogger(BaseRuleMapper.class);
+
+    private TripleRowResolver resolver = new WholeRowTripleResolver();
+    private TABLE_LAYOUT parentLayout = null;
+    private ValueExpr condition;
+    private ParallelEvaluationStrategyImpl strategy;
+    private RangeInputSplit split;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        split = (RangeInputSplit) context.getInputSplit();
+        Range range = split.getRange();
+
+        // Determine the table and table layout we're scanning
+        parentTableName = split.getTableName();
+        parentTablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY);
+        for (TABLE_LAYOUT layout : TABLE_LAYOUT.values()) {
+            String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, parentTablePrefix);
+            if (tableName.equals(parentTableName)) {
+                parentLayout = layout;
+            }
+        }
+        conf.set(MergeTool.TABLE_NAME_PROP, parentTableName);
+
+        // Set up connections and parent/child table information, if necessary
+        super.setup(context);
+
+        // If we're working at the statement level, get the relevant rules and conditions:
+        if (parentLayout != null) {
+            AccumuloQueryRuleset ruleset;
+            try {
+                ruleset = new AccumuloQueryRuleset(new AccumuloRdfConfiguration(conf));
+            } catch (QueryRulesetException e) {
+                throw new IOException("Error parsing the input query", e);
+            }
+
+            List<CopyRule> rules = ruleset.getRules(parentLayout, range);
+
+            for (CopyRule rule : rules) {
+                log.info("Mapper applies to rule:");
+                for (String line : rule.toString().split("\n")) {
+                    log.info("\t" + line);
+                }
+            }
+
+            // Combine the rules' conditions so that if any of the individual conditions matches, the
+            // composite condition will match as well. We know all the rules match all the statements
+            // this input split will receive, so if any condition is true we'll want to copy the statement.
+            for (CopyRule rule : rules) {
+                // Attach any relevant filter conditions given by this rule.
+                // If there are no conditions, all statements read by this mapper should be accepted
+                // (even if there are redundant rules with conditions)
+                if (rule.getCondition() == null) {
+                    condition = null;
+                    break;
+                }
+                // If there is a set of conditions, matching it means we should accept the statement.
+                else if (condition == null) {
+                    condition = rule.getCondition();
+                }
+                // If more than one rule matches, satisfying any condition means we should accept.
+                else {
+                    condition = new Or(condition, rule.getCondition());
+                }
+            }
+
+            // Set up the strategy to evaluate those conditions
+            strategy = new ParallelEvaluationStrategyImpl(null, null, null, childAccumuloRdfConfiguration);
+
+            // Log info about the split and combined condition
+            log.info("Table: " + parentTableName);
+            log.info("Range:");
+            log.info("\tfrom " + keyToString(range.getStartKey(), 
Integer.MAX_VALUE));
+            log.info("\tto " + keyToString(range.getEndKey(), 
Integer.MAX_VALUE));
+            if (condition == null) {
+                log.info("Condition: none");
+            }
+            else {
+                log.info("Condition:");
+                for (String line : condition.toString().split("\n")) {
+                    log.info("\t" + line);
+                }
+            }
+        }
+
+        else {
+            log.info("(Copying all rows from " + parentTableName + " 
directly.)");
+        }
+    }
+
+    @Override
+    protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+        TripleRow row = new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(),
+                key.getColumnQualifierData().toArray(), key.getTimestamp(),
+                key.getColumnVisibilityData().toArray(), value == null ? null : value.get());
+        try {
+            // If there's no layout, copy the row directly
+            if (parentLayout == null) {
+                copyRow(key, value, context);
+                context.getCounter(Counters.DIRECT_ROWS_COPIED).increment(1);
+            }
+            // If there is a layout, deserialize the statement and insert it if it meets the condition
+            else {
+                RyaStatement rs = resolver.deserialize(parentLayout, row);
+                if (condition == null || CopyRule.accept(RyaToRdfConversions.convertStatement(rs), condition, strategy)) {
+                    copyStatement(rs, context);
+                    context.getCounter(Counters.STATEMENTS_COPIED).increment(1);
+                }
+            }
+        } catch (TripleRowResolverException e) {
+            throw new IOException("Error deserializing triple", e);
+        } catch (QueryEvaluationException e) {
+            throw new IOException("Error evaluating the filter condition", e);
+        }
+    }
+
+    /**
+     * Copy a Statement, serializing it and/or indexing it as necessary.
+     * @param rstmt RyaStatement to copy to the child
+     * @param context Context to use for writing
+     * @throws InterruptedException If the Hadoop framework reports an interrupt
+     * @throws IOException If any error is encountered while serializing or writing the statement
+     */
+    abstract protected void copyStatement(RyaStatement rstmt, Context context) throws IOException, InterruptedException;
+
+    /**
+     * Copy a row directly, as opposed to starting with a higher-level object and serializing it.
+     * @param key Row's key
+     * @param value Row's value
+     * @param context Context to use for writing
+     * @throws InterruptedException If the Hadoop framework reports an interrupt
+     * @throws IOException If an error is encountered writing the row
+     */
+    abstract protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException;
+
+    /**
+     * Get a printable representation of a Key, with parts truncated to a parameterized length.
+     * (Key.toString() truncates to a fixed length that is sometimes too short to usefully log ranges).
+     * @param key Any Accumulo Key
+     * @param max The maximum printed length of each individual portion
+     * @return A human-readable representation of the Key
+     */
+    private static String keyToString(Key key, int max) {
+        StringBuilder sb = new StringBuilder();
+        byte[] row = key.getRow().copyBytes();
+        byte[] colFamily = key.getColumnFamily().copyBytes();
+        byte[] colQualifier = key.getColumnQualifier().copyBytes();
+        byte[] colVisibility = key.getColumnVisibility().copyBytes();
+        Key.appendPrintableString(row, 0, row.length, max, sb);
+        sb.append(" ");
+        Key.appendPrintableString(colFamily, 0, colFamily.length, max, sb);
+        sb.append(":");
+        Key.appendPrintableString(colQualifier, 0, colQualifier.length, max, sb);
+        sb.append(" [");
+        Key.appendPrintableString(colVisibility, 0, colVisibility.length, max, sb);
+        sb.append("] ");
+        sb.append(Long.toString(key.getTimestamp()));
+        return sb.toString();
+    }
+}
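A concrete rule mapper only has to supply the two abstract write paths declared above. A minimal sketch of a subclass (hypothetical class that just logs; a real implementation would emit mutations or serialized rows, as RowRuleMapper below does):

    import java.io.IOException;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.io.Text;
    import org.apache.log4j.Logger;

    import mvm.rya.api.domain.RyaStatement;

    // Hypothetical subclass illustrating the two abstract hooks.
    public class LoggingRuleMapper extends BaseRuleMapper<Text, Mutation> {
        private static final Logger log = Logger.getLogger(LoggingRuleMapper.class);

        @Override
        protected void copyStatement(RyaStatement rstmt, Context context) throws IOException, InterruptedException {
            // Called for rows that deserialize to statements matching the ruleset.
            log.info("Statement matched the ruleset: " + rstmt);
        }

        @Override
        protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException {
            // Called for rows from tables with no statement layout.
            log.info("Row copied directly: " + key);
        }
    }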

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java
new file mode 100644
index 0000000..7a306f2
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java
@@ -0,0 +1,81 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.log4j.Logger;
+
+import mvm.rya.accumulo.AccumuloRdfUtils;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+/**
+ * Extended {@link BaseCopyToolMapper} that handles the {@code AccumuloFileOutputFormat} for the copy tool.
+ */
+public class FileCopyToolMapper extends BaseCopyToolMapper<Key, Value, Key, Value> {
+    private static final Logger log = Logger.getLogger(FileCopyToolMapper.class);
+
+    /**
+     * Creates a new {@link FileCopyToolMapper}.
+     */
+    public FileCopyToolMapper() {
+    }
+
+    @Override
+    protected void addMetadataKeys(Context context) throws IOException {
+        try {
+            if (runTime != null) {
+                log.info("Writing copy tool run time metadata to child table: 
" + runTime);
+                RyaStatement ryaStatement = 
AccumuloRyaUtils.createCopyToolRunTimeRyaStatement(runTime);
+                writeRyaStatement(ryaStatement, context);
+            }
+
+            if (startTime != null) {
+                log.info("Writing copy split time metadata to child table: " + 
startTime);
+                RyaStatement ryaStatement = 
AccumuloRyaUtils.createCopyToolSplitTimeRyaStatement(startTime);
+                writeRyaStatement(ryaStatement, context);
+            }
+
+            if (timeOffset != null) {
+                log.info("Writing copy tool time offset metadata to child 
table: " + timeOffset);
+                RyaStatement ryaStatement = 
AccumuloRyaUtils.createTimeOffsetRyaStatement(timeOffset);
+                writeRyaStatement(ryaStatement, context);
+            }
+        } catch (TripleRowResolverException | IOException | InterruptedException e) {
+            throw new IOException("Failed to write metadata key", e);
+        }
+    }
+
+    private void writeRyaStatement(RyaStatement ryaStatement, Context context) throws TripleRowResolverException, IOException, InterruptedException {
+        Map<TABLE_LAYOUT, TripleRow> serialize = childRyaContext.getTripleResolver().serialize(ryaStatement);
+        TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO);
+        Key key = AccumuloRdfUtils.from(tripleRow);
+        Value value = AccumuloRdfUtils.extractValue(tripleRow);
+        context.write(key, value);
+    }
+}
\ No newline at end of file
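FileCopyToolMapper emits raw <Key, Value> pairs, so the driver is expected to pair it with AccumuloFileOutputFormat and a range partitioner over the child table's split points. A sketch of that wiring (hypothetical class, job name, and paths; the real setup lives in CopyTool and may differ):

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    import mvm.rya.accumulo.mr.merge.mappers.FileCopyToolMapper;

    public class FileCopyJobSketch {
        public static Job configure(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "rya-file-copy");
            job.setJarByClass(FileCopyJobSketch.class);
            job.setMapperClass(FileCopyToolMapper.class);
            job.setMapOutputKeyClass(Key.class);
            job.setMapOutputValueClass(Value.class);
            // Write RFiles that can be bulk-imported into the child table.
            job.setOutputFormatClass(AccumuloFileOutputFormat.class);
            AccumuloFileOutputFormat.setOutputPath(job, new Path("/tmp/copy-output"));
            // Partition the sorted output on the split points in "splits.txt"
            // (the file whose cached path the mapper's setup() fixes up).
            job.setPartitionerClass(KeyRangePartitioner.class);
            KeyRangePartitioner.setSplitFile(job, "/tmp/splits.txt");
            return job;
        }
    }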

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java
new file mode 100644
index 0000000..0b5dddd
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java
@@ -0,0 +1,575 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Charsets;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.RyaTableMutationsFactory;
+import mvm.rya.accumulo.mr.merge.CopyTool;
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.merge.util.TimeUtils;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+/**
+ * Reads from the parent and child tables comparing their keys and adds or deletes the keys
+ * from the parent as necessary in order to reflect changes that the child made since the provided
+ * start time.
+ */
+public class MergeToolMapper extends Mapper<Key, Value, Text, Mutation> {
+    private static final Logger log = Logger.getLogger(MergeToolMapper.class);
+
+    private boolean usesStartTime;
+    private String startTimeString;
+    private Date startTime;
+    private String parentTableName;
+    private String childTableName;
+    private String parentTablePrefix;
+    private String childTablePrefix;
+    private Text spoTable;
+    private Text poTable;
+    private Text ospTable;
+
+    private Context context;
+
+    private Configuration parentConfig;
+    private Configuration childConfig;
+
+    private AccumuloRdfConfiguration parentAccumuloRdfConfiguration;
+    private AccumuloRdfConfiguration childAccumuloRdfConfiguration;
+
+    private RyaTripleContext parentRyaContext;
+    private RyaTripleContext childRyaContext;
+
+    private RyaTableMutationsFactory ryaTableMutationFactory;
+
+    private Scanner childScanner;
+    private Iterator<Entry<Key, Value>> childIterator;
+    private Connector childConnector;
+    private AccumuloRyaDAO childDao;
+
+    private Date copyToolInputTime;
+    private Date copyToolRunTime;
+
+    private Long parentTimeOffset = 0L;
+    private Long childTimeOffset = 0L;
+
+    private boolean useTimeSync = false;
+    private boolean useMergeFileInput = false;
+
+    /**
+     * Creates a new {@link MergeToolMapper}.
+     */
+    public MergeToolMapper() {
+    }
+
+    /**
+     * The result of comparing a child key and parent key which determines what should be done with them.
+     */
+    private static enum CompareKeysResult {
+        /**
+         * Indicates that the child iterator should move to the next key in the child
+         * table in order to be compared to the current key in the parent table.
+         */
+        ADVANCE_CHILD,
+        /**
+         * Indicates that the child iterator should move to the next key in the child
+         * table in order to be compared to the current key in the parent table
+         * and that the current child key should be added to the parent.
+         */
+        ADVANCE_CHILD_AND_ADD,
+        /**
+         * Indicates that the parent iterator should move to the next key in the parent table
+         * in order to be compared to the current key in the child table.
+         */
+        ADVANCE_PARENT,
+        /**
+         * Indicates that the parent iterator should move to the next key in the parent table
+         * in order to be compared to the current key in the child table
+         * and that the current parent key should be deleted from the parent.
+         */
+        ADVANCE_PARENT_AND_DELETE,
+        /**
+         * Indicates that the child iterator should move to the next key in the child table
+         * and the parent iterator should move to the next key in the parent table.
+         */
+        ADVANCE_BOTH,
+        /**
+         * Indicates that there are no more keys to compare in the child and parent tables.
+         */
+        FINISHED;
+    }
+
+    /**
+     * Expert users can override this method for more complete control over
+     * the execution of the Mapper.
+     *
+     * @param context
+     * @throws IOException
+     */
+    @Override
+    public void run(Context context) throws IOException, InterruptedException {
+        setup(context);
+        this.context = context;
+
+        try {
+            RyaStatement parentRyaStatement = nextParentRyaStatement();
+            RyaStatement childRyaStatement = nextChildRyaStatement();
+
+            CompareKeysResult compareKeysResult = null;
+            // Iteratively compare parent keys to child keys until finished
+            while (compareKeysResult != CompareKeysResult.FINISHED) {
+                compareKeysResult = compareKeys(parentRyaStatement, childRyaStatement);
+
+                // Based on how the keys compare, add or delete keys and advance the child or parent iterators forward
+                switch (compareKeysResult) {
+                    case ADVANCE_CHILD:
+                        childRyaStatement = nextChildRyaStatement();
+                        break;
+                    case ADVANCE_PARENT:
+                        parentRyaStatement = nextParentRyaStatement();
+                        break;
+                    case ADVANCE_CHILD_AND_ADD:
+                        RyaStatement tempChildRyaStatement = childRyaStatement;
+                        childRyaStatement = nextChildRyaStatement();
+                        addKey(tempChildRyaStatement, context);
+                        break;
+                    case ADVANCE_PARENT_AND_DELETE:
+                        RyaStatement tempParentRyaStatement = parentRyaStatement;
+                        parentRyaStatement = nextParentRyaStatement();
+                        deleteKey(tempParentRyaStatement, context);
+                        break;
+                    case ADVANCE_BOTH:
+                        ColumnVisibility cv1 = new ColumnVisibility(parentRyaStatement.getColumnVisibility());
+                        ColumnVisibility cv2 = new ColumnVisibility(childRyaStatement.getColumnVisibility());
+
+                        // Update new column visibility now if necessary
+                        if (!cv1.equals(cv2) && !cv2.equals(AccumuloRdfConstants.EMPTY_CV)) {
+                            ColumnVisibility newCv = combineColumnVisibilities(cv1, cv2);
+                            RyaStatement newCvRyaStatement = updateRyaStatementColumnVisibility(parentRyaStatement, newCv);
+
+                            deleteKey(parentRyaStatement, context);
+                            addKey(newCvRyaStatement, context);
+                        }
+
+                        parentRyaStatement = nextParentRyaStatement();
+                        childRyaStatement = nextChildRyaStatement();
+                        break;
+                    case FINISHED:
+                        log.info("Finished scanning parent and child tables");
+                        break;
+                    default:
+                        log.error("Unknown result: " + compareKeysResult);
+                        break;
+                }
+            }
+        } catch (MutationsRejectedException | TripleRowResolverException e) {
+            log.error("Error encountered while merging", e);
+        } finally {
+            cleanup(context);
+        }
+    }
+
+    private RyaStatement nextParentRyaStatement() throws IOException, InterruptedException {
+        return nextRyaStatement(context, parentRyaContext);
+    }
+
+    private RyaStatement nextChildRyaStatement() throws IOException, InterruptedException {
+        return nextRyaStatement(childIterator, childRyaContext);
+    }
+
+    private static RyaStatement nextRyaStatement(Iterator<Entry<Key, Value>> iterator, RyaTripleContext ryaContext) {
+        RyaStatement ryaStatement = null;
+        if (iterator.hasNext()) {
+            Entry<Key, Value> entry = iterator.next();
+            Key key = entry.getKey();
+            Value value = entry.getValue();
+            try {
+                ryaStatement = createRyaStatement(key, value, ryaContext);
+            } catch (TripleRowResolverException e) {
+                log.error("TripleRowResolverException encountered while 
creating statement", e);
+            }
+        }
+        return ryaStatement;
+    }
+
+    private static RyaStatement nextRyaStatement(Context context, RyaTripleContext ryaContext) throws IOException, InterruptedException {
+        RyaStatement ryaStatement = null;
+        if (context.nextKeyValue()) {
+            Key key = context.getCurrentKey();
+            Value value = context.getCurrentValue();
+            try {
+                ryaStatement = createRyaStatement(key, value, ryaContext);
+            } catch (TripleRowResolverException e) {
+                log.error("TripleRowResolverException encountered while 
creating statement", e);
+            }
+        }
+        return ryaStatement;
+    }
+
+    private static RyaStatement createRyaStatement(Key key, Value value, RyaTripleContext ryaTripleContext) throws TripleRowResolverException {
+        byte[] row = key.getRowData() != null && key.getRowData().toArray().length > 0 ? key.getRowData().toArray() : null;
+        byte[] columnFamily = key.getColumnFamilyData() != null && key.getColumnFamilyData().toArray().length > 0 ? key.getColumnFamilyData().toArray() : null;
+        byte[] columnQualifier = key.getColumnQualifierData() != null && key.getColumnQualifierData().toArray().length > 0 ? key.getColumnQualifierData().toArray() : null;
+        Long timestamp = key.getTimestamp();
+        byte[] columnVisibility = key.getColumnVisibilityData() != null && key.getColumnVisibilityData().toArray().length > 0 ? key.getColumnVisibilityData().toArray() : null;
+        byte[] valueBytes = value != null && value.get().length > 0 ? value.get() : null;
+        TripleRow tripleRow = new TripleRow(row, columnFamily, columnQualifier, timestamp, columnVisibility, valueBytes);
+        RyaStatement ryaStatement = ryaTripleContext.deserializeTriple(TABLE_LAYOUT.SPO, tripleRow);
+
+        return ryaStatement;
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+
+        log.info("Setting up mapper");
+
+        parentConfig = context.getConfiguration();
+        childConfig = getChildConfig(parentConfig);
+
+        startTimeString = parentConfig.get(MergeTool.START_TIME_PROP, null);
+        if (startTimeString != null) {
+            startTime = MergeTool.convertStartTimeStringToDate(startTimeString);
+        }
+        usesStartTime = startTime != null;
+        useTimeSync = parentConfig.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false);
+        useMergeFileInput = parentConfig.getBoolean(MergeTool.USE_MERGE_FILE_INPUT, false);
+        parentTableName = parentConfig.get(MergeTool.TABLE_NAME_PROP, null);
+        parentTablePrefix = parentConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        childTablePrefix = childConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        if (useMergeFileInput) {
+            childTableName = parentTableName.replaceFirst(parentTablePrefix, childTablePrefix) + MergeTool.TEMP_SUFFIX;
+        } else {
+            childTableName = parentTableName.replaceFirst(parentTablePrefix, childTablePrefix);
+        }
+        spoTable = new Text(parentTablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+        poTable = new Text(parentTablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
+        ospTable = new Text(parentTablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
+
+        childScanner = setupChildScanner(context);
+        childIterator = childScanner.iterator();
+
+        parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(parentConfig);
+        parentAccumuloRdfConfiguration.setTablePrefix(parentTablePrefix);
+        parentRyaContext = RyaTripleContext.getInstance(parentAccumuloRdfConfiguration);
+
+        ryaTableMutationFactory = new RyaTableMutationsFactory(parentRyaContext);
+
+        childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig);
+        childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
+        childRyaContext = RyaTripleContext.getInstance(childAccumuloRdfConfiguration);
+        childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+
+        childDao = AccumuloRyaUtils.setupDao(childConnector, childAccumuloRdfConfiguration);
+
+        if (startTime != null && useTimeSync) {
+            try {
+                copyToolInputTime = AccumuloRyaUtils.getCopyToolSplitDate(childDao);
+                copyToolRunTime = AccumuloRyaUtils.getCopyToolRunDate(childDao);
+
+                // Find the parent's time offset that was stored when the child was copied.
+                parentTimeOffset = AccumuloRyaUtils.getTimeOffset(childDao);
+                String durationBreakdown = TimeUtils.getDurationBreakdown(parentTimeOffset);
+                log.info("The table " + parentTableName + " has a time offset of: " + durationBreakdown);
+                childTimeOffset = Long.valueOf(childConfig.get(CopyTool.CHILD_TIME_OFFSET_PROP, null));
+                Date adjustedParentStartTime = new Date(startTime.getTime() - parentTimeOffset);
+                Date adjustedChildStartTime = new Date(startTime.getTime() - childTimeOffset);
+                log.info("Adjusted parent start time: " + adjustedParentStartTime);
+                log.info("Adjusted child start time: " + adjustedChildStartTime);
+            } catch (RyaDAOException e) {
+                log.error("Error getting time offset", e);
+            }
+        }
+
+        log.info("Finished setting up mapper");
+    }
+
+    /**
+     * Takes the child instance values in the configuration and puts them into the corresponding parent instance values
+     * so the config will connect to the child instance.
+     * @param parentConfig the {@link Configuration} containing the parent and child properties.
+     * @return the new {@link Configuration} where the parent connection values are replaced with
+     * the child connection values.
+     */
+    public static Configuration getChildConfig(Configuration parentConfig) {
+        Configuration childConfig = new Configuration(parentConfig);
+
+        // Switch the temp child properties to be the main ones
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_MOCK_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_INSTANCE_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_USERNAME_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_PWD_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.TABLE_PREFIX_PROPERTY);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_AUTH_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_ZK_PROP);
+
+        MergeTool.setDuplicateKeys(childConfig);
+
+        return childConfig;
+    }
+
+    /**
+     * Looks for all properties in the parent/main configuration marked as a child value (by being appended with ".child")
+     * and converts each into an unmarked property for the child config to use.
+     * @param childConfig the child {@link Configuration}.
+     * @param parentConfig the parent/main {@link Configuration}.
+     * @param parentPropertyName the parent property name.
+     */
+    public static void convertChildPropToParentProp(Configuration childConfig, Configuration parentConfig, String parentPropertyName) {
+        String childValue = parentConfig.get(parentPropertyName + MergeTool.CHILD_SUFFIX, "");
+        childConfig.set(parentPropertyName, childValue);
+    }
+
+    /**
+     * Combines 2 {@link ColumnVisibility ColumnVisibilities} by OR'ing them together.
+     * @param cv1 the first (parent) {@link ColumnVisibility}.
+     * @param cv2 the second (child) {@link ColumnVisibility}.
+     * @return the newly combined {@link ColumnVisibility}.
+     */
+    public static ColumnVisibility combineColumnVisibilities(ColumnVisibility cv1, ColumnVisibility cv2) {
+        // OR the 2 column visibilities together if they're different
+        String columnVisibilityExpression;
+        if (cv1.equals(AccumuloRdfConstants.EMPTY_CV)) {
+            columnVisibilityExpression = new String(cv2.getExpression(), Charsets.UTF_8);
+        } else {
+            columnVisibilityExpression = "(" + new String(cv1.getExpression(), 
Charsets.UTF_8) + ")|("
+                    + new String(cv2.getExpression(), Charsets.UTF_8) + ")";
+        }
+        ColumnVisibility newCv = new ColumnVisibility(new Text(columnVisibilityExpression));
+        newCv = new ColumnVisibility(newCv.flatten());
+        return newCv;
+    }
+
+    private Scanner setupChildScanner(Context context) throws IOException {
+        return setupScanner(context, childTableName, childConfig);
+    }
+
+    private static Scanner setupScanner(Context context, String tableName, Configuration config) throws IOException {
+        RangeInputSplit split = (RangeInputSplit) context.getInputSplit();
+        Range splitRange = split.getRange();
+        Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config);
+        scanner.setRange(splitRange);
+
+        return scanner;
+    }
+
+    private void writeRyaMutations(RyaStatement ryaStatement, Context context, boolean isDelete) throws IOException, InterruptedException {
+        if (ryaStatement.getColumnVisibility() == null) {
+            ryaStatement.setColumnVisibility(AccumuloRdfConstants.EMPTY_CV.getExpression());
+        }
+
+        Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationFactory.serialize(ryaStatement);
+        Collection<Mutation> spoMutations = mutationMap.get(TABLE_LAYOUT.SPO);
+        Collection<Mutation> poMutations = mutationMap.get(TABLE_LAYOUT.PO);
+        Collection<Mutation> ospMutations = mutationMap.get(TABLE_LAYOUT.OSP);
+
+        for (Mutation mutation : spoMutations) {
+            writeMutation(spoTable, mutation, context, isDelete);
+        }
+        for (Mutation mutation : poMutations) {
+            writeMutation(poTable, mutation, context, isDelete);
+        }
+        for (Mutation mutation : ospMutations) {
+            writeMutation(ospTable, mutation, context, isDelete);
+        }
+    }
+
+    private void addKey(RyaStatement ryaStatement, Context context) throws IOException, InterruptedException {
+        writeRyaMutations(ryaStatement, context, false);
+    }
+
+    private void deleteKey(RyaStatement ryaStatement, Context context) throws IOException, InterruptedException {
+        writeRyaMutations(ryaStatement, context, true);
+    }
+
+    /**
+     * Writes a mutation to the specified table.  If the mutation is meant to delete then the mutation will
+     * be transformed to a delete mutation.
+     * @param table the table to write to.
+     * @param mutation the {@link Mutation}.
+     * @param context the {@link Context}.
+     * @param isDelete {@code true} if the mutation should be a delete mutation.  {@code false} otherwise.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private static void writeMutation(Text table, Mutation mutation, Context context, boolean isDelete) throws IOException, InterruptedException {
+        if (isDelete) {
+            List<ColumnUpdate> updates = mutation.getUpdates();
+            ColumnUpdate columnUpdate = updates.get(0);
+            ColumnVisibility cv = columnUpdate.getColumnVisibility() != null ? new ColumnVisibility(columnUpdate.getColumnVisibility()) : null;
+            Mutation deleteMutation = new Mutation(new Text(mutation.getRow()));
+            deleteMutation.putDelete(columnUpdate.getColumnFamily(), columnUpdate.getColumnQualifier(), cv, columnUpdate.getTimestamp());
+            context.write(table, deleteMutation);
+        } else {
+            context.write(table, mutation);
+        }
+    }
+
+    /**
+     * Adjusts the date of a key's timestamp to account for the instance's machine local time offset.
+     * @param date the timestamp {@link Date} to adjust.
+     * @param isParentTable {@code true} if the timestamp is from a key in one of the parent instance's tables.
+     * {@code false} if it's from the child instance.
+     * @return the normalized {@link Date} or the same date if nothing needed to be adjusted.
+     */
+    private Date normalizeDate(Date date, boolean isParentTable) {
+        Date normalizedDate = date;
+        if (useTimeSync) {
+            if (isParentTable) {
+                normalizedDate = new Date(date.getTime() - parentTimeOffset);
+            } else {
+                // If the timestamp is before the time the child table was copied from
+                // the parent then the timestamp originated from the parent machine
+                if (TimeUtils.dateBeforeInclusive(date, copyToolRunTime)) {
+                    normalizedDate = new Date(date.getTime() - parentTimeOffset);
+                } else {
+                    // Timestamps after the copy time originated from the child machine.
+                    normalizedDate = new Date(date.getTime() - childTimeOffset);
+                }
+            }
+        }
+        return normalizedDate;
+    }
+
+    /**
+     * Since both Scanners will return sorted data, if the two key-values are
+     * equal, then both Scanners can advance to the next comparison. If the Key
+     * from Scanner1 sorts before the Key from Scanner2, then that Key doesn't
+     * exist in the table from Scanner2 which means Scanner1 should advance. If
+     * the Key from Scanner2 sorts before the Key from Scanner1, then that Key
+     * doesn't exist in the table from Scanner1 which means Scanner2 should
+     * advance.
+     * @param key1 the {@link RyaStatement} from the parent instance table.
+     * @param key2 the {@link RyaStatement} from the child instance table.
+     * @return the {@link CompareKeysResult}.
+     * @throws MutationsRejectedException
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws TripleRowResolverException
+     */
+    private CompareKeysResult compareKeys(RyaStatement key1, RyaStatement key2) throws MutationsRejectedException, IOException, InterruptedException, TripleRowResolverException {
+        log.trace("key1 = " + key1);
+        log.trace("key2 = " + key2);
+        if (key1 == null && key2 == null) {
+            // Reached the end of the parent and child table.
+            return CompareKeysResult.FINISHED;
+        } else if (key1 == null) {
+            // Reached the end of the parent table so add the remaining child keys if they meet the time criteria.
+            Date t2 = normalizeDate(new Date(key2.getTimestamp()), false);
+            // Move on to next comparison (do nothing) or add this child key to parent
+            boolean doNothing = usesStartTime && t2.before(startTime);
+            return doNothing ? CompareKeysResult.ADVANCE_CHILD : CompareKeysResult.ADVANCE_CHILD_AND_ADD;
+        } else if (key2 == null) {
+            // Reached the end of the child table so delete the remaining parent keys if they meet the time criteria.
+            Date t1 = normalizeDate(new Date(key1.getTimestamp()), true);
+            // Move on to next comparison (do nothing) or delete this key from parent
+            boolean doNothing = usesStartTime && (copyToolInputTime != null && (t1.before(copyToolInputTime) || (t1.after(copyToolInputTime) && t1.after(startTime))) || (copyToolInputTime == null && t1.after(startTime)));
+            return doNothing ? CompareKeysResult.ADVANCE_PARENT : CompareKeysResult.ADVANCE_PARENT_AND_DELETE;
+        } else {
+            // There are 2 keys to compare
+            Map<TABLE_LAYOUT, TripleRow> map1 = parentRyaContext.serializeTriple(key1);
+            Text row1 = new Text(map1.get(TABLE_LAYOUT.SPO).getRow());
+            Map<TABLE_LAYOUT, TripleRow> map2 = childRyaContext.serializeTriple(key2);
+            Text row2 = new Text(map2.get(TABLE_LAYOUT.SPO).getRow());
+            Date t1 = normalizeDate(new Date(key1.getTimestamp()), true);
+            Date t2 = normalizeDate(new Date(key2.getTimestamp()), false);
+
+            if (row1.compareTo(row2) < 0) {
+                // Parent key sort order was before the child key sort order
+                // so it doesn't exist in the child table.
+                // What does this mean?  Was it added by the parent after the child was cloned? (Meaning we should leave it)
+                // Or did the child delete it after it was cloned? (Meaning we should delete it)
+                boolean doNothing = usesStartTime && (copyToolInputTime != null && (t1.before(copyToolInputTime) || (t1.after(copyToolInputTime) && t1.after(startTime))) || (copyToolInputTime == null && t1.after(startTime)));
+                return doNothing ? CompareKeysResult.ADVANCE_PARENT : CompareKeysResult.ADVANCE_PARENT_AND_DELETE;
+            } else if (row1.compareTo(row2) > 0) {
+                // Parent key sort order was after the child key sort order
+                // so it doesn't exist in the parent table.
+                // What does this mean?  Was it deleted by the parent after the child was cloned? (Meaning we should leave it)
+                // Or did the child add it after it was cloned? (Meaning we should add it)
+                boolean doNothing = usesStartTime && t2.before(startTime);
+                return doNothing ? CompareKeysResult.ADVANCE_CHILD : CompareKeysResult.ADVANCE_CHILD_AND_ADD;
+            } else {
+                // Rows are the same. So just check if column visibility needs to be updated and
+                // move on to the next parent and child keys.
+                return CompareKeysResult.ADVANCE_BOTH;
+            }
+        }
+    }
+
+    private static RyaStatement updateRyaStatementColumnVisibility(RyaStatement ryaStatement, ColumnVisibility newCv) {
+        RyaStatement newCvRyaStatement = new RyaStatement(ryaStatement.getSubject(), ryaStatement.getPredicate(), ryaStatement.getObject(), ryaStatement.getContext(), ryaStatement.getQualifer(), newCv.getExpression(), ryaStatement.getValue(), ryaStatement.getTimestamp());
+        return newCvRyaStatement;
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+        log.info("Cleaning up mapper...");
+        if (childScanner != null) {
+            childScanner.close();
+        }
+        try {
+            if (childDao != null) {
+                childDao.destroy();
+            }
+        } catch (RyaDAOException e) {
+            log.error("Error destroying child DAO", e);
+        }
+        log.info("Cleaned up mapper");
+    }
+}
\ No newline at end of file
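The ADVANCE_BOTH branch above merges differing column visibilities via combineColumnVisibilities(), which OR's the two expressions and flattens the result so readers authorized under either the parent's or the child's labels can still see the key. A small standalone demo (hypothetical class; the exact flattened form is up to Accumulo's ColumnVisibility):

    import org.apache.accumulo.core.security.ColumnVisibility;

    import mvm.rya.accumulo.mr.merge.mappers.MergeToolMapper;

    public class VisibilityMergeDemo {
        public static void main(String[] args) {
            ColumnVisibility parent = new ColumnVisibility("A&B");
            ColumnVisibility child = new ColumnVisibility("C");
            // Builds "(A&B)|(C)" and then flattens the parse tree.
            ColumnVisibility merged = MergeToolMapper.combineColumnVisibilities(parent, child);
            System.out.println(merged);
        }
    }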

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java
new file mode 100644
index 0000000..76b24be
--- /dev/null
+++ 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java
@@ -0,0 +1,200 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.log4j.Logger;
+
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.merge.util.GroupedRow;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+
+/**
+ * Rule mapper that outputs statements as serialized Accumulo rows along with their destination
+ * table names, using {@link GroupedRow} as a composite Writable to represent an output
+ * of <(table, Key), (Key, Value)>.
+ *
+ * In order to ensure proper indexing, the statements are first
+ * inserted into a mock Rya instance in memory, and the tables that make up that instance are
+ * periodically flushed to the output.
+ */
+public class RowRuleMapper extends BaseRuleMapper<GroupedRow, GroupedRow> {
+    /**
+     * The number of statements each mapper will hold in memory before flushing the serialized rows to the output.
+     */
+    public static final String MAX_STATEMENTS_PROP = "ac.copy.cache.statements.max";
+
+    /**
+     * The default number of statements each mapper will hold in memory before flushing the serialized rows to the output.
+     */
+    public static final int MAX_STATEMENTS_DEFAULT = 10000;
+
+    private static final Logger log = Logger.getLogger(RowRuleMapper.class);
+    private GroupedRow compositeKey = new GroupedRow();
+    private GroupedRow compositeVal = new GroupedRow();
+    private int cachedStatements = 0;
+    private int maxStatements;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        // Set up a mock child DAO for caching serialized statements
+        childAccumuloRdfConfiguration.setBoolean(MRUtils.AC_MOCK_PROP, true);
+        childUser = "root";
+        childAccumuloRdfConfiguration.set(MRUtils.AC_USERNAME_PROP, childUser);
+        childAccumuloRdfConfiguration.set(MRUtils.AC_PWD_PROP, "");
+        MergeTool.setDuplicateKeys(childAccumuloRdfConfiguration);
+        childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+        childDao = AccumuloRyaUtils.setupDao(childConnector, childAccumuloRdfConfiguration);
+        copyAuthorizations();
+        // The call in super.setup() does nothing; this has to be done after the DAO is initialized
+        addMetadataKeys(context);
+        // Determine the size of the cache
+        maxStatements = childAccumuloRdfConfiguration.getInt(MAX_STATEMENTS_PROP, MAX_STATEMENTS_DEFAULT);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        flush(context);
+        super.cleanup(context);
+    }
+
+    /**
+     * Add a statement to an in-memory Accumulo instance, serialized as spo/pos/osp and any applicable secondary
+     * indexes, and flush the in-memory rows through to the output if enough statements have been cached.
+     * @param rstmt RyaStatement to copy to the child
+     * @param context Context to use for writing
+     * @throws InterruptedException if Hadoop is interrupted while writing output.
+     * @throws IOException if an error is encountered serializing and storing the statement in memory, or
+     *      if Hadoop reports an error writing the in-memory tables to the output.
+     */
+    @Override
+    protected void copyStatement(RyaStatement rstmt, Context context) throws IOException, InterruptedException {
+        try {
+            childDao.add(rstmt);
+            cachedStatements++;
+        } catch (RyaDAOException e) {
+            throw new IOException("Error serializing RyaStatement", e);
+        }
+        if (cachedStatements >= maxStatements) {
+            flush(context);
+        }
+    }
+
+    /**
+     * Copy a row directly -- just pass it through to the map output.
+     * @param key Row's key
+     * @param value Row's value
+     * @param context Context to use for writing
+     * @throws InterruptedException If Hadoop is interrupted during map output
+     * @throws IOException If Hadoop reports an error writing the output
+     */
+    @Override
+    protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException {
+        compositeKey.setGroup(childTableName);
+        compositeKey.setKey(key);
+        compositeVal.setKey(key);
+        compositeVal.setValue(value);
+        context.write(compositeKey, compositeVal);
+    }
+
+    /**
+     * Insert copy tool metadata, if the in-memory instance has been configured.
+     */
+    @Override
+    protected void addMetadataKeys(Context context) throws IOException {
+        try {
+            if (childDao != null && childDao.isInitialized()) {
+                if (runTime != null) {
+                    RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolRunTimeRyaStatement(runTime);
+                    copyStatement(ryaStatement, context);
+                }
+                if (startTime != null) {
+                    RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolSplitTimeRyaStatement(startTime);
+                    copyStatement(ryaStatement, context);
+                }
+                if (timeOffset != null) {
+                    RyaStatement ryaStatement = AccumuloRyaUtils.createTimeOffsetRyaStatement(timeOffset);
+                    copyStatement(ryaStatement, context);
+                }
+            }
+        } catch (RyaDAOException | IOException | InterruptedException e) {
+            throw new IOException("Failed to write metadata key", e);
+        }
+    }
+
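+    /**
+     * Flushes the cached statements: pushes them through the in-memory DAO, writes every row of
+     * every Rya table in the mock instance to the map output, and then clears those tables.
+     * @param context Context to use for writing
+     * @throws IOException if the DAO or mock instance reports an error, or if Hadoop fails to write
+     * @throws InterruptedException if Hadoop is interrupted while writing output
+     */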
+    private void flush(Context context) throws IOException, InterruptedException {
+        try {
+            childDao.flush();
+        } catch (RyaDAOException e) {
+            throw new IOException("Error writing to in-memory table", e);
+        }
+        TableOperations ops = childConnector.tableOperations();
+        SecurityOperations secOps = childConnector.securityOperations();
+        Authorizations childAuths;
+        try {
+            childAuths = secOps.getUserAuthorizations(childUser);
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new IOException("Error connecting to mock instance", e);
+        }
+        for (String table : ops.list()) {
+            // Only copy Rya tables (skip system tables)
+            if (!table.startsWith(childTablePrefix)) {
+                continue;
+            }
+            compositeKey.setGroup(table);
+            try {
+                // Output every row in this mock table
+                int rows = 0;
+                Scanner scanner = childDao.getConnector().createScanner(table, childAuths);
+                for (Map.Entry<Key, Value> row : scanner) {
+                    compositeKey.setKey(row.getKey());
+                    compositeVal.setKey(row.getKey());
+                    compositeVal.setValue(row.getValue());
+                    context.write(compositeKey, compositeVal);
+                    rows++;
+                }
+                log.info("Flushed " + rows + " in-memory rows to output (" + table + ").");
+                // Then clear the table
+                if (rows > 0) {
+                    ops.deleteRows(table, null, null);
+                }
+            } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) {
+                throw new IOException("Error flushing in-memory table", e);
+            }
+        }
+        // All tables should be empty
+        cachedStatements = 0;
+    }
+}
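
A note on sizing: MAX_STATEMENTS_PROP bounds how many statements accumulate in the mock instance between flushes, which is the mapper's main memory cost. A hedged sketch of raising it from a job driver (this assumes the Hadoop Configuration handed to the job is what reaches the mapper's child configuration; CopyTool's actual plumbing may differ):

    import org.apache.hadoop.conf.Configuration;

    // Sketch: trade mapper memory for fewer, larger flushes.
    Configuration conf = new Configuration();
    conf.setInt(RowRuleMapper.MAX_STATEMENTS_PROP, 50000);
    // RowRuleMapper.setup() later reads the value with
    // getInt(MAX_STATEMENTS_PROP, MAX_STATEMENTS_DEFAULT).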

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java
new file mode 100644
index 0000000..c526e6d
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java
@@ -0,0 +1,64 @@
+package mvm.rya.accumulo.mr.merge.reducers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+
+import mvm.rya.accumulo.mr.merge.util.GroupedRow;
+
+/**
+ * Outputs rows to different files according to their associated group names, for use with
+ * {@link AccumuloFileOutputFormat}.
+ */
+public class MultipleFileReducer extends Reducer<GroupedRow, GroupedRow, Key, Value> {
+    private MultipleOutputs<Key, Value> mos;
+
+    @Override
+    protected void setup(Context context) {
+        mos = new MultipleOutputs<>(context);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (mos != null) {
+            mos.close();
+        }
+    }
+
+    /**
+     * Writes <{@link Key}, {@link Value}> pairs to a file, where the path to the output file
+     * is determined by the group.
+     * @param   group   Contains the group name (a String) which is used to route output to the appropriate subdirectory
+     * @param   rows    Contains the actual Accumulo rows to be written
+     * @param   context Context for writing
+     */
+    @Override
+    protected void reduce(GroupedRow group, Iterable<GroupedRow> rows, Context context) throws IOException, InterruptedException {
+        String groupName = group.getGroup().toString();
+        String destination = groupName + "/files/part";
+        for (GroupedRow row : rows) {
+            mos.write(row.getKey(), row.getValue(), destination);
+        }
+    }
+}
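
Because each row is written through MultipleOutputs with a baseOutputPath of "<group>/files/part", every destination table gets its own subdirectory of output files, which suits a bulk-import layout. A plausible wiring in the driving job (illustrative; the real setup lives in CopyTool and may differ):

    // Assumptions: job is the copy tool's Hadoop Job; workDir is its output directory.
    job.setReducerClass(MultipleFileReducer.class);
    job.setOutputKeyClass(Key.class);
    job.setOutputValueClass(Value.class);
    // LazyOutputFormat keeps Hadoop from creating empty part files for the default
    // (unnamed) output, since all real writes go through MultipleOutputs.
    LazyOutputFormat.setOutputFormatClass(job, AccumuloFileOutputFormat.class);
    AccumuloFileOutputFormat.setOutputPath(job, workDir);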

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java
new file mode 100644
index 0000000..bc7abdc
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java
@@ -0,0 +1,575 @@
+package mvm.rya.accumulo.mr.merge.util;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SystemUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles running a single {@link MiniAccumuloCluster} or a single {@link MockInstance} for an instance.
+ */
+public class AccumuloInstanceDriver {
+    private static final Logger log = Logger.getLogger(AccumuloInstanceDriver.class);
+
+    private static final boolean IS_COPY_HADOOP_HOME_ENABLED = true;
+
+    public static final String ROOT_USER_NAME = "root";
+
+    private String driverName;
+    private boolean isMock;
+    private boolean shouldCreateIndices;
+    private boolean isReadOnly;
+    private boolean isParent;
+
+    private String user;
+    private String password;
+    private String instanceName;
+    private String tablePrefix;
+    private String auth;
+
+    private Connector connector;
+
+    private AccumuloRyaDAO dao;
+
+    private SecurityOperations secOps;
+
+    private AccumuloRdfConfiguration config = new AccumuloRdfConfiguration();
+
+    private MiniAccumuloCluster miniAccumuloCluster = null;
+
+    private MockInstance mockInstance = null;
+
+    private ZooKeeperInstance zooKeeperInstance = null;
+
+    private Instance instance = null;
+
+    private String zooKeepers;
+
+    private Map<String, String> configMap = new LinkedHashMap<>();
+
+    private List<String> indices = null;
+
+    private List<String> tableList = new ArrayList<>();
+
+    private File tempDir = null;
+
+    public static final List<String> TABLE_NAME_SUFFIXES =
+        ImmutableList.<String>of(
+            RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_PO_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_NS_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_STATS_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_SEL_SUFFIX
+        );
+
+    /**
+     * Creates a new instance of {@link AccumuloInstanceDriver}.
+     * @param driverName the name used to identify this driver in the logs. (not {@code null})
+     * @param isMock {@code true} if the instance will use {@link MockInstance}s.
+     * {@code false} if the instance will use {@link MiniAccumuloCluster}s.
+     * @param shouldCreateIndices {@code true} to create all the indices associated with a Rya deployment.
+     * {@code false} otherwise.
+     * @param isReadOnly {@code true} if all the tables in the instance should have their
+     * table permissions set to read-only.  {@code false} if the table permissions are set to write.
+     * @param isParent {@code true} if the instance is the parent/main instance. {@code false} if it's the child.
+     * @param user the user name tied to this instance.
+     * @param password the password for the user.
+     * @param instanceName the name of the instance.
+     * @param tablePrefix the table prefix.
+     * @param auth the comma-separated authorization list.
+     */
+    public AccumuloInstanceDriver(String driverName, boolean isMock, boolean shouldCreateIndices, boolean isReadOnly, boolean isParent, String user, String password, String instanceName, String tablePrefix, String auth) {
+        this.driverName = Preconditions.checkNotNull(driverName);
+        this.isMock = isMock;
+        this.shouldCreateIndices = shouldCreateIndices;
+        this.isReadOnly = isReadOnly;
+        this.user = user;
+        this.password = password;
+        this.instanceName = instanceName;
+        this.tablePrefix = tablePrefix;
+        this.auth = auth;
+        this.isParent = isParent;
+
+        config.setTablePrefix(tablePrefix);
+    }
+
+    /**
+     * Sets up the {@link AccumuloInstanceDriver}.
+     * @throws Exception
+     */
+    public void setUp() throws Exception {
+        setUpInstance();
+        setUpTables();
+        setUpDao();
+        setUpConfig();
+    }
+
+    /**
+     * Sets up the {@link MiniAccumuloCluster} or the {@link MockInstance}.
+     * @throws Exception
+     */
+    public void setUpInstance() throws Exception {
+        if (!isMock) {
+            log.info("Setting up " + driverName + " MiniAccumulo cluster...");
+            // Create and Run MiniAccumulo Cluster
+            tempDir = Files.createTempDir();
+            tempDir.deleteOnExit();
+            miniAccumuloCluster = new MiniAccumuloCluster(tempDir, password);
+            copyHadoopHomeToTemp();
+            miniAccumuloCluster.getConfig().setInstanceName(instanceName);
+            log.info(driverName + " MiniAccumulo instance starting up...");
+            miniAccumuloCluster.start();
+            Thread.sleep(1000);
+            log.info(driverName + " MiniAccumulo instance started");
+            log.info("Creating connector to " + driverName + " MiniAccumulo instance...");
+            zooKeeperInstance = new ZooKeeperInstance(miniAccumuloCluster.getClientConfig());
+            instance = zooKeeperInstance;
+            connector = zooKeeperInstance.getConnector(user, new PasswordToken(password));
+            log.info("Created connector to " + driverName + " MiniAccumulo instance");
+        } else {
+            log.info("Setting up " + driverName + " mock instance...");
+            mockInstance = new MockInstance(instanceName);
+            instance = mockInstance;
+            connector = mockInstance.getConnector(user, new PasswordToken(password));
+            log.info("Created connector to " + driverName + " mock instance");
+        }
+        zooKeepers = instance.getZooKeepers();
+    }
+
+    /**
+     * Copies the HADOOP_HOME bin directory to the {@link MiniAccumuloCluster} temp directory.
+     * {@link MiniAccumuloCluster} expects to find bin/winutils.exe in the MAC temp
+     * directory instead of HADOOP_HOME for some reason.
+     * @throws IOException
+     */
+    private void copyHadoopHomeToTemp() throws IOException {
+        if (IS_COPY_HADOOP_HOME_ENABLED && SystemUtils.IS_OS_WINDOWS) {
+            String hadoopHomeEnv = System.getenv("HADOOP_HOME");
+            if (hadoopHomeEnv != null) {
+                File hadoopHomeDir = new File(hadoopHomeEnv);
+                if (hadoopHomeDir.exists()) {
+                    File binDir = Paths.get(hadoopHomeDir.getAbsolutePath(), "/bin").toFile();
+                    if (binDir.exists()) {
+                        FileUtils.copyDirectoryToDirectory(binDir, tempDir);
+                    } else {
+                        log.warn("The specified path for the Hadoop bin directory does not exist: " + binDir.getAbsolutePath());
+                    }
+                } else {
+                    log.warn("The specified path for HADOOP_HOME does not exist: " + hadoopHomeDir.getAbsolutePath());
+                }
+            } else {
+                log.warn("The HADOOP_HOME environment variable was not found.");
+            }
+        }
+    }
+
+    /**
+     * Sets up all the tables and indices.
+     * @throws Exception
+     */
+    public void setUpTables() throws Exception {
+        // Setup tables and permissions
+        log.info("Setting up " + driverName + " tables and permissions");
+        for (String tableSuffix : TABLE_NAME_SUFFIXES) {
+            String tableName = tablePrefix + tableSuffix;
+            tableList.add(tableName);
+            if (!connector.tableOperations().exists(tableName)) {
+                connector.tableOperations().create(tableName);
+            }
+        }
+
+        if (shouldCreateIndices) {
+            indices = Arrays.asList(
+                ConfigUtils.getFreeTextDocTablename(config),
+                ConfigUtils.getFreeTextTermTablename(config),
+                ConfigUtils.getGeoTablename(config),
+                ConfigUtils.getTemporalTableName(config),
+                ConfigUtils.getEntityTableName(config)
+            );
+
+            tableList.addAll(indices);
+
+            log.info("Setting up " + driverName + " indices");
+            for (String index : indices) {
+                if (!connector.tableOperations().exists(index)) {
+                    connector.tableOperations().create(index);
+                }
+            }
+        }
+
+        // Setup user with authorizations
+        log.info("Creating " + driverName + " user and authorizations");
+        secOps = connector.securityOperations();
+        if (!user.equals(ROOT_USER_NAME)) {
+            secOps.createLocalUser(user, new PasswordToken(password));
+        }
+        addAuths(auth);
+        TablePermission tablePermission = isReadOnly ? TablePermission.READ : TablePermission.WRITE;
+        for (String tableSuffix : TABLE_NAME_SUFFIXES) {
+            secOps.grantTablePermission(user, tablePrefix + tableSuffix, tablePermission);
+        }
+        if (shouldCreateIndices) {
+            for (String index : indices) {
+                secOps.grantTablePermission(user, index, tablePermission);
+            }
+        }
+    }
+
+    /**
+     * Sets up the {@link AccumuloRyaDAO}.
+     * @throws Exception
+     */
+    public void setUpDao() throws Exception {
+        // Setup dao
+        log.info("Creating " + driverName + " DAO");
+        dao = new AccumuloRyaDAO();
+        dao.setConnector(connector);
+        dao.setConf(config);
+
+        // Flush the tables before initializing the DAO
+        for (String tableName : tableList) {
+            connector.tableOperations().flush(tableName, null, null, false);
+        }
+
+        dao.init();
+    }
+
+    /**
+     * Sets up the configuration and prints the arguments.
+     */
+    public void setUpConfig() {
+        log.info("Setting " + driverName + " config");
+
+        // Setup config
+        if (isMock) {
+            configMap.put(MRUtils.AC_MOCK_PROP, Boolean.TRUE.toString());
+        }
+        configMap.put(MRUtils.AC_INSTANCE_PROP, instanceName);
+        configMap.put(MRUtils.AC_USERNAME_PROP, user);
+        configMap.put(MRUtils.AC_PWD_PROP, password);
+        configMap.put(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
+        configMap.put(MRUtils.AC_AUTH_PROP, auth);
+        configMap.put(MRUtils.AC_ZK_PROP, zooKeepers != null ? zooKeepers : "localhost");
+
+        log.info(driverName + " config properties");
+        config.setTablePrefix(tablePrefix);
+        for (Entry<String, String> entry : configMap.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            String argument = ToolConfigUtils.makeArgument(isParent ? key : key + MergeTool.CHILD_SUFFIX, value);
+            log.info(argument);
+            config.set(key, value);
+        }
+
+        MergeTool.setDuplicateKeys(config);
+    }
+
+    /**
+     * Tears down all the tables and indices.
+     * @throws Exception
+     */
+    public void tearDownTables() throws Exception {
+        // delete all tables.
+        if (connector != null) {
+            for (String tableName : tableList) {
+                if (connector.tableOperations().exists(tableName)) {
+                    connector.tableOperations().delete(tableName);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tears down the {@link AccumuloRyaDAO}.
+     * @throws Exception
+     */
+    public void tearDownDao() throws Exception {
+        if (dao != null) {
+            log.info("Stopping " + driverName + " DAO");
+            try {
+                dao.destroy();
+            } catch (RyaDAOException e) {
+                log.error("Error stopping " + driverName + " DAO", e);
+            }
+            dao = null;
+        }
+    }
+
+    /**
+     * Tears down the instance.
+     * @throws Exception
+     */
+    public void tearDownInstance() throws Exception {
+        if (miniAccumuloCluster != null) {
+            log.info("Stopping " + driverName + " cluster");
+            try {
+                miniAccumuloCluster.stop();
+            } catch (IOException | InterruptedException e) {
+                log.error("Error stopping " + driverName + " cluster", e);
+            }
+            miniAccumuloCluster = null;
+        }
+    }
+
+    /**
+     * Tears down the {@link AccumuloInstanceDriver}.
+     * @throws Exception
+     */
+    public void tearDown() throws Exception {
+        try {
+            //tearDownTables();
+            tearDownDao();
+            tearDownInstance();
+        } finally {
+            removeTempDir();
+        }
+    }
+
+    /**
+     * Deletes the {@link MiniAccumuloCluster} temporary directory.
+     */
+    public void removeTempDir() {
+        if (tempDir != null) {
+            try {
+                FileUtils.deleteDirectory(tempDir);
+            } catch (IOException e) {
+                log.error("Error deleting " + driverName + " temp directory", e);
+            }
+            tempDir = null;
+        }
+    }
+
+    /**
+     * Adds authorizations to the {@link SecurityOperations} of this instance's user.
+     * @param auths the list of authorizations to add.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     */
+    public void addAuths(String... auths) throws AccumuloException, AccumuloSecurityException {
+        Authorizations newAuths = AccumuloRyaUtils.addUserAuths(user, secOps, auths);
+        secOps.changeUserAuthorizations(user, newAuths);
+    }
+
+    /**
+     * @return the {@link Authorizations} of this instance's user.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     */
+    public Authorizations getAuths() throws AccumuloException, AccumuloSecurityException {
+        if (secOps != null) {
+            return secOps.getUserAuthorizations(user);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * @return {@code true} if this is a mock instance.  {@code false} if this is a MiniAccumuloCluster instance.
+     */
+    public boolean isMock() {
+        return isMock;
+    }
+
+    /**
+     * @return {@code true} to create all the indices associated with a Rya deployment.
+     * {@code false} otherwise.
+     */
+    public boolean shouldCreateIndices() {
+        return shouldCreateIndices;
+    }
+
+    /**
+     * @return {@code true} if all the tables in the instance should have their
+     * table permissions set to read-only.  {@code false} if the table permissions are set to write.
+     */
+    public boolean isReadOnly() {
+        return isReadOnly;
+    }
+
+    /**
+     * @return the user name tied to this instance
+     */
+    public String getUser() {
+        return user;
+    }
+
+    /**
+     * @return the password for the user.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @return the name of the instance.
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @return the table prefix.
+     */
+    public String getTablePrefix() {
+        return tablePrefix;
+    }
+
+    /**
+     * @return the comma-separated authorization list.
+     */
+    public String getAuth() {
+        return auth;
+    }
+
+    /**
+     * @return the {@link Connector} to this instance.
+     */
+    public Connector getConnector() {
+        return connector;
+    }
+
+    /**
+     * Sets the {@link Connector} to this instance.
+     * @param connector the {@link Connector}.
+     */
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+    /**
+     * @return the {@link AccumuloRyaDAO}.
+     */
+    public AccumuloRyaDAO getDao() {
+        return dao;
+    }
+
+    /**
+     * @return the {@link SecurityOperations}.
+     */
+    public SecurityOperations getSecOps() {
+        return secOps;
+    }
+
+    /**
+     * @return the {@link AccumuloRdfConfiguration}.
+     */
+    public AccumuloRdfConfiguration getConfig() {
+        return config;
+    }
+
+    /**
+     * @return the {@link MiniAccumuloCluster} for this instance or {@code null} if
+     * this is a {@link MockInstance}.
+     */
+    public MiniAccumuloCluster getMiniAccumuloCluster() {
+        return miniAccumuloCluster;
+    }
+
+    /**
+     * @return the {@link MockInstance} for this instance or {@code null} if
+     * this is a {@link MiniAccumuloCluster}.
+     */
+    public MockInstance getMockInstance() {
+        return mockInstance;
+    }
+
+    /**
+     * @return the {@link ZooKeeperInstance} for this instance or {@code null} if
+     * this is a {@link MockInstance}.
+     */
+    public ZooKeeperInstance getZooKeeperInstance() {
+        return zooKeeperInstance;
+    }
+
+    /**
+     * @return the {@link ZooKeeperInstance} or {@link MockInstance}.
+     */
+    public Instance getInstance() {
+        return instance;
+    }
+
+    /**
+     * @return the comma-separated list of ZooKeeper host names.
+     */
+    public String getZooKeepers() {
+        return zooKeepers;
+    }
+
+    /**
+     * @return an unmodifiable map of the configuration keys and values.
+     */
+    public Map<String, String> getConfigMap() {
+        return Collections.unmodifiableMap(configMap);
+    }
+
+    /**
+     * @return an unmodifiable list of the table names and indices.
+     */
+    public List<String> getTableList() {
+        return Collections.unmodifiableList(tableList);
+    }
+
+    /**
+     * @return the {@link MiniAccumuloCluster} temporary directory for this instance or {@code null}
+     * if it's a {@link MockInstance}.
+     */
+    public File getTempDir() {
+        return tempDir;
+    }
+}
\ No newline at end of file
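
Taken together, setUp() runs the four phases in order (instance, tables, DAO, config) and tearDown() releases them, so a test can drive a whole mock Rya instance in a few lines. A minimal usage sketch (argument values are illustrative):

    // Drive a mock Accumulo-backed Rya instance for a parent deployment.
    AccumuloInstanceDriver driver = new AccumuloInstanceDriver(
            "parent", true /* mock */, false /* no indices */, false /* writable */,
            true /* parent */, AccumuloInstanceDriver.ROOT_USER_NAME, "",
            "parentInstance", "pt_", "U");
    driver.setUp();
    try {
        AccumuloRyaDAO dao = driver.getDao();
        // ... load statements and exercise the copy/merge tools ...
    } finally {
        driver.tearDown();
    }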

