http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java
new file mode 100644
index 0000000..067ff49
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/CopyTool.java
@@ -0,0 +1,1003 @@
+package mvm.rya.accumulo.mr.merge;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Paths;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloMultiTableInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.client.mapreduce.InputTableConfig;
+import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.iterators.user.TimestampFilter;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+
+import com.google.common.base.Joiner;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.mr.merge.common.InstanceType;
+import mvm.rya.accumulo.mr.merge.gui.DateTimePickerDialog;
+import mvm.rya.accumulo.mr.merge.mappers.AccumuloCopyToolMapper;
+import mvm.rya.accumulo.mr.merge.mappers.AccumuloRyaRuleMapper;
+import mvm.rya.accumulo.mr.merge.mappers.FileCopyToolMapper;
+import mvm.rya.accumulo.mr.merge.mappers.RowRuleMapper;
+import mvm.rya.accumulo.mr.merge.mappers.MergeToolMapper;
+import mvm.rya.accumulo.mr.merge.reducers.MultipleFileReducer;
+import mvm.rya.accumulo.mr.merge.util.AccumuloInstanceDriver;
+import mvm.rya.accumulo.mr.merge.util.AccumuloQueryRuleset;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.merge.util.GroupedRow;
+import mvm.rya.accumulo.mr.merge.util.TimeUtils;
+import mvm.rya.accumulo.mr.merge.util.ToolConfigUtils;
+import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.layout.TablePrefixLayoutStrategy;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Handles copying data from a parent instance into a child instance.
+ */
+public class CopyTool extends AbstractDualInstanceAccumuloMRTool {
+    private static final Logger log = Logger.getLogger(CopyTool.class);
+
+    /**
+     * Use this property to set the tables that are going to be copied.  The list should
+     * be a comma-separated string containing the full table names.  If not set, then all
+     * tables will be copied.
+     */
+    public static final String COPY_TABLE_LIST_PROP = "copy.table.list";
+
+    /**
+     * Indicates the type of child instance to create.  {@code null} or empty to not create
+     * an instance, indicating that it was already created and exists.
+     */
+    public static final String CREATE_CHILD_INSTANCE_TYPE_PROP = "create.child.instance.type";
+
+    /**
+     * The time difference between the parent machine and the time server.
+     */
+    public static final String PARENT_TIME_OFFSET_PROP = "time.offset";
+
+    /**
+     * The time difference between the child machine and the time server.
+     */
+    public static final String CHILD_TIME_OFFSET_PROP = "time.offset.child";
+
+    /**
+     * The host name of the time server to use.
+     */
+    public static final String NTP_SERVER_HOST_PROP = "ntp.server.host";
+
+    /**
+     * The URL of the Apache Tomcat server web page running on the parent machine.
+     */
+    public static final String PARENT_TOMCAT_URL_PROP = "tomcat.url";
+
+    /**
+     * The URL of the Apache Tomcat server web page running on the child machine.
+     */
+    public static final String CHILD_TOMCAT_URL_PROP = "tomcat.url.child";
+
+    /**
+     * The run time of the copy process.
+     */
+    public static final String COPY_RUN_TIME_PROP = "copy.run.time";
+
+    /**
+     * "true" to use the NTP server to handle time synchronization.
+     * "false" (or any other value) to not use the NTP server.
+     */
+    public static final String USE_NTP_SERVER_PROP = "use.ntp.server";
+
+    /**
+     * "true" to use file output. "false" to use Accumulo output.
+     */
+    public static final String USE_COPY_FILE_OUTPUT = "use.copy.file.output";
+
+    /**
+     * The file path to output the child data to.
+     */
+    public static final String COPY_FILE_OUTPUT_PATH = "copy.file.output.path";
+
+    /**
+     * The compression type to use for file output.  One of "none", "gz", "lzo", or "snappy".
+     */
+    public static final String COPY_FILE_OUTPUT_COMPRESSION_TYPE = "copy.file.output.compression.type";
+
+    /**
+     * "true" to clear the file output directory before copying. "false" to 
leave the output directory alone.
+     */
+    public static final String USE_COPY_FILE_OUTPUT_DIRECTORY_CLEAR = "use.copy.file.output.directory.clear";
+
+    /**
+     * The input directory for importing files into accumulo tables.
+     */
+    public static final String COPY_FILE_IMPORT_DIRECTORY = "copy.file.import.directory";
+
+    /**
+     * "true" to read from the input directory. "false" otherwise.
+     */
+    public static final String USE_COPY_FILE_IMPORT = "use.copy.file.import";
+
+    /**
+     * "true" to extract a set of rules from a SPARQL query, and only copy 
statements relevant to those rules. "false" otherwise.
+     * If set, either the query itself or a query file should also be provided.
+     */
+    public static final String USE_COPY_QUERY_SPARQL = "use.copy.query.sparql";
+
+    /**
+     * The text of the query that defines which statements to copy.
+     */
+    public static final String QUERY_STRING_PROP = "ac.copy.query";
+
+    /**
+     * The path to a file containing the query that defines which statements to copy.
+     */
+    public static final String QUERY_FILE_PROP = "ac.copy.queryfile";
+
+    private static final String DIALOG_TITLE = "Select a Start Time/Date";
+    private static final String DIALOG_MESSAGE =
+        "<html>Choose the time of the data to copy.<br>Only parent data AFTER 
the selected time will be copied to the child.</html>";
+
+    private String startTime = null;
+    private boolean useCopyFileOutput = false;
+    private String baseOutputDir = null;
+    private String localBaseOutputDir = null;
+    private String compressionType = null;
+    private boolean useCopyFileOutputDirectoryClear = false;
+    private String tempDir = null;
+    private boolean useCopyFileImport = false;
+    private boolean useQuery = false;
+    private String localCopyFileImportDir = null;
+    private String baseImportDir = null;
+
+    private final List<String> tables = new ArrayList<>();
+
+    private AccumuloInstanceDriver childAccumuloInstanceDriver = null;
+
+    /**
+     * Sets up and initializes the copy tool's configuration.
+     * @throws Exception
+     */
+    public void setup() throws Exception {
+        super.init();
+
+        tempDir = conf.get("hadoop.tmp.dir", null);
+        if (tempDir == null) {
+            throw new Exception("Invalid hadoop temp directory. 
\"hadoop.tmp.dir\" could not be found in the configuration.");
+        }
+
+        useCopyFileOutput = conf.getBoolean(USE_COPY_FILE_OUTPUT, false);
+        baseOutputDir = tempDir + "/copy_tool_file_output/";
+        localBaseOutputDir = conf.get(COPY_FILE_OUTPUT_PATH, null);
+        compressionType = conf.get(COPY_FILE_OUTPUT_COMPRESSION_TYPE, null);
+        useCopyFileOutputDirectoryClear = conf.getBoolean(USE_COPY_FILE_OUTPUT_DIRECTORY_CLEAR, false);
+        localCopyFileImportDir = conf.get(COPY_FILE_IMPORT_DIRECTORY, null);
+        baseImportDir = tempDir + "/copy_tool_import/";
+
+        startTime = conf.get(MergeTool.START_TIME_PROP, null);
+
+        if (!useCopyFileImport) {
+            // Display start time dialog if requested
+            if (MergeTool.USE_START_TIME_DIALOG.equals(startTime)) {
+                log.info("Select start time from dialog...");
+                DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE);
+                dateTimePickerDialog.setVisible(true);
+
+                Date date = dateTimePickerDialog.getSelectedDateTime();
+                startTime = MergeTool.START_TIME_FORMATTER.format(date);
+                conf.set(MergeTool.START_TIME_PROP, startTime);
+                log.info("Will copy all data after " + date);
+            } else if (startTime != null) {
+                try {
+                    Date date = MergeTool.START_TIME_FORMATTER.parse(startTime);
+                    log.info("Will copy all data after " + date);
+                } catch (ParseException e) {
+                    throw new Exception("Unable to parse the provided start 
time: " + startTime, e);
+                }
+            }
+
+            Date copyRunTime = new Date();
+            boolean useTimeSync = conf.getBoolean(USE_NTP_SERVER_PROP, false);
+            if (useTimeSync) {
+                String tomcatUrl = conf.get(PARENT_TOMCAT_URL_PROP, null);
+                String ntpServerHost = conf.get(NTP_SERVER_HOST_PROP, null);
+                Long timeOffset = null;
+                Date ntpDate = null;
+                try {
+                    log.info("Comparing parent machine's time to NTP server 
time...");
+                    ntpDate = TimeUtils.getNtpServerDate(ntpServerHost);
+                    Date parentMachineDate = 
TimeUtils.getMachineDate(tomcatUrl);
+                    boolean isMachineLocal = 
TimeUtils.isUrlLocalMachine(tomcatUrl);
+                    timeOffset = TimeUtils.getTimeDifference(ntpDate, 
parentMachineDate, isMachineLocal);
+                } catch (IOException | ParseException e) {
+                    throw new Exception("Unable to get time difference between 
machine and NTP server.", e);
+                }
+                if (timeOffset != null) {
+                    conf.set(PARENT_TIME_OFFSET_PROP, "" + timeOffset);
+                }
+                copyRunTime = ntpDate;
+            }
+            if (copyRunTime != null) {
+                String copyRunTimeString = MergeTool.START_TIME_FORMATTER.format(copyRunTime);
+                conf.set(COPY_RUN_TIME_PROP, copyRunTimeString);
+            }
+        }
+
+        MergeTool.setDuplicateKeys(conf);
+
+        String copyTableListProperty = conf.get(COPY_TABLE_LIST_PROP);
+        if (StringUtils.isNotBlank(copyTableListProperty)) {
+            // Copy the tables specified in the config
+            String[] split = copyTableListProperty.split(",");
+            tables.addAll(Arrays.asList(split));
+        } else if (useCopyFileImport) {
+            File importDir = new File(localCopyFileImportDir);
+            String[] files = importDir.list();
+            if (files == null) {
+                throw new IOException("Copy file import directory could not be listed: " + localCopyFileImportDir);
+            }
+            tables.addAll(Arrays.asList(files));
+        } else {
+            // By default copy all tables
+            tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+            tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
+            tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
+            tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_NS_SUFFIX);
+            tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
+            tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_STATS_SUFFIX);
+            tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_SEL_SUFFIX);
+            tables.add(ConfigUtils.getFreeTextDocTablename(conf));
+            tables.add(ConfigUtils.getFreeTextTermTablename(conf));
+            tables.add(ConfigUtils.getGeoTablename(conf));
+            tables.add(ConfigUtils.getTemporalTableName(conf));
+            tables.add(ConfigUtils.getEntityTableName(conf));
+        }
+        if (tables.isEmpty()) {
+            log.warn("No list of tables to copy was provided.");
+        } else {
+            String tablesToCopy = Joiner.on("\r\n\t").join(tables);
+            log.info("Will attempt to copy the following tables/indices from 
the parent:\r\n\t" + tablesToCopy);
+        }
+    }
+
+    @Override
+    public int run(String[] strings) throws Exception {
+        useCopyFileImport = conf.getBoolean(USE_COPY_FILE_IMPORT, false);
+        useQuery = conf.getBoolean(USE_COPY_QUERY_SPARQL, false);
+
+        if (useCopyFileImport) {
+            return runImport();
+        } else if (useQuery) {
+            return runQueryCopy();
+        } else {
+            return runCopy();
+        }
+    }
+
+    private int runCopy() throws Exception {
+        log.info("Setting up Copy Tool...");
+
+        setup();
+
+        if (!useCopyFileOutput) {
+            createChildInstance(conf);
+        }
+
+        AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf);
+        parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix);
+        Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration);
+        TableOperations parentTableOperations = parentConnector.tableOperations();
+
+        for (String table : tables) {
+            // Check if the parent table exists before creating a job on it
+            if (parentTableOperations.exists(table)) {
+                String childTable = table.replaceFirst(tablePrefix, childTablePrefix);
+                String jobName = "Copy Tool, copying Parent Table: " + table + ", into Child Table: " + childTable + ", " + System.currentTimeMillis();
+                log.info("Initializing job: " + jobName);
+                conf.set(MRUtils.JOB_NAME_PROP, jobName);
+                conf.set(MergeTool.TABLE_NAME_PROP, table);
+
+                Job job = Job.getInstance(conf);
+                job.setJarByClass(CopyTool.class);
+
+                setupInputFormat(job);
+
+                AccumuloInputFormat.setInputTableName(job, table);
+
+                // Set input output of the particular job
+                if (useCopyFileOutput) {
+                    job.setMapOutputKeyClass(Key.class);
+                    job.setMapOutputValueClass(Value.class);
+                    job.setOutputKeyClass(Key.class);
+                    job.setOutputValueClass(Value.class);
+                } else {
+                    job.setMapOutputKeyClass(Text.class);
+                    job.setMapOutputValueClass(Mutation.class);
+                    job.setOutputKeyClass(Text.class);
+                    job.setOutputValueClass(Mutation.class);
+                }
+
+                setupOutputFormat(job, childTable);
+
+                // Set mapper and reducer classes
+                if (useCopyFileOutput) {
+                    setupSplitsFile(job, parentTableOperations, table, childTable);
+                    job.setMapperClass(FileCopyToolMapper.class);
+                } else {
+                    job.setMapperClass(AccumuloCopyToolMapper.class);
+                }
+                job.setReducerClass(Reducer.class);
+
+                // Submit the job
+                Date beginTime = new Date();
+                log.info("Job for table \"" + table + "\" started: " + 
beginTime);
+                int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+                if (exitCode == 0) {
+                    if (useCopyFileOutput) {
+                        log.info("Moving data from HDFS to the local file 
system for the table: " + childTable);
+                        Path hdfsPath = getPath(baseOutputDir, childTable);
+                        Path localPath = getPath(localBaseOutputDir, 
childTable);
+                        log.info("HDFS directory: " + hdfsPath.toString());
+                        log.info("Local directory: " + localPath.toString());
+                        copyHdfsToLocal(hdfsPath, localPath);
+                    }
+
+                    Date endTime = new Date();
+                    log.info("Job for table \"" + table + "\" finished: " + 
endTime);
+                    log.info("The job took " + (endTime.getTime() - 
beginTime.getTime()) / 1000 + " seconds.");
+                } else {
+                    log.error("Job for table \"" + table + "\" Failed!!!");
+                    return exitCode;
+                }
+            } else {
+                log.warn("The table \"" + table + "\" was NOT found in the 
parent instance and cannot be copied.");
+            }
+        }
+
+        return 0;
+    }
+
+    private int runImport() throws Exception {
+        log.info("Setting up Copy Tool for importing...");
+
+        setup();
+
+        createChildInstance(conf);
+
+        for (String childTable : tables) {
+            String jobName = "Copy Tool, importing Exported Parent Table files 
from: " + getPath(localCopyFileImportDir, childTable).toString() + ", into 
Child Table: " + childTable + ", " + System.currentTimeMillis();
+            log.info("Initializing job: " + jobName);
+            conf.set(MRUtils.JOB_NAME_PROP, jobName);
+
+            // Submit the job
+            Date beginTime = new Date();
+            log.info("Job for table \"" + childTable + "\" started: " + 
beginTime);
+
+            createTableIfNeeded(childTable);
+            importFilesToChildTable(childTable);
+
+            Date endTime = new Date();
+            log.info("Job for table \"" + childTable + "\" finished: " + 
endTime);
+            log.info("The job took " + (endTime.getTime() - 
beginTime.getTime()) / 1000 + " seconds.");
+        }
+
+        return 0;
+    }
+
+    private int runQueryCopy() throws Exception {
+        log.info("Setting up Copy Tool with a query-based ruleset...");
+        setup();
+        if (!useCopyFileOutput) {
+            createChildInstance(conf);
+        }
+
+        // Set up the configuration
+        AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf);
+        aconf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        aconf.setTablePrefix(tablePrefix);
+        aconf.setFlush(false);
+        ConfigUtils.setIndexers(aconf);
+
+        // Since we're copying at the statement level, ignore any given list of tables and determine
+        // which tables we might need to create based on which indexers are desired.
+        TablePrefixLayoutStrategy prefixStrategy = new TablePrefixLayoutStrategy(tablePrefix);
+        tables.clear();
+        // Always include core tables
+        tables.add(prefixStrategy.getSpo());
+        tables.add(prefixStrategy.getOsp());
+        tables.add(prefixStrategy.getPo());
+        // Copy namespaces if they exist
+        tables.add(prefixStrategy.getNs());
+        // Add tables associated with any configured indexers
+        if (aconf.getBoolean(ConfigUtils.USE_FREETEXT, false)) {
+            tables.add(ConfigUtils.getFreeTextDocTablename(conf));
+            tables.add(ConfigUtils.getFreeTextTermTablename(conf));
+        }
+        if (aconf.getBoolean(ConfigUtils.USE_GEO, false)) {
+            tables.add(ConfigUtils.getGeoTablename(conf));
+        }
+        if (aconf.getBoolean(ConfigUtils.USE_TEMPORAL, false)) {
+            tables.add(ConfigUtils.getTemporalTableName(conf));
+        }
+        if (aconf.getBoolean(ConfigUtils.USE_ENTITY, false)) {
+            tables.add(ConfigUtils.getEntityTableName(conf));
+        }
+        // Ignore anything else, e.g. statistics -- must be recalculated for the child if desired
+
+        // Extract the ruleset, and copy the namespace table directly
+        AccumuloQueryRuleset ruleset = new AccumuloQueryRuleset(aconf);
+        ruleset.addTable(prefixStrategy.getNs());
+        for (String line : ruleset.toString().split("\n")) {
+            log.info(line);
+        }
+
+        // Create a Job and configure its input and output
+        Job job = Job.getInstance(aconf);
+        job.setJarByClass(this.getClass());
+        setupMultiTableInputFormat(job, ruleset);
+        setupOutputFormat(job, "");
+
+        if (useCopyFileOutput) {
+            // Configure job for file output
+            job.setJobName("Ruleset-based export to file: " + tablePrefix + " 
-> " + localBaseOutputDir);
+            // Map (row) to (table+key, key+value)
+            job.setMapperClass(RowRuleMapper.class);
+            job.setMapOutputKeyClass(GroupedRow.class);
+            job.setMapOutputValueClass(GroupedRow.class);
+            // Group according to table and sort according to key
+            job.setGroupingComparatorClass(GroupedRow.GroupComparator.class);
+            job.setSortComparatorClass(GroupedRow.SortComparator.class);
+            // Reduce ([table+row], rows): output each row to the file for that table, in sorted order
+            job.setReducerClass(MultipleFileReducer.class);
+            job.setOutputKeyClass(Key.class);
+            job.setOutputValueClass(Value.class);
+        }
+        else {
+            // Configure job for table output
+            job.setJobName("Ruleset-based copy: " + tablePrefix + " -> " + 
childTablePrefix);
+            // Map (row): convert to statement, insert to child (for namespace 
table, output row directly)
+            job.setMapperClass(AccumuloRyaRuleMapper.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Mutation.class);
+            job.setNumReduceTasks(0);
+            // Create the child tables, so mappers don't try to do this in parallel
+            for (String parentTable : tables) {
+                String childTable = parentTable.replaceFirst(tablePrefix, childTablePrefix);
+                createTableIfNeeded(childTable);
+            }
+        }
+
+        // Run the job and copy files to local filesystem if needed
+        Date beginTime = new Date();
+        log.info("Job started: " + beginTime);
+        boolean success = job.waitForCompletion(true);
+        if (success) {
+            if (useCopyFileOutput) {
+                log.info("Moving data from HDFS to the local file system");
+                Path baseOutputPath = new Path(baseOutputDir);
+                for (FileStatus status : FileSystem.get(conf).listStatus(baseOutputPath)) {
+                    if (status.isDirectory()) {
+                        String tableName = status.getPath().getName();
+                        Path hdfsPath = getPath(baseOutputDir, tableName);
+                        Path localPath = getPath(localBaseOutputDir, tableName);
+                        log.info("HDFS directory: " + hdfsPath.toString());
+                        log.info("Local directory: " + localPath.toString());
+                        copyHdfsToLocal(hdfsPath, localPath);
+                    }
+                }
+            }
+            Date endTime = new Date();
+            log.info("Job finished: " + endTime);
+            log.info("The job took " + (endTime.getTime() - 
beginTime.getTime()) / 1000 + " seconds.");
+            return 0;
+        } else {
+            log.error("Job failed!!!");
+            return 1;
+        }
+    }
+
+    /**
+     * Creates the child table if it doesn't already exist.
+     * @param childTableName the name of the child table.
+     * @throws IOException
+     */
+    public void createTableIfNeeded(String childTableName) throws IOException {
+        try {
+            Configuration childConfig = MergeToolMapper.getChildConfig(conf);
+            AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig);
+            childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
+            Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+            if (!childConnector.tableOperations().exists(childTableName)) {
+                log.info("Creating table: " + childTableName);
+                childConnector.tableOperations().create(childTableName);
+                log.info("Created table: " + childTableName);
+                log.info("Granting authorizations to table: " + childTableName);
+                childConnector.securityOperations().grantTablePermission(childUserName, childTableName, TablePermission.WRITE);
+                log.info("Granted authorizations to table: " + childTableName);
+            }
+        } catch (TableExistsException | AccumuloException | AccumuloSecurityException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private void setupSplitsFile(Job job, TableOperations parentTableOperations, String parentTableName, String childTableName) throws Exception {
+        FileSystem fs = FileSystem.get(conf);
+        fs.setPermission(getPath(baseOutputDir, childTableName), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        Path splitsPath = getPath(baseOutputDir, childTableName, "splits.txt");
+        Collection<Text> splits = parentTableOperations.listSplits(parentTableName, 100);
+        log.info("Creating splits file at: " + splitsPath);
+        try (PrintStream out = new PrintStream(new BufferedOutputStream(fs.create(splitsPath)))) {
+            for (Text split : splits) {
+                String encoded = new String(Base64.encodeBase64(TextUtil.getBytes(split)));
+                out.println(encoded);
+            }
+        }
+        fs.setPermission(splitsPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+
+        String userDir = System.getProperty("user.dir");
+        // The splits file has a symlink created in the user directory for some reason.
+        // It might be better to copy the entire file for Windows but it doesn't seem to matter if
+        // the user directory symlink is broken.
+        java.nio.file.Files.deleteIfExists(new File(userDir, "splits.txt").toPath());
+        //Files.copy(new File(splitsPath.toString()), new File(userDir, "splits.txt"));
+        job.setPartitionerClass(KeyRangePartitioner.class);
+        KeyRangePartitioner.setSplitFile(job, splitsPath.toString());
+        job.setNumReduceTasks(splits.size() + 1);
+    }
+
+    /**
+     * Converts a path string, or a sequence of strings that when joined form a path string,
+     * to a {@link org.apache.hadoop.fs.Path}.
+     * @param first The path string or initial part of the path string.
+     * @param more Additional strings to be joined to form the path string.
+     * @return the resulting {@link org.apache.hadoop.fs.Path}.
+     */
+    public static Path getPath(String first, String... more) {
+        java.nio.file.Path path = Paths.get(first, more);
+        String stringPath = FilenameUtils.separatorsToUnix(path.toAbsolutePath().toString());
+        Path hadoopPath = new Path(stringPath);
+        return hadoopPath;
+    }
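
A quick usage sketch for getPath; the base directory and table name below are hypothetical placeholders:

    // Joins the parts, resolves them to an absolute path, and normalizes to unix separators.
    Path p = CopyTool.getPath("/tmp/copy_tool_file_output", "rya_spo", "files");
    // p is the Hadoop Path /tmp/copy_tool_file_output/rya_spo/files
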
+
+    /**
+     * Imports the files that hold the table data into the child instance.
+     * @param childTableName the name of the child table to import.
+     * @throws Exception
+     */
+    public void importFilesToChildTable(String childTableName) throws Exception {
+        Configuration childConfig = MergeToolMapper.getChildConfig(conf);
+        AccumuloRdfConfiguration childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig);
+        childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
+        Connector childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+        TableOperations childTableOperations = childConnector.tableOperations();
+
+        Path localWorkDir = getPath(localCopyFileImportDir, childTableName);
+        Path hdfsBaseWorkDir = getPath(baseImportDir, childTableName);
+
+        copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir);
+
+        Path files = getPath(hdfsBaseWorkDir.toString(), "files");
+        Path failures = getPath(hdfsBaseWorkDir.toString(), "failures");
+        FileSystem fs = FileSystem.get(conf);
+        // With HDFS permissions on, we need to make sure the Accumulo user can read/move the files
+        fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        if (fs.exists(failures)) {
+            fs.delete(failures, true);
+        }
+        fs.mkdirs(failures);
+
+        childTableOperations.importDirectory(childTableName, files.toString(), failures.toString(), false);
+    }
+
+    /**
+     * Copies the file from the local file system into the HDFS.
+     * @param localInputPath the local system input {@link Path}.
+     * @param hdfsOutputPath the HDFS output {@link Path}.
+     * @throws IOException
+     */
+    public void copyLocalToHdfs(Path localInputPath, Path hdfsOutputPath) throws IOException {
+        copyLocalToHdfs(localInputPath, hdfsOutputPath, conf);
+    }
+
+    /**
+     * Copies the file from the local file system into the HDFS.
+     * @param localInputPath the local system input {@link Path}.
+     * @param hdfsOutputPath the HDFS output {@link Path}.
+     * @param configuration the {@link Configuration} to use.
+     * @throws IOException
+     */
+    public static void copyLocalToHdfs(Path localInputPath, Path hdfsOutputPath, Configuration configuration) throws IOException {
+        FileSystem fs = FileSystem.get(configuration);
+        fs.copyFromLocalFile(localInputPath, hdfsOutputPath);
+    }
+
+    /**
+     * Copies the file from HDFS into the local file system.
+     * @param hdfsInputPath the HDFS input {@link Path}.
+     * @param localOutputPath the local system output {@link Path}.
+     * @throws IOException
+     */
+    public void copyHdfsToLocal(Path hdfsInputPath, Path localOutputPath) throws IOException {
+        copyHdfsToLocal(hdfsInputPath, localOutputPath, conf);
+    }
+
+    /**
+     * Copies the file from HDFS into the local file system.
+     * @param hdfsInputPath the HDFS input {@link Path}.
+     * @param localOutputPath the local system output {@link Path}.
+     * @param configuration the {@link Configuration} to use.
+     * @throws IOException
+     */
+    public static void copyHdfsToLocal(Path hdfsInputPath, Path localOutputPath, Configuration configuration) throws IOException {
+        FileSystem fs = FileSystem.get(configuration);
+        fs.copyToLocalFile(hdfsInputPath, localOutputPath);
+    }
+
+    @Override
+    protected void setupInputFormat(Job job) throws AccumuloSecurityException {
+        if (useCopyFileImport) {
+            try {
+                AccumuloHDFSFileInputFormat.setInputPaths(job, localCopyFileImportDir);
+            } catch (IOException e) {
+                log.error("Failed to set copy file import directory", e);
+            }
+        } else {
+            // set up accumulo input
+            if (!hdfsInput) {
+                job.setInputFormatClass(AccumuloInputFormat.class);
+            } else {
+                job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
+            }
+            AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+            AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix));
+            AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+            if (!mock) {
+                AccumuloInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk));
+            } else {
+                AccumuloInputFormat.setMockInstance(job, instance);
+            }
+            if (ttl != null) {
+                IteratorSetting setting = new IteratorSetting(1, "fi", 
AgeOffFilter.class);
+                AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
+                AccumuloInputFormat.addIterator(job, setting);
+            }
+            if (startTime != null) {
+                IteratorSetting setting = getStartTimeSetting(startTime);
+                AccumuloInputFormat.addIterator(job, setting);
+            }
+            for (IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) {
+                AccumuloInputFormat.addIterator(job, iteratorSetting);
+            }
+        }
+    }
+
+    /**
+     * Set up job to use AccumuloMultiTableInput format, using the tables/ranges given by a ruleset.
+     * @param job The Job to configure
+     * @param rules The ruleset mapping a query to the appropriate tables and ranges
+     */
+    protected void setupMultiTableInputFormat(Job job, AccumuloQueryRuleset rules) throws AccumuloSecurityException {
+        AbstractInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        AbstractInputFormat.setScanAuthorizations(job, authorizations);
+        if (!mock) {
+            AbstractInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk));
+        } else {
+            AbstractInputFormat.setMockInstance(job, instance);
+        }
+        Map<String, InputTableConfig> configs = rules.getInputConfigs();
+        // Add any relevant iterator settings
+        List<IteratorSetting> additionalSettings = new LinkedList<>(AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS);
+        if (ttl != null) {
+            IteratorSetting ttlSetting = new IteratorSetting(1, "fi", AgeOffFilter.class);
+            AgeOffFilter.setTTL(ttlSetting, Long.valueOf(ttl));
+            additionalSettings.add(ttlSetting);
+        }
+        if (startTime != null) {
+            IteratorSetting startTimeSetting = getStartTimeSetting(startTime);
+            additionalSettings.add(startTimeSetting);
+        }
+        for (Map.Entry<String, InputTableConfig> entry : configs.entrySet()) {
+            List<IteratorSetting> iterators = entry.getValue().getIterators();
+            iterators.addAll(additionalSettings);
+            entry.getValue().setIterators(iterators);
+        }
+        // Set the input format
+        AccumuloMultiTableInputFormat.setInputTableConfigs(job, configs);
+        job.setInputFormatClass(AccumuloMultiTableInputFormat.class);
+    }
+
+    @Override
+    protected void setupOutputFormat(Job job, String outputTable) throws AccumuloSecurityException {
+        AccumuloOutputFormat.setConnectorInfo(job, childUserName, new PasswordToken(childPwd));
+        AccumuloOutputFormat.setCreateTables(job, true);
+        AccumuloOutputFormat.setDefaultTableName(job, outputTable);
+        if (!childMock) {
+            AccumuloOutputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(childInstance).withZkHosts(childZk));
+        } else {
+            AccumuloOutputFormat.setMockInstance(job, childInstance);
+        }
+        if (useCopyFileOutput) {
+            log.info("Using file output format mode.");
+            if (StringUtils.isNotBlank(baseOutputDir)) {
+                Path baseOutputPath;
+                Path filesOutputPath;
+                if (StringUtils.isNotBlank(outputTable)) {
+                    filesOutputPath = getPath(baseOutputDir, outputTable, "files");
+                    baseOutputPath = filesOutputPath.getParent();
+                    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
+                }
+                else {
+                    // If table name is not given, configure output for one level higher:
+                    // it's up to the job to handle subdirectories. Make sure the parent exists.
+                    filesOutputPath = getPath(baseOutputDir);
+                    baseOutputPath = filesOutputPath;
+                    LazyOutputFormat.setOutputFormatClass(job, AccumuloFileOutputFormat.class);
+                    MultipleOutputs.setCountersEnabled(job, true);
+                }
+                log.info("File output destination: " + filesOutputPath);
+                if (useCopyFileOutputDirectoryClear) {
+                    try {
+                        clearOutputDir(baseOutputPath);
+                    } catch (IOException e) {
+                        log.error("Error clearing out output path.", e);
+                    }
+                }
+                try {
+                    FileSystem fs = FileSystem.get(conf);
+                    fs.mkdirs(filesOutputPath.getParent());
+                    fs.setPermission(filesOutputPath.getParent(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+                } catch (IOException e) {
+                    log.error("Failed to set permission for output path.", e);
+                }
+                AccumuloFileOutputFormat.setOutputPath(job, filesOutputPath);
+
+                if (StringUtils.isNotBlank(compressionType)) {
+                    if (isValidCompressionType(compressionType)) {
+                        log.info("File compression type: " + compressionType);
+                        AccumuloFileOutputFormat.setCompressionType(job, compressionType);
+                    } else {
+                        log.warn("Invalid compression type: " + 
compressionType);
+                    }
+                }
+            }
+        } else {
+            log.info("Using accumulo output format mode.");
+            job.setOutputFormatClass(AccumuloOutputFormat.class);
+        }
+    }
+
+    /**
+     * Sets up and runs the copy tool with the provided args.
+     * @param args the arguments list.
+     * @return the execution result.
+     */
+    public int setupAndRun(String[] args) {
+        int returnCode = -1;
+        try {
+            Configuration conf = new Configuration();
+            Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args);
+            if (!toolArgs.isEmpty()) {
+                String parameters = Joiner.on("\r\n\t").join(toolArgs);
+                log.info("Running Copy Tool with the following parameters...\r\n\t" + parameters);
+            }
+
+            returnCode = ToolRunner.run(conf, this, args);
+        } catch (Exception e) {
+            log.error("Error running copy tool", e);
+        }
+        return returnCode;
+    }
+
+    public static void main(String[] args) {
+        String log4jConfiguration = System.getProperties().getProperty("log4j.configuration");
+        if (StringUtils.isNotBlank(log4jConfiguration)) {
+            String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:");
+            File configFile = new File(parsedConfiguration);
+            if (configFile.exists()) {
+                DOMConfigurator.configure(parsedConfiguration);
+            } else {
+                BasicConfigurator.configure();
+            }
+        }
+        log.info("Starting Copy Tool");
+
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread thread, Throwable throwable) {
+                log.error("Uncaught exception in " + thread.getName(), 
throwable);
+            }
+        });
+
+        CopyTool copyTool = new CopyTool();
+        int returnCode = copyTool.setupAndRun(args);
+
+        log.info("Finished running Copy Tool");
+
+        System.exit(returnCode);
+    }
+
+    /**
+     * Creates an {@link IteratorSetting} with a timestamp filter that starts at the specified date.
+     * @param startTimeString the start time of the filter.
+     * @return the {@link IteratorSetting}.
+     */
+    public static IteratorSetting getStartTimeSetting(String startTimeString) {
+        Date date = null;
+        try {
+            date = MergeTool.START_TIME_FORMATTER.parse(startTimeString);
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Couldn't parse " + 
startTimeString, e);
+        }
+        return getStartTimeSetting(date);
+    }
+
+    /**
+     * Creates an {@link IteratorSetting} with a timestamp filter that starts at the specified date.
+     * @param date the start {@link Date} of the filter.
+     * @return the {@link IteratorSetting}.
+     */
+    public static IteratorSetting getStartTimeSetting(Date date) {
+        return getStartTimeSetting(date.getTime());
+    }
+
+    /**
+     * Creates an {@link IteratorSetting} with a timestamp filter that starts at the specified date.
+     * @param time the start time of the filter.
+     * @return the {@link IteratorSetting}.
+     */
+    public static IteratorSetting getStartTimeSetting(long time) {
+        IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", 
TimestampFilter.class);
+        TimestampFilter.setStart(setting, time, true);
+        TimestampFilter.setEnd(setting, Long.MAX_VALUE, true);
+        return setting;
+    }
+
+    /**
+     * Checks to see if the specified compression type is valid. The compression must be defined in
+     * {@link Algorithm} to be valid.
+     * @param compressionType the compression type to check.
+     * @return {@code true} if the compression type is one of "none", "gz", "lzo", or "snappy".
+     * {@code false} otherwise.
+     */
+    private static boolean isValidCompressionType(String compressionType) {
+        for (Algorithm algorithm : Algorithm.values()) {
+            if (algorithm.getName().equals(compressionType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void clearOutputDir(Path path) throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        fs.delete(path, true);
+    }
+
+    private Instance createChildInstance(Configuration config) throws Exception {
+        Instance instance = null;
+        String instanceTypeProp = config.get(CREATE_CHILD_INSTANCE_TYPE_PROP);
+        String childAuth = config.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX);
+
+        // Default to distribution cluster if not specified
+        if (StringUtils.isBlank(instanceTypeProp)) {
+            instanceTypeProp = InstanceType.DISTRIBUTION.toString();
+        }
+
+        InstanceType instanceType = InstanceType.fromName(instanceTypeProp);
+        switch (instanceType) {
+            case DISTRIBUTION:
+                if (childInstance == null) {
+                    throw new IllegalArgumentException("Must specify instance 
name for distributed mode");
+                } else if (childZk == null) {
+                    throw new IllegalArgumentException("Must specify ZooKeeper 
hosts for distributed mode");
+                }
+                instance = new ZooKeeperInstance(childInstance, childZk);
+                break;
+
+            case MINI:
+                childAccumuloInstanceDriver = new AccumuloInstanceDriver("Child", false, true, false, false, childUserName, childPwd, childInstance, childTablePrefix, childAuth);
+                childAccumuloInstanceDriver.setUpInstance();
+                childAccumuloInstanceDriver.setUpTables();
+                childZk = childAccumuloInstanceDriver.getZooKeepers();
+                MergeTool.setDuplicateKeysForProperty(config, MRUtils.AC_ZK_PROP + MergeTool.CHILD_SUFFIX, childZk);
+                instance = new ZooKeeperInstance(childInstance, childZk);
+                break;
+
+            case MOCK:
+                instance = new MockInstance(childInstance);
+                break;
+
+            default:
+                throw new AccumuloException("Unexpected instance type: " + 
instanceType);
+        }
+
+        return instance;
+    }
+
+    /**
+     * @return the child {@link AccumuloInstanceDriver} or {@code null}.
+     */
+    public AccumuloInstanceDriver getChildAccumuloInstanceDriver() {
+        return childAccumuloInstanceDriver;
+    }
+
+    /**
+     * Shuts down the child {@link AccumuloInstanceDriver} in the {@link CopyTool} if it exists.
+     * @throws Exception
+     */
+    public void shutdown() throws Exception {
+        if (childAccumuloInstanceDriver != null) {
+            childAccumuloInstanceDriver.tearDown();
+        }
+    }
+}
\ No newline at end of file
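
As a usage sketch, the tool above can be driven through Hadoop's ToolRunner, which is what setupAndRun() does internally. Everything below is illustrative: the table list, output path, and start time are hypothetical placeholders, and the parent/child connection properties handled by the AbstractDualInstanceAccumuloMRTool base class (instance names, users, passwords, ZooKeepers) are omitted:

    Configuration conf = new Configuration();
    // Copy only these tables (comma-separated full table names); omit to copy all tables.
    conf.set(CopyTool.COPY_TABLE_LIST_PROP, "rya_spo,rya_po,rya_osp");
    // Export RFiles to a local directory instead of writing into a live child instance.
    conf.setBoolean(CopyTool.USE_COPY_FILE_OUTPUT, true);
    conf.set(CopyTool.COPY_FILE_OUTPUT_PATH, "/tmp/rya-copy");
    conf.set(CopyTool.COPY_FILE_OUTPUT_COMPRESSION_TYPE, "gz");
    // Only copy parent data written after this time (format: MergeTool.START_TIME_FORMATTER).
    conf.set(MergeTool.START_TIME_PROP, "20140101000000000GMT");
    int exitCode = ToolRunner.run(conf, new CopyTool(), new String[0]);
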

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java
new file mode 100644
index 0000000..89fb787
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/MergeTool.java
@@ -0,0 +1,554 @@
+package mvm.rya.accumulo.mr.merge;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.admin.TableOperations;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.apache.log4j.xml.DOMConfigurator;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.mr.merge.gui.DateTimePickerDialog;
+import mvm.rya.accumulo.mr.merge.mappers.MergeToolMapper;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.merge.util.TimeUtils;
+import mvm.rya.accumulo.mr.merge.util.ToolConfigUtils;
+import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Handles merging a child accumulo instance's data back into its parent's
+ * instance.
+ */
+public class MergeTool extends AbstractDualInstanceAccumuloMRTool {
+    private static final Logger log = Logger.getLogger(MergeTool.class);
+
+    public static final SimpleDateFormat START_TIME_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSSz");
+
+    /**
+     * Appended to certain config property names to indicate that the property is for the child instance.
+     */
+    public static final String CHILD_SUFFIX = ".child";
+
+    /**
+     * Suffix added to a child table when it is temporarily being imported into the parent instance when
+     * being read from file and before the tables are merged together.
+     */
+    public static final String TEMP_SUFFIX = "_temp_child";
+
+    /**
+     * The time of the data to be included in the copy/merge process.
+     */
+    public static final String START_TIME_PROP = "tool.start.time";
+
+    /**
+     * The name of the table to process for the map reduce job.
+     */
+    public static final String TABLE_NAME_PROP = "tool.table.name";
+
+    /**
+     * "true" to use file input. "false" to use Accumulo output.
+     */
+    public static final String USE_MERGE_FILE_INPUT = "use.merge.file.input";
+
+    /**
+     * The file path to the child data input to merge in.
+     */
+    public static final String MERGE_FILE_INPUT_PATH = "merge.file.input.path";
+
+    /**
+     * A value used for the {@link #START_TIME_PROP} property to indicate that a dialog
+     * should be displayed to select the time.
+     */
+    public static final String USE_START_TIME_DIALOG = "dialog";
+
+    private static final String DIALOG_TITLE = "Select a Start Time/Date";
+    private static final String DIALOG_MESSAGE =
+        "<html>Choose the time of the data to merge.<br>Only data modified 
AFTER the selected time will be merged.</html>";
+
+    private String startTime = null;
+    private String tempDir = null;
+    private boolean useMergeFileInput = false;
+    private String localMergeFileImportDir = null;
+    private String baseImportDir = null;
+
+    private String tempChildAuths = null;
+
+    private final List<String> tables = new ArrayList<>();
+
+    /**
+     * Map of keys that are supposed to use the same values.
+     */
+    public static final ImmutableMap<String, List<String>> DUPLICATE_KEY_MAP = ImmutableMap.<String, List<String>>builder()
+        .put(MRUtils.AC_MOCK_PROP, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE))
+        .put(MRUtils.AC_INSTANCE_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE))
+        .put(MRUtils.AC_USERNAME_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_USER))
+        .put(MRUtils.AC_PWD_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD))
+        .put(MRUtils.AC_AUTH_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH))
+        .put(MRUtils.AC_ZK_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS))
+        .put(MRUtils.TABLE_PREFIX_PROPERTY, ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX, RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX))
+        .put(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE + CHILD_SUFFIX))
+        .put(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE + CHILD_SUFFIX))
+        .put(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_USER + CHILD_SUFFIX))
+        .put(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD + CHILD_SUFFIX))
+        .put(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS + CHILD_SUFFIX, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH + CHILD_SUFFIX))
+        .put(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS + CHILD_SUFFIX))
+        .put(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX + CHILD_SUFFIX, RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + CHILD_SUFFIX))
+        .build();
+
+
+    /**
+     * Sets duplicate keys in the config.
+     * @param config the {@link Configuration}.
+     */
+    public static void setDuplicateKeys(Configuration config) {
+        for (Entry<String, List<String>> entry : DUPLICATE_KEY_MAP.entrySet()) {
+            String key = entry.getKey();
+            List<String> duplicateKeys = entry.getValue();
+            String value = config.get(key);
+            if (value != null) {
+                for (String duplicateKey : duplicateKeys) {
+                    config.set(duplicateKey, value);
+                }
+            }
+        }
+    }
+
+    /**
+     * Sets all duplicate keys for the property in the config to the specified value.
+     * @param config the {@link Configuration}.
+     * @param property the property to set and all its duplicates.
+     * @param value the value to set the property to.
+     */
+    public static void setDuplicateKeysForProperty(Configuration config, String property, String value) {
+        List<String> duplicateKeys = DUPLICATE_KEY_MAP.get(property);
+        config.set(property, value);
+        if (duplicateKeys != null) {
+            for (String key : duplicateKeys) {
+                config.set(key, value);
+            }
+        }
+    }
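
To illustrate the two helpers above with a minimal sketch (the instance name and ZooKeeper hosts are placeholders): setDuplicateKeys() copies each already-set key in DUPLICATE_KEY_MAP to its aliases, while setDuplicateKeysForProperty() sets one key and all of its aliases in a single call:

    Configuration conf = new Configuration();
    conf.set(MRUtils.AC_INSTANCE_PROP, "parentInstance");
    MergeTool.setDuplicateKeys(conf);
    // ConfigUtils.CLOUDBASE_INSTANCE now holds "parentInstance" as well.

    MergeTool.setDuplicateKeysForProperty(conf,
            MRUtils.AC_ZK_PROP + MergeTool.CHILD_SUFFIX, "zoo1:2181,zoo2:2181");
    // ConfigUtils.CLOUDBASE_ZOOKEEPERS + CHILD_SUFFIX is updated too.
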
+
+    /**
+     * Sets up and initializes the merge tool's configuration.
+     * @throws Exception
+     */
+    public void setup() throws Exception {
+        super.init();
+
+        tempDir = conf.get("hadoop.tmp.dir", null);
+        if (tempDir == null) {
+            throw new Exception("Invalid hadoop temp directory. 
\"hadoop.tmp.dir\" could not be found in the configuration.");
+        }
+
+        useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false);
+        localMergeFileImportDir = conf.get(MERGE_FILE_INPUT_PATH, null);
+        baseImportDir = tempDir + "/merge_tool_file_input/";
+
+        startTime = conf.get(START_TIME_PROP, null);
+
+        if (!useMergeFileInput) {
+            // Display start time dialog if requested
+            if (USE_START_TIME_DIALOG.equals(startTime)) {
+                log.info("Select start time from dialog...");
+
+                DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE);
+                dateTimePickerDialog.setVisible(true);
+
+                Date date = dateTimePickerDialog.getSelectedDateTime();
+                startTime = START_TIME_FORMATTER.format(date);
+                conf.set(START_TIME_PROP, startTime);
+                log.info("Will merge all data after " + date);
+            } else if (startTime != null) {
+                try {
+                    Date date = START_TIME_FORMATTER.parse(startTime);
+                    log.info("Will merge all data after " + date);
+                } catch (ParseException e) {
+                    throw new Exception("Unable to parse the provided start time: " + startTime, e);
+                }
+            }
+
+            boolean useTimeSync = conf.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false);
+            if (useTimeSync) {
+                String tomcatUrl = conf.get(CopyTool.CHILD_TOMCAT_URL_PROP, null);
+                String ntpServerHost = conf.get(CopyTool.NTP_SERVER_HOST_PROP, null);
+                Long timeOffset = null;
+                try {
+                    log.info("Comparing child machine's time to NTP server time...");
+                    timeOffset = TimeUtils.getNtpServerAndMachineTimeDifference(ntpServerHost, tomcatUrl);
+                } catch (IOException | ParseException e) {
+                    throw new Exception("Unable to get time difference between machine and NTP server.", e);
+                }
+                if (timeOffset != null) {
+                    conf.set(CopyTool.CHILD_TIME_OFFSET_PROP, "" + timeOffset);
+                }
+            }
+        }
+
+        setDuplicateKeys(conf);
+
+        tables.add(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+    }
+
+    @Override
+    public int run(String[] strings) throws Exception {
+        useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false);
+
+        log.info("Setting up Merge Tool...");
+        setup();
+
+        if (useMergeFileInput) {
+            // When using file input mode the child instance will use a temporary table in the parent instance to
+            // store the child table data.  The two tables will then be merged together.
+            copyParentPropertiesToChild(conf);
+        }
+
+        for (String table : tables) {
+            String childTable = table.replaceFirst(tablePrefix, childTablePrefix);
+            String jobName = "Merge Tool, merging Child Table: " + childTable + ", into Parent Table: " + table + ", " + System.currentTimeMillis();
+            log.info("Initializing job: " + jobName);
+            conf.set(MRUtils.JOB_NAME_PROP, jobName);
+            conf.set(TABLE_NAME_PROP, table);
+
+            Job job = Job.getInstance(conf);
+            job.setJarByClass(MergeTool.class);
+
+            if (useMergeFileInput) {
+                importChildFilesToTempParentTable(childTable);
+            }
+
+            setupInputFormat(job);
+
+            AccumuloInputFormat.setInputTableName(job, table);
+
+            // Set input output of the particular job
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(Mutation.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Mutation.class);
+
+            setupOutputFormat(job, table);
+
+            // Set mapper and reducer classes
+            job.setMapperClass(MergeToolMapper.class);
+            job.setReducerClass(Reducer.class);
+
+            // Submit the job
+            Date beginTime = new Date();
+            log.info("Job for table \"" + table + "\" started: " + beginTime);
+            int exitCode = job.waitForCompletion(true) ? 0 : 1;
+
+            if (useMergeFileInput && StringUtils.isNotBlank(tempChildAuths)) {
+                // Clear any of the temporary child auths given to the parent
+                AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf);
+                parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix);
+                Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration);
+                SecurityOperations secOps = parentConnector.securityOperations();
+
+                AccumuloRyaUtils.removeUserAuths(userName, secOps, tempChildAuths);
+            }
+
+            if (exitCode == 0) {
+                Date endTime = new Date();
+                log.info("Job for table \"" + table + "\" finished: " + endTime);
+                log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds.");
+            } else {
+                log.error("Job for table \"" + table + "\" failed.");
+                return exitCode;
+            }
+        }
+
+        return 0;
+    }
+
+    /**
+     * Creates the temp child table if it doesn't already exist in the parent.
+     * @param childTableName the name of the child table.
+     * @throws IOException
+     */
+    public void createTempTableIfNeeded(String childTableName) throws IOException {
+        try {
+            AccumuloRdfConfiguration accumuloRdfConfiguration = new AccumuloRdfConfiguration(conf);
+            accumuloRdfConfiguration.setTablePrefix(childTablePrefix);
+            Connector connector = AccumuloRyaUtils.setupConnector(accumuloRdfConfiguration);
+            if (!connector.tableOperations().exists(childTableName)) {
+                log.info("Creating table: " + childTableName);
+                connector.tableOperations().create(childTableName);
+                log.info("Created table: " + childTableName);
+                log.info("Granting authorizations to table: " + 
childTableName);
+                SecurityOperations secOps = connector.securityOperations();
+                secOps.grantTablePermission(userName, childTableName, 
TablePermission.WRITE);
+                log.info("Granted authorizations to table: " + childTableName);
+
+                Authorizations parentAuths = 
secOps.getUserAuthorizations(userName);
+                // Add child authorizations so the temp parent table can be 
accessed.
+                if (!parentAuths.equals(childAuthorizations)) {
+                    List<String> childAuthList = 
findUniqueAuthsFromChild(parentAuths.toString(), 
childAuthorizations.toString());
+                    tempChildAuths = Joiner.on(",").join(childAuthList);
+                    log.info("Adding the authorization, \"" + tempChildAuths + 
"\", to the parent user, \"" + userName + "\"");
+                    Authorizations newAuths = 
AccumuloRyaUtils.addUserAuths(userName, secOps, new 
Authorizations(tempChildAuths));
+                    secOps.changeUserAuthorizations(userName, newAuths);
+                }
+            }
+        } catch (TableExistsException | AccumuloException | AccumuloSecurityException e) {
+            throw new IOException(e);
+        }
+    }
+
+    /**
+     * Gets any unique user auths that the child has that the parent does not.
+     * @param parentAuths the comma-separated string of parent authorizations.
+     * @param childAuths the comma-separated string of child authorizations.
+     * @return the unique child authorizations that are not in the parent.
+     */
+    private static List<String> findUniqueAuthsFromChild(String parentAuths, String childAuths) {
+        List<String> parentAuthList = AccumuloRyaUtils.convertAuthStringToList(parentAuths);
+        List<String> childAuthList = AccumuloRyaUtils.convertAuthStringToList(childAuths);
+
+        childAuthList.removeAll(parentAuthList);
+
+        return childAuthList;
+    }
+
+    /**
+     * Imports the child files that hold the table data into the parent instance as a temporary table.
+     * @param childTableName the name of the child table to import into a temporary parent table.
+     * @throws Exception
+     */
+    public void importChildFilesToTempParentTable(String childTableName) throws Exception {
+        // Create a temporary table in the parent instance to import the child files to.  Then run the
+        // merge process on the parent table and temp child table.
+        String tempChildTable = childTableName + TEMP_SUFFIX;
+
+        createTempTableIfNeeded(tempChildTable);
+
+        AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf);
+        parentAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
+        Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration);
+        TableOperations parentTableOperations = parentConnector.tableOperations();
+
+        Path localWorkDir = CopyTool.getPath(localMergeFileImportDir, childTableName);
+        Path hdfsBaseWorkDir = CopyTool.getPath(baseImportDir, childTableName);
+
+        CopyTool.copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir, conf);
+
+        Path files = CopyTool.getPath(hdfsBaseWorkDir.toString(), "files");
+        Path failures = CopyTool.getPath(hdfsBaseWorkDir.toString(), "failures");
+        FileSystem fs = FileSystem.get(conf);
+        // With HDFS permissions on, we need to make sure the Accumulo user can read/move the files
+        fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        if (fs.exists(failures)) {
+            fs.delete(failures, true);
+        }
+        fs.mkdirs(failures);
+
+        parentTableOperations.importDirectory(tempChildTable, files.toString(), failures.toString(), false);
+
+        AccumuloRyaUtils.printTablePretty(tempChildTable, conf);
+    }
+
+    /**
+     * Copies all the relevant parent instance config properties to the corresponding child properties.
+     * @param config the {@link Configuration} to use.
+     */
+    public static void copyParentPropertiesToChild(Configuration config) {
+        // Copy the parent properties for the child to use.
+        copyParentPropToChild(config, MRUtils.AC_MOCK_PROP);
+        copyParentPropToChild(config, MRUtils.AC_INSTANCE_PROP);
+        copyParentPropToChild(config, MRUtils.AC_USERNAME_PROP);
+        copyParentPropToChild(config, MRUtils.AC_PWD_PROP);
+        //copyParentPropToChild(config, MRUtils.TABLE_PREFIX_PROPERTY);
+        //copyParentPropToChild(config, MRUtils.AC_AUTH_PROP);
+        //copyParentPropToChild(config, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH);
+        copyParentPropToChild(config, MRUtils.AC_ZK_PROP);
+
+        MergeTool.setDuplicateKeys(config);
+    }
+
+    /**
+     * Copies the parent config property to the corresponding child property.
+     * @param config the {@link Configuration} to use.
+     * @param parentPropertyName the parent property name to use.
+     */
+    public static void copyParentPropToChild(Configuration config, String parentPropertyName) {
+        String parentValue = config.get(parentPropertyName, "");
+        config.set(parentPropertyName + MergeTool.CHILD_SUFFIX, parentValue);
+    }
+
+    @Override
+    protected void setupInputFormat(Job job) throws AccumuloSecurityException {
+        // set up accumulo input
+        if (!hdfsInput) {
+            job.setInputFormatClass(AccumuloInputFormat.class);
+        } else {
+            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
+        }
+        AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix));
+        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
+        if (!mock) {
+            AccumuloInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk));
+        } else {
+            AccumuloInputFormat.setMockInstance(job, instance);
+        }
+        if (ttl != null) {
+            IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class);
+            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
+            AccumuloInputFormat.addIterator(job, setting);
+        }
+        for (IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) {
+            AccumuloInputFormat.addIterator(job, iteratorSetting);
+        }
+    }
+
+    /**
+     * Sets up and runs the merge tool with the provided args.
+     * @param args the arguments list.
+     * @return the execution result.
+     */
+    public static int setupAndRun(String[] args) {
+        int returnCode = -1;
+        try {
+            Configuration conf = new Configuration();
+            Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args);
+            if (!toolArgs.isEmpty()) {
+                String parameters = Joiner.on("\r\n\t").join(toolArgs);
+                log.info("Running Merge Tool with the following parameters...\r\n\t" + parameters);
+            }
+
+            returnCode = ToolRunner.run(conf, new MergeTool(), args);
+        } catch (Exception e) {
+            log.error("Error running merge tool", e);
+        }
+        return returnCode;
+    }
+
+    public static void main(String[] args) {
+        String log4jConfiguration = System.getProperties().getProperty("log4j.configuration");
+        if (StringUtils.isNotBlank(log4jConfiguration)) {
+            String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:");
+            File configFile = new File(parsedConfiguration);
+            if (configFile.exists()) {
+                DOMConfigurator.configure(parsedConfiguration);
+            } else {
+                BasicConfigurator.configure();
+            }
+        }
+        log.info("Starting Merge Tool");
+
+        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            @Override
+            public void uncaughtException(Thread thread, Throwable throwable) {
+                log.error("Uncaught exception in " + thread.getName(), throwable);
+            }
+        });
+
+        int returnCode = setupAndRun(args);
+
+        log.info("Finished running Merge Tool");
+
+        System.exit(returnCode);
+    }
+
+    /**
+     * Creates a formatted string for the start time based on the specified date and whether the dialog is to be displayed.
+     * @param startDate the start {@link Date} to format.
+     * @param isStartTimeDialogEnabled {@code true} to display the time dialog instead of using the date. {@code false} to use the provided {@code startDate}.
+     * @return the formatted start time string or {@code "dialog"}.
+     */
+    public static String getStartTimeString(Date startDate, boolean isStartTimeDialogEnabled) {
+        String startTimeString;
+        if (isStartTimeDialogEnabled) {
+            startTimeString = USE_START_TIME_DIALOG; // set start date from dialog box
+        } else {
+            startTimeString = convertDateToStartTimeString(startDate);
+        }
+        return startTimeString;
+    }
+
+    /**
+     * Converts the specified date into a string to use as the start time for the timestamp filter.
+     * @param date the start {@link Date} of the filter that will be formatted as a string.
+     * @return the formatted start time string.
+     */
+    public static String convertDateToStartTimeString(Date date) {
+        String startTimeString = START_TIME_FORMATTER.format(date);
+        return startTimeString;
+    }
+
+    /**
+     * Converts the specified string into a date to use as the start time for the timestamp filter.
+     * @param startTimeString the formatted time string.
+     * @return the start {@link Date}.
+     */
+    public static Date convertStartTimeStringToDate(String startTimeString) {
+        Date date;
+        try {
+            date = START_TIME_FORMATTER.parse(startTimeString);
+        } catch (ParseException e) {
+            log.error("Could not parse date", e);
+            return null;
+        }
+        return date;
+    }
+}
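For orientation, a minimal sketch of how the duplicate-key plumbing above is driven from caller code; MergeTool.main/setupAndRun is the normal entry point. The "rya_" prefix value is purely an example, while the constants and methods are the ones defined in this file:

    // Set one parent property and fan it out to its duplicate keys.
    Configuration conf = new Configuration();
    MergeTool.setDuplicateKeysForProperty(conf, MRUtils.TABLE_PREFIX_PROPERTY, "rya_");
    // conf now also carries ConfigUtils.CLOUDBASE_TBL_PREFIX and
    // RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, each set to "rya_".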

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java
new file mode 100644
index 0000000..d976384
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/common/InstanceType.java
@@ -0,0 +1,56 @@
+package mvm.rya.accumulo.mr.merge.common;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+
+/**
+ * The type of Accumulo instance.
+ */
+public enum InstanceType {
+    /**
+     * An Accumulo instance that runs using a regular Accumulo distribution.
+     */
+    DISTRIBUTION,
+    /**
+     * An Accumulo instance that runs using a {@link MiniAccumuloCluster}.
+     */
+    MINI,
+    /**
+     * An Accumulo instance that runs using a {@link MockInstance}.
+     */
+    MOCK;
+
+    /**
+     * Finds the instance type by name.
+     * @param name the name to find.
+     * @return the {@link InstanceType} or {@code null} if none could be found.
+     */
+    public static InstanceType fromName(String name) {
+        for (InstanceType instanceType : InstanceType.values()) {
+            if (instanceType.toString().equals(name)) {
+                return instanceType;
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file
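Since fromName(...) matches on the enum's name and returns null for unrecognized input, callers should guard the result. A small usage sketch (the "MINI" literal is illustrative only):

    InstanceType type = InstanceType.fromName("MINI");
    if (type == null) {
        throw new IllegalArgumentException("Unknown Accumulo instance type");
    }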

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java
new file mode 100644
index 0000000..4bf004f
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java
@@ -0,0 +1,173 @@
+package mvm.rya.accumulo.mr.merge.gui;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.awt.GridBagConstraints;
+import java.awt.GridBagLayout;
+import java.awt.Insets;
+import java.awt.event.ActionEvent;
+import java.awt.event.ActionListener;
+import java.util.Calendar;
+import java.util.Date;
+
+import javax.swing.BorderFactory;
+import javax.swing.JButton;
+import javax.swing.JDialog;
+import javax.swing.JLabel;
+import javax.swing.JPanel;
+import javax.swing.JSpinner;
+import javax.swing.JSpinner.DateEditor;
+import javax.swing.SpinnerDateModel;
+import javax.swing.SpinnerModel;
+
+import com.toedter.calendar.JCalendar;
+
+/**
+ * Dialog for picking date and time.
+ */
+public class DateTimePickerDialog extends JDialog {
+    private static final long serialVersionUID = 1L;
+
+    private JCalendar dateChooser;
+    private JSpinner timeSpinner;
+
+    private Date selectedDateTime;
+    private JLabel label;
+
+
+    /**
+     * Creates a new instance of {@link DateTimePickerDialog}.
+     * @param title the title to display up top.
+     * @param message the message to display.
+     */
+    public DateTimePickerDialog(String title, String message) {
+        this(null, title, message);
+    }
+
+    /**
+     * Creates a new instance of {@link DateTimePickerDialog}.
+     * @param date the initial date to have the dialog show.
+     * @param title the title to display up top.
+     * @param message the message to display.
+     */
+    public DateTimePickerDialog(Date date, String title, String message) {
+        // Create a modal dialog
+        super((JDialog) null);
+        setTitle(title);
+        setModalityType(ModalityType.APPLICATION_MODAL);
+        setType(Type.NORMAL);
+
+        setLayout(new GridBagLayout());
+        setDefaultCloseOperation(JDialog.DISPOSE_ON_CLOSE);
+
+        JButton okButton = new JButton("OK");
+        okButton.addActionListener(new ActionListener() {
+            @Override
+            public void actionPerformed(ActionEvent event) {
+                selectedDateTime = findSelectedDateTime();
+
+                // Hide dialog
+                setVisible(false);
+            }
+        });
+
+        getRootPane().setDefaultButton(okButton);
+
+        JPanel dateTimePanel = buildDateTimePanel(date);
+        label = new JLabel(message);
+        label.setBorder(BorderFactory.createEtchedBorder());
+
+        GridBagConstraints c = new GridBagConstraints();
+        c.fill = GridBagConstraints.HORIZONTAL;
+        c.insets = new Insets(5, 5, 5, 5);
+        c.gridx = 0;
+        c.gridy = 0;
+
+        add(dateTimePanel, c);
+        c.gridy++;
+        add(label, c);
+        c.anchor = GridBagConstraints.EAST;
+        c.fill = GridBagConstraints.NONE;
+        c.gridy++;
+        add(okButton, c);
+
+        pack();
+    }
+
+    private JPanel buildDateTimePanel(Date date) {
+        JPanel datePanel = new JPanel();
+
+        dateChooser = new JCalendar();
+        if (date != null) {
+            Calendar calendar = Calendar.getInstance();
+            calendar.setTime(date);
+            dateChooser.setCalendar(calendar);
+        }
+
+        datePanel.add(dateChooser);
+
+        SpinnerModel model = new SpinnerDateModel();
+        timeSpinner = new JSpinner(model);
+        DateEditor editor = new DateEditor(timeSpinner, "HH:mm:ss");
+        timeSpinner.setEditor(editor);
+        if (date != null) {
+            timeSpinner.setValue(date);
+        }
+
+        datePanel.add(timeSpinner);
+
+        return datePanel;
+    }
+
+    private Date findSelectedDateTime() {
+        // Get the values from the date chooser
+        int day = dateChooser.getDayChooser().getDay();
+        int month = dateChooser.getMonthChooser().getMonth();
+        int year = dateChooser.getYearChooser().getYear();
+
+        // Get the values from the time chooser
+        Calendar timeCalendar = Calendar.getInstance();
+        timeCalendar.setTime((Date) timeSpinner.getValue());
+        int hour = timeCalendar.get(Calendar.HOUR_OF_DAY);
+        int minute = timeCalendar.get(Calendar.MINUTE);
+        int second = timeCalendar.get(Calendar.SECOND);
+
+        // Combine these values into a single date object
+        Calendar newCalendar = Calendar.getInstance();
+        newCalendar.set(Calendar.YEAR, year);
+        newCalendar.set(Calendar.MONTH, month);
+        newCalendar.set(Calendar.DATE, day);
+        newCalendar.set(Calendar.HOUR_OF_DAY, hour);
+        newCalendar.set(Calendar.MINUTE, minute);
+        newCalendar.set(Calendar.SECOND, second);
+
+        Date newDate = newCalendar.getTime();
+
+        return newDate;
+    }
+
+    /**
+     * @return the selected date time.
+     */
+    public Date getSelectedDateTime() {
+        return selectedDateTime;
+    }
+}
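As used by MergeTool.setup() above: the dialog is application-modal, so setVisible(true) blocks until OK is pressed. A short usage sketch (DIALOG_TITLE and DIALOG_MESSAGE are MergeTool's constants):

    DateTimePickerDialog dialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE);
    dialog.setVisible(true);                       // blocks until OK
    Date startDate = dialog.getSelectedDateTime(); // null if closed without OK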

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java
new file mode 100644
index 0000000..c530b9e
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java
@@ -0,0 +1,52 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Extended {@link BaseCopyToolMapper} that handles the {@code AccumuloOutputFormat} for the copy tool.
+ */
+public class AccumuloCopyToolMapper extends BaseCopyToolMapper<Key, Value, Text, Mutation> {
+    /**
+     * Creates a new {@link AccumuloCopyToolMapper}.
+     */
+    public AccumuloCopyToolMapper() {
+    }
+
+    @Override
+    protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
+        //log.trace("Mapping key: " + key + " = " + value);
+        Mutation mutation = makeAddMutation(key, value);
+        context.write(childTableNameText, mutation);
+    }
+
+    private static Mutation makeAddMutation(Key key, Value value) {
+        Mutation mutation = new Mutation(key.getRow().getBytes());
+        mutation.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), key.getTimestamp(), value);
+        return mutation;
+    }
+}
\ No newline at end of file
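A hedged sketch of how this mapper would be wired into a copy job; it mirrors the job wiring shown in MergeTool.run(...) above rather than prescribing CopyTool's exact setup:

    Job job = Job.getInstance(conf);
    job.setMapperClass(AccumuloCopyToolMapper.class);
    job.setMapOutputKeyClass(Text.class);        // the child table name
    job.setMapOutputValueClass(Mutation.class);  // the re-keyed cell from makeAddMutation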

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java
new file mode 100644
index 0000000..68850c6
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java
@@ -0,0 +1,69 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+
+/**
+ * Rule mapper that inserts any copied statements into a child Accumulo Rya instance, and outputs
+ * any raw rows as {@code <Text, Mutation>} pairs.
+ */
+public class AccumuloRyaRuleMapper extends BaseRuleMapper<Text, Mutation> {
+    /**
+     * Insert a statement into the child Accumulo instance via the child DAO.
+     * @param rstmt RyaStatement to add to the child
+     * @param context Map context, not used
+     * @throws IOException if the DAO encounters an error adding the statement to Accumulo
+     */
+    @Override
+    protected void copyStatement(RyaStatement rstmt, Context context) throws IOException {
+        try {
+            childDao.add(rstmt);
+        } catch (RyaDAOException e) {
+            throw new IOException("Error inserting statement into child Rya DAO", e);
+        }
+    }
+
+    /**
+     * Output a row via the Hadoop framework (rather than a Rya interface) to the Mapper's configured
+     * child table.
+     * @param key Row's key
+     * @param value Row's value
+     * @param context Context to use for writing
+     * @throws InterruptedException if the framework is interrupted writing output
+     * @throws IOException if the framework encounters an error writing the row to Accumulo
+     */
+    @Override
+    protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException {
+        Mutation mutation = new Mutation(key.getRow().getBytes());
+        mutation.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), key.getTimestamp(), value);
+        context.write(childTableNameText, mutation);
+    }
+}
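The two overrides split the copy into two paths: rule-matched statements are re-inserted through the child Rya DAO, while all other rows are copied cell-for-cell. A standalone sketch of the raw path, showing that the parent cell's visibility and timestamp survive the copy:

    // Equivalent to copyRow's body: rebuild the parent cell as a child Mutation.
    Mutation mutation = new Mutation(key.getRow().getBytes());
    mutation.put(key.getColumnFamily(), key.getColumnQualifier(),
            key.getColumnVisibilityParsed(), key.getTimestamp(), value);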
