http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java
 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java
new file mode 100644
index 0000000..c14a067
--- /dev/null
+++ 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java
@@ -0,0 +1,549 @@
+package mvm.rya.accumulo.mr.merge.util;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.UnsupportedQueryLanguageException;
+import org.openrdf.query.algebra.Filter;
+import org.openrdf.query.algebra.FunctionCall;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.ListMemberOperator;
+import org.openrdf.query.algebra.Or;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.Union;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedTupleQuery;
+import org.openrdf.query.parser.QueryParserUtil;
+import org.openrdf.sail.SailException;
+
+import mvm.rya.accumulo.mr.merge.CopyTool;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.rdftriplestore.RdfCloudTripleStore;
+import mvm.rya.rdftriplestore.inference.InferJoin;
+import mvm.rya.rdftriplestore.inference.InferUnion;
+import mvm.rya.rdftriplestore.inference.InferenceEngine;
+import mvm.rya.rdftriplestore.inference.InverseOfVisitor;
+import mvm.rya.rdftriplestore.inference.SameAsVisitor;
+import mvm.rya.rdftriplestore.inference.SubClassOfVisitor;
+import mvm.rya.rdftriplestore.inference.SubPropertyOfVisitor;
+import mvm.rya.rdftriplestore.inference.SymmetricPropertyVisitor;
+import mvm.rya.rdftriplestore.inference.TransitivePropertyVisitor;
+import mvm.rya.rdftriplestore.utils.FixedStatementPattern;
+import mvm.rya.rdftriplestore.utils.TransitivePropertySP;
+import mvm.rya.sail.config.RyaSailFactory;
+
/**
 * Represents a set of {@link CopyRule} instances derived from a query. The ruleset determines a logical
 * subset of statements in Rya, such that statements selected by the ruleset are at least enough to answer
 * the query.
 */
public class QueryRuleset {
    // Class-level logger; used when applying inference in setRules().
    private static final Logger log = Logger.getLogger(QueryRuleset.class);
+
+    /**
+     * Represents an error attempting to convert a query to a set of rules.
+     */
+    public static class QueryRulesetException extends Exception {
+        private static final long serialVersionUID = 1L;
+        public QueryRulesetException(String s) {
+            super(s);
+        }
+        public QueryRulesetException(String s, Throwable throwable) {
+            super(s, throwable);
+        }
+    }
+
    /**
     * Takes in a parsed query tree and extracts the rules defining relevant statements.
     */
    private static class RulesetVisitor extends QueryModelVisitorBase<QueryRulesetException> {
        // Rules extracted so far from the query tree.
        List<CopyRule> rules = new LinkedList<>();
        // Classes whose subClassOf/equivalentClass hierarchy must be included (see addSchema).
        private Set<Value> superclasses = new HashSet<>();
        // Properties whose subPropertyOf/equivalentProperty hierarchy must be included.
        private Set<Value> superproperties = new HashSet<>();
        // Resources whose owl:sameAs statements must be included.
        private Set<Value> sameAs = new HashSet<>();
        // Properties whose owl:TransitiveProperty declarations must be included.
        private Set<Value> transitive = new HashSet<>();
        // Properties whose owl:SymmetricProperty/owl:inverseOf statements must be included.
        private Set<Value> schemaProperties = new HashSet<>();
+
+        @Override
+        public void meet(StatementPattern node) throws QueryRulesetException {
+            Var predVar = node.getPredicateVar();
+            // If this is a transitive property node, just match all 
statements with that property
+            if (node instanceof TransitivePropertySP && predVar.hasValue()) {
+                node = new StatementPattern(new Var("transitiveSubject"), 
predVar,
+                        new Var("transitiveObject"), node.getContextVar());
+                // And make sure to grab the transitivity statement itself
+                transitive.add(predVar.getValue());
+            }
+            rules.add(new CopyRule(node));
+        }
+
+        @Override
+        public void meet(Filter node) throws QueryRulesetException {
+            ValueExpr condition = node.getCondition();
+            // If the condition is a function call, and we don't know about 
the function, don't try to test for it.
+            if (condition instanceof FunctionCall) {
+                String uri = ((FunctionCall) condition).getURI();
+                if (FunctionRegistry.getInstance().get(uri) == null) {
+                    // Just extract statement patterns from the child as if 
there were no filter.
+                    node.getArg().visit(this);
+                }
+            }
+            // Otherwise, assume we can test for it: extract rules from below 
this node, and add the condition to each one.
+            else {
+                RulesetVisitor childVisitor = new RulesetVisitor();
+                node.getArg().visit(childVisitor);
+                for (CopyRule rule : childVisitor.rules) {
+                    rule.addCondition(condition);
+                    this.rules.add(rule);
+                }
+                this.superclasses.addAll(childVisitor.superclasses);
+                this.superproperties.addAll(childVisitor.superproperties);
+            }
+        }
+
+        @Override
+        public void meet(Join node) throws QueryRulesetException {
+            TupleExpr left = node.getLeftArg();
+            TupleExpr right = node.getRightArg();
+            // If this join represents the application of inference logic, use 
its children to add the
+            // appropriate rules.
+            if (node instanceof InferJoin && left instanceof 
FixedStatementPattern) {
+                FixedStatementPattern fsp = (FixedStatementPattern) left;
+                Value predValue = fsp.getPredicateVar().getValue();
+                // If this is a subClassOf relation, fetch all subClassOf and 
equivalentClass
+                // relations involving the relevant classes.
+                if (RDFS.SUBCLASSOF.equals(predValue) && right instanceof 
StatementPattern) {
+                    StatementPattern dne = (StatementPattern) right;
+                    // If a subClassOf b equivalentClass c subClassOf d, then 
fsp will contain a statement
+                    // for each class in the hierarchy. If we match every 
subClassOf and equivalentClass
+                    // relation to any of {a,b,c,d}, then the hierarchy can be 
reconstructed.
+                    for (Statement st : fsp.statements) {
+                        Value superclassVal = st.getSubject();
+                        // Rule to match the type assignment:
+                        rules.add(new CopyRule(new 
StatementPattern(dne.getSubjectVar(),
+                                dne.getPredicateVar(),
+                                new Var(superclassVal.toString(), 
superclassVal),
+                                dne.getContextVar())));
+                        // Add to the set of classes for which we need the 
hierarchy:
+                        superclasses.add(superclassVal);
+                    }
+                }
+                // If this is a subPropertyOf relation, fetch all 
subPropertyOf and equivalentProperty
+                // relations involving the relevant properties.
+                else if (RDFS.SUBPROPERTYOF.equals(predValue) && right 
instanceof StatementPattern) {
+                    StatementPattern dne = (StatementPattern) right;
+                    // If p subPropertyOf q subPropertyOf r subPropertyOf s, 
then fsp will contain a statement
+                    // for each property in the hierarchy. If we match every 
subPropertyOf and equivalentProperty
+                    // relation to any of {p,q,r,s}, then the hierarchy can be 
reconstructed.
+                    for (Statement st : fsp.statements) {
+                        Value superpropVal = st.getSubject();
+                        // Rule to add the property:
+                        rules.add(new CopyRule(new 
StatementPattern(dne.getSubjectVar(),
+                                new Var(superpropVal.toString(), superpropVal),
+                                dne.getObjectVar(),
+                                dne.getContextVar())));
+                        // Add to the set of properties for which we need the 
hierarchy:
+                        superproperties.add(superpropVal);
+                    }
+                }
+                // If this is a sameAs expansion, it may have one or two levels
+                if (OWL.SAMEAS.equals(predValue)) {
+                    StatementPattern stmt = null;
+                    String replaceVar = fsp.getSubjectVar().getName();
+                    String replaceVarInner = null;
+                    List<Value> replacements = new LinkedList<>();
+                    List<Value> replacementsInner = new LinkedList<>();
+                    for (Statement st : fsp.statements) {
+                        replacements.add(st.getSubject());
+                    }
+                    if (right instanceof StatementPattern) {
+                        stmt = (StatementPattern) right;
+                    }
+                    else if (right instanceof InferJoin) {
+                        // Add the second set of replacements if given
+                        InferJoin inner = (InferJoin) right;
+                        if (inner.getLeftArg() instanceof FixedStatementPattern
+                                && inner.getRightArg() instanceof 
StatementPattern) {
+                            stmt = (StatementPattern) inner.getRightArg();
+                            fsp = (FixedStatementPattern) inner.getLeftArg();
+                            replaceVarInner = fsp.getSubjectVar().getName();
+                            for (Statement st : fsp.statements) {
+                                replacementsInner.add(st.getSubject());
+                            }
+                        }
+                    }
+                    // Add different versions of the original statement:
+                    if (stmt != null) {
+                        for (Value replacementVal : replacements) {
+                            if (replacementsInner.isEmpty()) {
+                                StatementPattern transformed = stmt.clone();
+                                if 
(transformed.getSubjectVar().equals(replaceVar)) {
+                                    transformed.setSubjectVar(new 
Var(replaceVar, replacementVal));
+                                }
+                                if 
(transformed.getObjectVar().equals(replaceVar)) {
+                                    transformed.setObjectVar(new 
Var(replaceVar, replacementVal));
+                                }
+                                rules.add(new CopyRule(transformed));
+                            }
+                            for (Value replacementValInner : 
replacementsInner) {
+                                StatementPattern transformed = stmt.clone();
+                                if 
(transformed.getSubjectVar().equals(replaceVar)) {
+                                    transformed.setSubjectVar(new 
Var(replaceVar, replacementVal));
+                                }
+                                else if 
(transformed.getSubjectVar().equals(replaceVarInner)) {
+                                    transformed.setSubjectVar(new 
Var(replaceVarInner, replacementValInner));
+                                }
+                                if 
(transformed.getObjectVar().equals(replaceVar)) {
+                                    transformed.setObjectVar(new 
Var(replaceVar, replacementVal));
+                                }
+                                else if 
(transformed.getObjectVar().equals(replaceVarInner)) {
+                                    transformed.setObjectVar(new 
Var(replaceVar, replacementValInner));
+                                }
+                                rules.add(new CopyRule(transformed));
+                            }
+                        }
+                    }
+                    // Add to the set of resources for which we need sameAs 
relations:
+                    sameAs.addAll(replacements);
+                    sameAs.addAll(replacementsInner);
+                }
+            }
+            // If it's a normal join, visit the children.
+            else {
+                super.meet(node);
+            }
+        }
+
        /**
         * Handle a union. Children are always visited; when the union was generated by
         * the inference engine ({@link InferUnion}), additionally scan both branches for
         * non-standard properties so their schema statements can be included later
         * (see addSchema: owl:SymmetricProperty and owl:inverseOf).
         * @param node the union node
         * @throws QueryRulesetException if rules can't be extracted from the children
         */
        @Override
        public void meet(Union node) throws QueryRulesetException {
            node.visitChildren(this);
            if (node instanceof InferUnion) {
                // If this is the result of inference, search each tree for (non-standard)
                // properties and add them to the set of properties for which to include
                // schema information.
                QueryModelVisitorBase<QueryRulesetException> propertyVisitor = new QueryModelVisitorBase<QueryRulesetException>() {
                    @Override
                    public void meet(StatementPattern node) {
                        if (node.getPredicateVar().hasValue()) {
                            // NOTE(review): assumes every concrete predicate value is a URI
                            // (unchecked cast) — confirm upstream guarantees this.
                            URI predValue = (URI) node.getPredicateVar().getValue();
                            String ns = predValue.getNamespace();
                            if (node instanceof FixedStatementPattern
                                    && (RDFS.SUBPROPERTYOF.equals(predValue) || OWL.EQUIVALENTPROPERTY.equals(predValue))) {
                                // This FSP replaced a property, so find all the properties it entails
                                FixedStatementPattern fsp = (FixedStatementPattern) node;
                                for (Statement stmt : fsp.statements) {
                                    schemaProperties.add(stmt.getSubject());
                                }
                            }
                            else if (!(OWL.NAMESPACE.equals(ns) || RDFS.NAMESPACE.equals(ns) || RDF.NAMESPACE.equals(ns))) {
                                // This is a regular triple pattern whose predicate is outside the
                                // standard vocabularies; grab its predicate
                                schemaProperties.add(predValue);
                            }
                        }
                    }
                };
                node.getLeftArg().visit(propertyVisitor);
                node.getRightArg().visit(propertyVisitor);
            }
        }
+
+        /**
+         * Add rules covering the portions of the schema that may be necessary 
to use inference
+         * with this query.
+         */
+        public void addSchema() throws QueryRulesetException {
+            // Combine the relevant portions of the class hierarchy into one 
subclass rule and one equivalent class rule:
+            if (!superclasses.isEmpty()) {
+                Var superClassVar = new Var("superClassVar");
+                // Subclasses of the given classes:
+                addListRule(new Var("subClassVar"), null, RDFS.SUBCLASSOF, 
superClassVar, superclasses);
+                // Equivalent classes to the given classes (this might be 
stated in either direction):
+                addListRule(new Var("eqClassSubject"), superclasses, 
OWL.EQUIVALENTCLASS, new Var("eqClassObject"), superclasses);
+            }
+
+            // Combine the relevant portions of the property hierarchy into 
one subproperty rule and one equivalent property rule:
+            if (!superproperties.isEmpty()) {
+                Var superPropertyVar = new Var("superPropertyVar");
+                // Subproperties of the given properties:
+                addListRule(new Var("subPropertyVar"), null, 
RDFS.SUBPROPERTYOF, superPropertyVar, superproperties);
+                // Equivalent properties to the given properties (this might 
be stated in either direction):
+                addListRule(new Var("eqPropSubject"), superproperties, 
OWL.EQUIVALENTPROPERTY, new Var("eqPropObject"), superproperties);
+            }
+
+            // Get the relevant portions of the owl:sameAs graph
+            if (!sameAs.isEmpty()) {
+                Var sameAsSubj = new Var("sameAsSubject");
+                Var sameAsObj = new Var("sameAsObject");
+                addListRule(sameAsSubj, sameAs, OWL.SAMEAS, sameAsObj, sameAs);
+            }
+
+            // Get the potentially relevant owl:TransitiveProperty statements
+            if (!transitive.isEmpty()) {
+                Var transitiveVar = new Var(OWL.TRANSITIVEPROPERTY.toString(), 
OWL.TRANSITIVEPROPERTY);
+                addListRule(new Var("transitiveProp"), transitive, RDF.TYPE, 
transitiveVar, null);
+            }
+
+            // Get any owl:SymmetricProperty and owl:inverseOf statements for 
relevant properties
+            if (!schemaProperties.isEmpty()) {
+                Var symmetricVar = new Var(OWL.SYMMETRICPROPERTY.toString(), 
OWL.SYMMETRICPROPERTY);
+                addListRule(new Var("symmetricProp"), schemaProperties, 
RDF.TYPE, symmetricVar, null);
+                addListRule(new Var("inverseSubject"), schemaProperties, 
OWL.INVERSEOF, new Var("inverseObject"), schemaProperties);
+            }
+        }
+
+        /**
+         * Build and add a rule that matches triples having a specific 
predicate, where subject and object constraints
+         * are each defined using a Var and a set of Values, and each can 
represent one of: a constant value
+         * (Var has a value), an enumerated set of possible values, to be 
turned into a filter (Var has no
+         * Value and set of Values is non-null), or an unconstrained variable 
(Var has no value and set of
+         * Values is null). If both subject and object are variables with 
enumerated sets, only one part needs to
+         * match in order to accept the triple.
+         * @param subjVar Var corresponding to the subject. May have a 
specific value or represent a variable.
+         * @param subjValues Either null or a Set of Values that the subject 
variable can have, tested using a filter.
+         * @param predicate The URI for the predicate to match
+         * @param objVar Var corresponding to the object. May have a specific 
value or represent a variable.
+         * @param objValues Either null or a Set of Values that the object 
variable can have, tested using a filter
+         * @throws QueryRulesetException if the rule can't be created
+         */
+        private void addListRule(Var subjVar, Set<Value> subjValues, URI 
predicate,
+                Var objVar, Set<Value> objValues) throws QueryRulesetException 
{
+            ListMemberOperator subjCondition = null;
+            ListMemberOperator objCondition = null;
+            if (subjValues != null) {
+                subjCondition = new ListMemberOperator();
+                subjCondition.addArgument(subjVar);
+                for (Value constant : subjValues) {
+                    subjCondition.addArgument(new Var(constant.toString(), 
constant));
+                }
+            }
+            if (objValues != null) {
+                objCondition = new ListMemberOperator();
+                objCondition.addArgument(objVar);
+                for (Value constant : objValues) {
+                    objCondition.addArgument(new Var(constant.toString(), 
constant));
+                }
+            }
+            Var predVar = new Var(predicate.toString(), predicate);
+            CopyRule listRule = new CopyRule(new StatementPattern(subjVar, 
predVar, objVar));
+            if (subjCondition != null && objCondition != null) {
+                listRule.addCondition(new Or(subjCondition, objCondition));
+            }
+            else if (subjCondition != null) {
+                listRule.addCondition(subjCondition);
+            }
+            else if (objCondition != null) {
+                listRule.addCondition(objCondition);
+            }
+            rules.add(listRule);
+        }
+    }
+
    /**
     * The rules themselves -- any statement satisfying any of these rules will be copied.
     */
    protected Set<CopyRule> rules = new HashSet<>();

    /**
     * The SPARQL query string that defines the ruleset.
     */
    protected String query;

    /**
     * A Rya configuration; null when the ruleset was constructed directly from a
     * query string (in which case inference is not applied -- see setRules).
     */
    protected RdfCloudTripleStoreConfiguration conf;
+
    /**
     * Extract a set of rules from a query found in a Configuration.
     * @param conf Configuration containing either the query string, or the name of a
     *        file containing the query, plus inference parameters.
     * @throws QueryRulesetException if the query can't be read, parsed, and resolved
     *         to valid rules
     */
    public QueryRuleset(RdfCloudTripleStoreConfiguration conf) throws QueryRulesetException {
        this.conf = conf;
        // Resolve the query text from the configuration, then derive the rules.
        setQuery();
        setRules();
    }
+
    /**
     * Extract a set of rules from a query. Since no configuration is provided,
     * inference expansion is skipped (setRules only applies inference when a
     * configuration is present).
     * @param query A SPARQL query string
     * @throws QueryRulesetException if the query can't be parsed and resolved to valid rules
     */
    public QueryRuleset(String query) throws QueryRulesetException {
        this.query = query;
        setRules();
    }
+
    /**
     * Get the query that was used to construct this ruleset.
     * @return The defining SPARQL query string
     */
    public String getQuery() {
        return query;
    }
+
+    /**
+     * Set this ruleset's defining query based on the configuration. Query can 
be
+     * specified directly or using a file; if it's read from a file, the query
+     * text will also be added to the configuration.
+     * @return SPARQL query
+     * @throws QueryRulesetException if there is no configuration, or if the 
query can't be found or read
+     */
+    private void setQuery() throws QueryRulesetException {
+        if (conf == null) {
+            throw new QueryRulesetException("No Configuration given");
+        }
+        query = conf.get(CopyTool.QUERY_STRING_PROP);
+        String queryFile = conf.get(CopyTool.QUERY_FILE_PROP);
+        if (query == null && queryFile != null) {
+            try {
+                FileReader fileReader = new FileReader(queryFile);
+                BufferedReader reader = new BufferedReader(fileReader);
+                StringBuilder builder = new StringBuilder();
+                String line = reader.readLine();
+                while (line != null) {
+                    builder.append(line).append("\n");
+                    line = reader.readLine();
+                }
+                query = builder.toString();
+                reader.close();
+                conf.set(CopyTool.QUERY_STRING_PROP, query);
+            }
+            catch (IOException e) {
+                throw new QueryRulesetException("Error loading query from 
file: " + queryFile, e);
+            }
+        }
+        else if (query == null) {
+            throw new QueryRulesetException("No query string or query file 
provided");
+        }
+    }
+
    /**
     * Extract the rules from the query string, applying inference rules if configured to.
     * @throws QueryRulesetException if the query can't be parsed and translated into valid rules.
     */
    private void setRules() throws QueryRulesetException {
        final ParsedTupleQuery ptq;
        final TupleExpr te;
        try {
            ptq = QueryParserUtil.parseTupleQuery(QueryLanguage.SPARQL, query, null);
        }
        catch (UnsupportedQueryLanguageException | MalformedQueryException e) {
            throw new QueryRulesetException("Error parsing query:\n" + query, e);
        }
        te = ptq.getTupleExpr();
        // Before converting to rules (and renaming variables), validate that no statement
        // patterns consist of only variables (this would result in a rule that matches
        // every triple). Needs to be done before inference, since inference rules may
        // create such statement patterns that are OK because they won't be converted to
        // rules directly.
        te.visit(new QueryModelVisitorBase<QueryRulesetException>() {
            @Override
            public void meet(StatementPattern node) throws QueryRulesetException {
                if (!(node.getSubjectVar().hasValue() || node.getPredicateVar().hasValue() || node.getObjectVar().hasValue())) {
                    throw new QueryRulesetException("Statement pattern with no constants would match every statement:\n"
                            + node + "\nFrom parsed query:\n" + te);
                }
            }
        });
        // Apply inference, if applicable (requires a configuration with inference enabled).
        if (conf != null && conf.isInfer()) {
            RdfCloudTripleStore store = null;
            try {
                log.info("Applying inference rules");
                store = (RdfCloudTripleStore) RyaSailFactory.getInstance(conf);
                InferenceEngine inferenceEngine = store.getInferenceEngine();
                // Apply the inference visitors in the same order as query evaluation:
                te.visit(new TransitivePropertyVisitor(conf, inferenceEngine));
                te.visit(new SymmetricPropertyVisitor(conf, inferenceEngine));
                te.visit(new InverseOfVisitor(conf, inferenceEngine));
                te.visit(new SubPropertyOfVisitor(conf, inferenceEngine));
                te.visit(new SubClassOfVisitor(conf, inferenceEngine));
                te.visit(new SameAsVisitor(conf, inferenceEngine));
                log.info("Query after inference:\n");
                for (String line : te.toString().split("\n")) {
                    log.info("\t" + line);
                }
            }
            catch (Exception e) {
                throw new QueryRulesetException("Error applying inference to parsed query:\n" + te, e);
            }
            finally {
                // Always release the Sail, even if inference failed.
                if (store != null) {
                    try {
                        store.shutDown();
                    } catch (SailException e) {
                        log.error("Error shutting down Sail after applying inference", e);
                    }
                }
            }
        }
        // Extract the StatementPatterns and Filters and turn them into rules:
        RulesetVisitor rv = new RulesetVisitor();
        try {
            te.visit(rv);
            rv.addSchema();
        }
        catch (QueryRulesetException e) {
            throw new QueryRulesetException("Error extracting rules from parsed query:\n" + te, e);
        }
        // Keep only rules that are not subsumed by a more general rule in the set.
        for (CopyRule candidateRule : rv.rules) {
            boolean unique = true;
            for (CopyRule otherRule : rv.rules) {
                if (!candidateRule.equals(otherRule) && otherRule.isGeneralizationOf(candidateRule)) {
                    unique = false;
                    break;
                }
            }
            if (unique) {
                this.rules.add(candidateRule);
            }
        }
    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("Original Query:\n\n\t");
+        sb.append(query.replace("\n", "\n\t")).append("\n\nRuleset:\n");
+        for (CopyRule rule : rules) {
+            sb.append("\n\t").append(rule.toString().replace("\n", 
"\n\t")).append("\n");
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java
new file mode 100644
index 0000000..8a9d3d2
--- /dev/null
+++ 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java
@@ -0,0 +1,349 @@
+package mvm.rya.accumulo.mr.merge.util;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.net.ntp.NTPUDPClient;
import org.apache.commons.net.ntp.TimeInfo;
import org.codehaus.plexus.util.StringUtils;
import org.mortbay.jetty.HttpMethods;

import com.google.common.net.HttpHeaders;

import twitter4j.Logger;
+
+/**
+ * Utility methods for time.
+ */
+public final class TimeUtils {
+    private static final Logger log = Logger.getLogger(TimeUtils.class);
+
+    /**
+     * The default host name of the time server to use.
+     * List of time servers: http://tf.nist.gov/tf-cgi/servers.cgi
+     * Do not query time server more than once every 4 seconds.
+     */
+    public static final String DEFAULT_TIME_SERVER_HOST = "time.nist.gov";
+
+    private static final int NTP_SERVER_TIMEOUT_MS = 15000;
+
+    /**
+     * Queries the default NTP Server for the time.
+     * Do not query time server more than once every 4 seconds.
+     * @return the NTP server {@link Date} or {@code null}.
+     * @throws IOException
+     */
+    public static Date getDefaultNtpServerDate() throws IOException {
+        return getNtpServerDate(DEFAULT_TIME_SERVER_HOST);
+    }
+
+    /**
+     * Queries the specified NTP Server for the time.
+     * Do not query time server more than once every 4 seconds.
+     * @param timeServerHost the time server host name.
+     * @return the NTP server {@link Date} or {@code null}.
+     * @throws IOException
+     */
+    public static Date getNtpServerDate(String timeServerHost) throws 
IOException {
+        try {
+            TimeInfo timeInfo = null;
+            NTPUDPClient timeClient = new NTPUDPClient();
+            timeClient.setDefaultTimeout(NTP_SERVER_TIMEOUT_MS);
+            InetAddress inetAddress = InetAddress.getByName(timeServerHost);
+            if (inetAddress != null) {
+                timeInfo = timeClient.getTime(inetAddress);
+                if (timeInfo != null) {
+                    // TODO: which time to use?
+                    long serverTime = 
timeInfo.getMessage().getTransmitTimeStamp().getTime();
+                    //long serverTime = timeInfo.getReturnTime();
+                    Date ntpDate = new Date(serverTime);
+                    return ntpDate;
+                }
+            }
+        } catch (IOException e) {
+            throw new IOException("Unable to get NTP server time.", e);
+        }
+        return null;
+    }
+
+    /**
+     * Gets the remote machine's system time by checking the DATE field in the 
header
+     * from a HTTP HEAD method response.
+     * @param urlString the URL string of the remote machine's web server to 
connect to.
+     * @return the remote machine's system {@link Date} or {@code null}.
+     * @throws IOException
+     * @throws ParseException
+     */
+    public static Date getRemoteMachineDate(String urlString) throws 
IOException, ParseException {
+        Date remoteDate = null;
+        HttpURLConnection conn = null;
+        try {
+            URL url = new URL(urlString);
+
+            // Set up the initial connection
+            conn = (HttpURLConnection)url.openConnection();
+            // Use HEAD instead of GET so content isn't returned.
+            conn.setRequestMethod(HttpMethods.HEAD);
+            conn.setDoOutput(false);
+            conn.setReadTimeout(10000);
+
+            conn.connect();
+
+            Map<String, List<String>> header = conn.getHeaderFields();
+            for (String key : header.keySet()) {
+                if (key != null && HttpHeaders.DATE.equals(key)) {
+                    List<String> data = header.get(key);
+                    String dateString = data.get(0);
+                    SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM 
yyyy HH:mm:ss Z");
+                    remoteDate = sdf.parse(dateString);
+                    break;
+                }
+            }
+        } finally {
+            // Close the connection
+            conn.disconnect();
+        }
+
+        return remoteDate;
+    }
+
+    /**
+     * Gets the time difference between the 2 specified times from the NTP 
server time and the machine system time.
+     * @param ntpDate the {@link Date} from the NTP server host.
+     * @param machineDate the {@link Date} from the machine (either local or 
remote).
+     * @param isMachineLocal {@code true} if the {@code machineDate} from a 
local machine.  {@code false}
+     * if it's from a remote machine.
+     * @return the difference between the NTP server time and the machine's 
system time.  A positive value
+     * indicates that the machine's system time is ahead of the time server.  
A negative value indicates that
+     * the machine's system time is behind of the time server.
+     */
+    public static Long getTimeDifference(Date ntpDate, Date machineDate, 
boolean isMachineLocal) {
+        Long diff = null;
+        if (ntpDate != null && machineDate != null) {
+            log.info("NTP Server Time: " + ntpDate);
+            String machineLabel = isMachineLocal ? "Local" : "Remote";
+            log.info(machineLabel + " Machine Time: " + machineDate);
+            diff = machineDate.getTime() - ntpDate.getTime();
+
+            boolean isAhead = diff > 0;
+            String durationBreakdown = TimeUtils.getDurationBreakdown(diff, 
false);
+            log.info(machineLabel + " Machine time is " + (isAhead ? "ahead 
of" : "behind") + " NTP server time by " + durationBreakdown + ".");
+        }
+
+        return diff;
+    }
+
+    /**
+     * Gets the time difference between the NTP server and the local machine 
system time.
+     * @param timeServerHost the time server host name.
+     * @return the difference between the NTP server time and the local 
machine's system time.  A positive value
+     * indicates that the local machine's system time is ahead of the time 
server.  A negative value indicates that
+     * the local machine's system time is behind of the time server.
+     * @throws IOException
+     */
+    public static Long getNtpServerAndLocalMachineTimeDifference(String 
timeServerHost) throws IOException {
+        log.info("Getting NTP Server time from " + timeServerHost + "...");
+        Date ntpDate = getNtpServerDate(timeServerHost);
+        Long diff = null;
+        if (ntpDate != null) {
+            log.info("Getting Local Machine time...");
+            Date machineDate = new Date();
+
+            diff = getTimeDifference(ntpDate, machineDate, true);
+        }
+
+        return diff;
+    }
+
+    /**
+     * Gets the time difference between the NTP server and the remote machine 
system time.
+     * @param timeServerHost the time server host name.
+     * @param remoteMachineUrlString the URL string of the remote machine's 
web server to connect to.
+     * @return the difference between the NTP server time and the remote 
machine's system time.  A positive value
+     * indicates that the remote machine's system time is ahead the time 
server.  A negative value indicates that
+     * the remote machine's system time is behind the time server.
+     * @throws ParseException
+     * @throws IOException
+     */
+    public static Long getNtpServerAndRemoteMachineTimeDifference(String 
timeServerHost, String remoteMachineUrlString) throws IOException, 
ParseException {
+        log.info("Getting NTP Server time from " + timeServerHost + "...");
+        Date ntpDate = getNtpServerDate(timeServerHost);
+        Long diff = null;
+        if (ntpDate != null) {
+            log.info("Getting Remote Machine time from " + 
remoteMachineUrlString + "...");
+            Date machineDate = getRemoteMachineDate(remoteMachineUrlString);
+
+            diff = getTimeDifference(ntpDate, machineDate, false);
+        }
+
+        return diff;
+    }
+
+    /**
+     * Gets the time difference between the NTP server and the machine system 
time (either locally or remotely depending on the URL).
+     * @param timeServerHost the time server host name.
+     * @param machineUrlString the URL string of the machine's web server to 
connect to.  The machine might be
+     * local or remote.
+     * @return the difference between the NTP server time and the machine's 
system time.  A positive value
+     * indicates that the machine's system time is ahead of the time server.  
A negative value indicates that
+     * the machine's system time is behind the time server.
+     * @throws ParseException
+     * @throws IOException
+     */
+    public static Long getNtpServerAndMachineTimeDifference(String 
timeServerHost, String machineUrlString) throws IOException, ParseException {
+        boolean isUrlLocalMachine = isUrlLocalMachine(machineUrlString);
+
+        Long machineTimeOffset;
+        if (isUrlLocalMachine) {
+            machineTimeOffset = 
getNtpServerAndLocalMachineTimeDifference(timeServerHost);
+        } else {
+            machineTimeOffset = 
getNtpServerAndRemoteMachineTimeDifference(timeServerHost, machineUrlString);
+        }
+
+        return machineTimeOffset;
+    }
+
+    /**
+     * Gets the machine system time (either locally or remotely depending on 
the URL).
+     * @param urlString the URL string of the machine to check.
+     * @return the machine's system time.
+     * @throws IOException
+     * @throws ParseException
+     */
+    public static Date getMachineDate(String urlString) throws IOException, 
ParseException {
+        boolean isMachineLocal = isUrlLocalMachine(urlString);
+
+        Date machineDate;
+        if (isMachineLocal) {
+            // Get local system machine time
+            machineDate = new Date();
+        } else {
+            // Get remote machine time from HTTP HEAD response.  Check hosted 
server web page on machine for time.
+            machineDate = getRemoteMachineDate(urlString);
+        }
+
+        return machineDate;
+    }
+
+    /**
+     * Checks to see if the URL provided is hosted on the local machine or not.
+     * @param urlString the URL string to check.
+     * @return {@code true} if the URL is hosted on the local machine.  {@code 
false}
+     * if it's on a remote machine.
+     * @throws UnknownHostException
+     * @throws MalformedURLException
+     */
+    public static boolean isUrlLocalMachine(String urlString) throws 
UnknownHostException, MalformedURLException {
+        String localAddress = InetAddress.getLocalHost().getHostAddress();
+        String requestAddress = InetAddress.getByName(new 
URL(urlString).getHost()).getHostAddress();
+        return localAddress != null && requestAddress != null && 
localAddress.equals(requestAddress);
+    }
+
+    /**
+     * Convert a millisecond duration to a string format.
+     * @param durationMs A duration to convert to a string form.
+     * @return A string of the form "X Days Y Hours Z Minutes A Seconds B 
Milliseconds".
+     */
+    public static String getDurationBreakdown(final long durationMs) {
+        return getDurationBreakdown(durationMs, true);
+    }
+
+    /**
+     * Convert a millisecond duration to a string format.
+     * @param durationMs A duration to convert to a string form.
+     * @param showSign {@code true} to show if the duration is positive or 
negative. {@code false}
+     * to not display the sign.
+     * @return A string of the form "X Days Y Hours Z Minutes A Seconds B 
Milliseconds".
+     */
+    public static String getDurationBreakdown(final long durationMs, boolean 
showSign) {
+        long tempDurationMs = Math.abs(durationMs);
+
+        long days = TimeUnit.MILLISECONDS.toDays(tempDurationMs);
+        tempDurationMs -= TimeUnit.DAYS.toMillis(days);
+        long hours = TimeUnit.MILLISECONDS.toHours(tempDurationMs);
+        tempDurationMs -= TimeUnit.HOURS.toMillis(hours);
+        long minutes = TimeUnit.MILLISECONDS.toMinutes(tempDurationMs);
+        tempDurationMs -= TimeUnit.MINUTES.toMillis(minutes);
+        long seconds = TimeUnit.MILLISECONDS.toSeconds(tempDurationMs);
+        tempDurationMs -= TimeUnit.SECONDS.toMillis(seconds);
+        long milliseconds = TimeUnit.MILLISECONDS.toMillis(tempDurationMs);
+
+        StringBuilder sb = new StringBuilder();
+        if (tempDurationMs != 0 && showSign) {
+            sb.append(tempDurationMs > 0 ? "+" : "-");
+        }
+        if (days > 0) {
+            sb.append(days);
+            sb.append(days == 1 ? " Day " : " Days ");
+        }
+        if (hours > 0) {
+            sb.append(hours);
+            sb.append(hours == 1 ? " Hour " : " Hours ");
+        }
+        if (minutes > 0) {
+            sb.append(minutes);
+            sb.append(minutes == 1 ? " Minute " : " Minutes ");
+        }
+        if (seconds > 0) {
+            sb.append(seconds);
+            sb.append(seconds == 1 ? " Second " : " Seconds " );
+        }
+        if (milliseconds > 0 || (!showSign && sb.length() == 0) || (showSign 
&& sb.length() == 1)) {
+            // At least show the milliseconds if nothing else has been shown 
so far
+            sb.append(milliseconds);
+            sb.append(milliseconds == 1 ? " Millisecond" : " Milliseconds");
+        }
+
+        return StringUtils.trim(sb.toString());
+    }
+
+    /**
+     * Checks if a date is before another date or if they are equal.
+     * @param date1 the first {@link Date}.
+     * @param date2 the second {@link Date}.
+     * @return {@code true} if {@code date1} is before or equal to {@code 
date2}.  {@code false} otherwise.
+     */
+    public static boolean dateBeforeInclusive(Date date1, Date date2) {
+        return !date1.after(date2);
+    }
+
+    /**
+     * Checks if a date is after another date or if they are equal.
+     * @param date1 the first {@link Date}.
+     * @param date2 the second {@link Date}.
+     * @return {@code true} if {@code date1} is after or equal to {@code 
date2}.  {@code false} otherwise.
+     */
+    public static boolean dateAfterInclusive(Date date1, Date date2) {
+        return !date1.before(date2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java
 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java
new file mode 100644
index 0000000..4e06166
--- /dev/null
+++ 
b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java
@@ -0,0 +1,136 @@
+package mvm.rya.accumulo.mr.merge.util;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility methods for the merge tool and copy tool configuration files.
+ */
+public final class ToolConfigUtils {
+    private static final Logger log = Logger.getLogger(ToolConfigUtils.class);
+
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private ToolConfigUtils() {
+    }
+
+    /**
+     * Gets the set of user arguments from the user's config and/or their 
extra supplied
+     * command line arguments.  This weeds out all the automatically generated 
parameters created
+     * from initializing a {@link Configuration} object and should only give 
back a set of arguments
+     * provided directly by the user.
+     * @param conf the {@link Configuration} provided.
+     * @param args the extra arguments from the command line.
+     * @return a {@link Set} of argument strings.
+     * @throws IOException
+     */
+    public static Set<String> getUserArguments(Configuration conf, String[] 
args) throws IOException {
+        String[] filteredArgs = new String[] {};
+        if (Arrays.asList(args).contains("-conf")) {
+            // parse args
+            new GenericOptionsParser(conf, args);
+
+            List<String> commandLineArgs = new ArrayList<>();
+            for (String arg : args) {
+                if (arg.startsWith("-D")) {
+                    commandLineArgs.add(arg);
+                }
+            }
+            filteredArgs = commandLineArgs.toArray(new String[0]);
+        } else {
+            filteredArgs = args;
+        }
+
+        // Get the supplied config name from the resource string.
+        // No real easy way of getting the name.
+        // So, pulling it off the list of resource names in the 
Configuration's toString() method
+        // where it should be the last one.
+        String confString = conf.toString();
+        String resourceString = StringUtils.removeStart(confString, 
"Configuration: ");
+        List<String> resourceNames = 
Arrays.asList(StringUtils.split(resourceString, ", "));
+        String configFilename = resourceNames.get(resourceNames.size() - 1);
+
+        Set<String> toolArgsSet = new HashSet<>();
+        File file = new File(configFilename);
+        // Check that the last resource name is the actual user's config by 
seeing if it's a file
+        // on the system, the other resources seem to be contained in jars and 
so should fail here which
+        // should happen if no config is supplied.
+        if (file.exists()) {
+            XMLConfiguration configuration = null;
+            try {
+                configuration = new XMLConfiguration(configFilename);
+                toolArgsSet.addAll(getConfigArguments(configuration));
+            } catch (ConfigurationException e) {
+                log.error("Unable to load configuration file.", e);
+            }
+        }
+
+        toolArgsSet.addAll(Arrays.asList(filteredArgs));
+        return Collections.unmodifiableSet(toolArgsSet);
+    }
+
+    /**
+     * Reads in the configuration file properties and values and converts them
+     * into a set of argument strings.
+     * @param configuration the {@link XMLConfiguration}.
+     * @return the set of argument strings.
+     */
+    public static Set<String> getConfigArguments(XMLConfiguration 
configuration) {
+        int size = configuration.getList("property.name").size();
+        TreeSet<String> configArgs = new TreeSet<>();
+        for (int i = 0; i < size; i++) {
+            String propertyName = configuration.getString("property(" + i + 
").name");
+            String propertyValue = configuration.getString("property(" + i + 
").value");
+            String argument = makeArgument(propertyName, propertyValue);
+            configArgs.add(argument);
+        }
+        return configArgs;
+    }
+
+    /**
+     * Creates an argument string from the specified property name and value.
+     * If the property name is "config.file" and value is "config.xml" then 
this will
+     * create an argument string of "-Dconfig.file=config.xml"
+     * @param propertyName the property name.
+     * @param value the value.
+     * @return the argument string.
+     */
+    public static String makeArgument(String propertyName, String value) {
+        return "-D" + propertyName + "=" + value;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java 
b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java
new file mode 100644
index 0000000..0acfbd1
--- /dev/null
+++ 
b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java
@@ -0,0 +1,339 @@
+package mvm.rya.accumulo.mr.merge;
+
+import static mvm.rya.accumulo.mr.merge.util.TestUtils.LAST_MONTH;
+import static mvm.rya.accumulo.mr.merge.util.TestUtils.TODAY;
+import static mvm.rya.accumulo.mr.merge.util.TestUtils.YESTERDAY;
+import static mvm.rya.accumulo.mr.merge.util.TestUtils.createRyaStatement;
+import static mvm.rya.accumulo.mr.merge.util.ToolConfigUtils.makeArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.merge.common.InstanceType;
+import mvm.rya.accumulo.mr.merge.driver.AccumuloDualInstanceDriver;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.merge.util.TestUtils;
+import mvm.rya.accumulo.mr.merge.util.TimeUtils;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Tests for {@link CopyTool}.
+ */
+public class CopyToolTest {
    private static final Logger log = Logger.getLogger(CopyToolTest.class);

    // Test behavior switches: these are passed to the tool in copyToolRun() --
    // run against a mock Accumulo, skip NTP time sync, write directly instead
    // of producing copy files, and don't prompt for a start time.
    private static final boolean IS_MOCK = true;
    private static final boolean USE_TIME_SYNC = false;
    private static final boolean USE_COPY_FILE_OUTPUT = false;
    private static final boolean IS_START_TIME_DIALOG_ENABLED = false;

    // Suffix appended to property names to address the child instance.
    private static final String CHILD_SUFFIX = MergeTool.CHILD_SUFFIX;

    // Parent (source) instance connection constants.
    private static final String PARENT_PASSWORD = AccumuloDualInstanceDriver.PARENT_PASSWORD;
    private static final String PARENT_INSTANCE = AccumuloDualInstanceDriver.PARENT_INSTANCE;
    private static final String PARENT_TABLE_PREFIX = AccumuloDualInstanceDriver.PARENT_TABLE_PREFIX;
    private static final String PARENT_AUTH = AccumuloDualInstanceDriver.PARENT_AUTH;
    private static final ColumnVisibility PARENT_COLUMN_VISIBILITY = new ColumnVisibility(PARENT_AUTH);
    // NOTE(review): the trailing ";;" here and on CHILD_TOMCAT_URL is a stray
    // empty declaration -- legal Java, but likely a copy/paste or archive
    // artifact; confirm against the committed source and remove.
    private static final String PARENT_TOMCAT_URL = "http://rya-example-box:8080";;

    // Child (destination) instance connection constants.
    private static final String CHILD_PASSWORD = AccumuloDualInstanceDriver.CHILD_PASSWORD;
    private static final String CHILD_INSTANCE = AccumuloDualInstanceDriver.CHILD_INSTANCE;
    private static final String CHILD_TABLE_PREFIX = AccumuloDualInstanceDriver.CHILD_TABLE_PREFIX;
    private static final String CHILD_TOMCAT_URL = "http://localhost:8080";;

    // Re-assigned each test in setUpPerTest() from the dual-instance driver.
    private static AccumuloRyaDAO parentDao;
    private static AccumuloRyaDAO childDao;

    private static AccumuloRdfConfiguration parentConfig;
    private static AccumuloRdfConfiguration childConfig;

    private static AccumuloDualInstanceDriver accumuloDualInstanceDriver;
    // Created lazily by copyToolRun(); shut down in tearDownPerTest() if set.
    private static CopyTool copyTool = null;
    // Toggled by individual tests before calling copyToolRun() to enable the
    // tool's copy-file import mode.
    private boolean isImporting = false;
+
    @BeforeClass
    public static void setUp() throws Exception {
        // Create the mock parent/child Accumulo pair once for the whole test
        // class; tables, DAOs, and configs are rebuilt per test in setUpPerTest().
        // NOTE(review): the boolean flags presumably select which components the
        // driver starts -- confirm against AccumuloDualInstanceDriver.
        accumuloDualInstanceDriver = new AccumuloDualInstanceDriver(IS_MOCK, true, true, false, false);
        accumuloDualInstanceDriver.setUpInstances();
    }
+
    @Before
    public void setUpPerTest() throws Exception {
        // Fresh tables, DAOs, and configs before every test so state cannot
        // leak between test methods.
        accumuloDualInstanceDriver.setUpTables();

        accumuloDualInstanceDriver.setUpDaos();

        accumuloDualInstanceDriver.setUpConfigs();

        // Cache the driver's per-test objects in the static fields the tests use.
        parentConfig = accumuloDualInstanceDriver.getParentConfig();
        childConfig = accumuloDualInstanceDriver.getChildConfig();
        parentDao = accumuloDualInstanceDriver.getParentDao();
        childDao = accumuloDualInstanceDriver.getChildDao();
    }
+
    @After
    public void tearDownPerTest() throws Exception {
        // Undo everything setUpPerTest() created, plus the tool if a test ran it.
        log.info("tearDownPerTest(): tearing down now.");
        accumuloDualInstanceDriver.tearDownTables();
        accumuloDualInstanceDriver.tearDownDaos();
        // copyTool is created lazily by copyToolRun(), so it may still be null.
        if (copyTool != null) {
            copyTool.shutdown();
        }
    }
+
    @AfterClass
    public static void tearDownPerClass() throws Exception {
        // Final cleanup of the dual instances created once in setUp().
        log.info("tearDownPerClass(): tearing down now.");
        accumuloDualInstanceDriver.tearDown();
    }
+
    /**
     * Asserts that the child instance contains the expected number of matches
     * for the given statement.  Delegates to
     * {@link TestUtils#assertStatementInInstance} with the child DAO/config.
     * @param description the assertion message used on failure.
     * @param verifyResultCount the expected number of matching results.
     * @param matchStatement the {@link RyaStatement} to look up in the child.
     * @throws RyaDAOException
     */
    private void assertStatementInChild(String description, int verifyResultCount, RyaStatement matchStatement) throws RyaDAOException {
        TestUtils.assertStatementInInstance(description, verifyResultCount, matchStatement, childDao, childConfig);
    }
+
+    private void copyToolRun(Date startDate) throws AccumuloException, 
AccumuloSecurityException {
+        copyTool = new CopyTool();
+        copyTool.setupAndRun(new String[] {
+                makeArgument(MRUtils.AC_MOCK_PROP, Boolean.toString(IS_MOCK)),
+                makeArgument(MRUtils.AC_INSTANCE_PROP, PARENT_INSTANCE),
+                makeArgument(MRUtils.AC_USERNAME_PROP, 
accumuloDualInstanceDriver.getParentUser()),
+                makeArgument(MRUtils.AC_PWD_PROP, PARENT_PASSWORD),
+                makeArgument(MRUtils.TABLE_PREFIX_PROPERTY, 
PARENT_TABLE_PREFIX),
+                makeArgument(MRUtils.AC_AUTH_PROP, 
accumuloDualInstanceDriver.getParentAuths().toString()),
+                makeArgument(MRUtils.AC_ZK_PROP, 
accumuloDualInstanceDriver.getParentZooKeepers()),
+                makeArgument(CopyTool.PARENT_TOMCAT_URL_PROP, 
PARENT_TOMCAT_URL),
+                makeArgument(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, 
Boolean.toString(IS_MOCK)),
+                makeArgument(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, 
CHILD_INSTANCE),
+                makeArgument(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, 
accumuloDualInstanceDriver.getChildUser()),
+                makeArgument(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, 
CHILD_PASSWORD),
+                makeArgument(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, 
CHILD_TABLE_PREFIX),
+                makeArgument(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, 
accumuloDualInstanceDriver.getChildAuths() != null ? 
accumuloDualInstanceDriver.getChildAuths().toString() : null),
+                makeArgument(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, 
accumuloDualInstanceDriver.getChildZooKeepers() != null ? 
accumuloDualInstanceDriver.getChildZooKeepers() : "localhost"),
+                makeArgument(CopyTool.CHILD_TOMCAT_URL_PROP, CHILD_TOMCAT_URL),
+                makeArgument(CopyTool.CREATE_CHILD_INSTANCE_TYPE_PROP, 
(IS_MOCK ? InstanceType.MOCK : InstanceType.MINI).toString()),
+                makeArgument(CopyTool.NTP_SERVER_HOST_PROP, 
TimeUtils.DEFAULT_TIME_SERVER_HOST),
+                makeArgument(CopyTool.USE_NTP_SERVER_PROP, 
Boolean.toString(USE_TIME_SYNC)),
+                makeArgument(CopyTool.USE_COPY_FILE_OUTPUT, 
Boolean.toString(USE_COPY_FILE_OUTPUT)),
+                makeArgument(CopyTool.COPY_FILE_OUTPUT_PATH, 
"/test/copy_tool_file_output/"),
+                makeArgument(CopyTool.COPY_FILE_OUTPUT_COMPRESSION_TYPE, 
Algorithm.GZ.getName()),
+                makeArgument(CopyTool.USE_COPY_FILE_OUTPUT_DIRECTORY_CLEAR, 
Boolean.toString(true)),
+                makeArgument(CopyTool.COPY_FILE_IMPORT_DIRECTORY, 
"resources/test/copy_tool_file_output/"),
+                makeArgument(CopyTool.USE_COPY_FILE_IMPORT, 
Boolean.toString(isImporting)),
+                makeArgument(MergeTool.START_TIME_PROP, 
MergeTool.getStartTimeString(startDate, IS_START_TIME_DIALOG_ENABLED))
+        });
+
+        Configuration toolConfig = copyTool.getConf();
+        String zooKeepers = toolConfig.get(MRUtils.AC_ZK_PROP + CHILD_SUFFIX);
+        MergeTool.setDuplicateKeysForProperty(childConfig, MRUtils.AC_ZK_PROP, 
zooKeepers);
+
+        log.info("Finished running tool.");
+    }
+
+    @Test
+    public void testCopyTool() throws Exception {
+        RyaStatement ryaStatementOutOfTimeRange = createRyaStatement("coach", 
"called", "timeout", LAST_MONTH);
+
+        RyaStatement ryaStatementShouldCopy1 = createRyaStatement("bob", 
"catches", "ball", YESTERDAY);
+        RyaStatement ryaStatementShouldCopy2 = createRyaStatement("bill", 
"talks to", "john", YESTERDAY);
+        RyaStatement ryaStatementShouldCopy3 = createRyaStatement("susan", 
"eats", "burgers", TODAY);
+        RyaStatement ryaStatementShouldCopy4 = createRyaStatement("ronnie", 
"plays", "guitar", TODAY);
+
+        RyaStatement ryaStatementDoesNotExist1 = createRyaStatement("nobody", 
"was", "here", LAST_MONTH);
+        RyaStatement ryaStatementDoesNotExist2 = 
createRyaStatement("statement", "not", "found", YESTERDAY);
+        RyaStatement ryaStatementDoesNotExist3 = createRyaStatement("key", 
"does not", "exist", TODAY);
+
+        // This statement was modified by the child to change the column 
visibility.
+        // The parent should combine the child's visibility with its 
visibility.
+        RyaStatement ryaStatementVisibilityDifferent = createRyaStatement("I", 
"see", "you", YESTERDAY);
+        
ryaStatementVisibilityDifferent.setColumnVisibility(PARENT_COLUMN_VISIBILITY.getExpression());
+
+        // Setup initial parent instance with 7 rows
+        // This is the state of the parent data (as it is today) before 
merging occurs which will use the specified start time of yesterday.
+        parentDao.add(ryaStatementOutOfTimeRange);      // Process should NOT 
copy statement
+        parentDao.add(ryaStatementShouldCopy1);         // Process should copy 
statement
+        parentDao.add(ryaStatementShouldCopy2);         // Process should copy 
statement
+        parentDao.add(ryaStatementShouldCopy3);         // Process should copy 
statement
+        parentDao.add(ryaStatementShouldCopy4);         // Process should copy 
statement
+        parentDao.add(ryaStatementVisibilityDifferent); // Process should copy 
and update statement
+
+
+        AccumuloRyaUtils.printTable(PARENT_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, parentConfig);
+        //AccumuloRyaUtils.printTable(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig);
+
+        log.info("Starting copy tool. Copying all data after the specified 
start time: " + YESTERDAY);
+
+        isImporting = false;
+        copyToolRun(YESTERDAY);
+
+
+        // Copy Tool made child instance so hook the tables and dao into the 
driver.
+        String childUser = accumuloDualInstanceDriver.getChildUser();
+        Connector childConnector = ConfigUtils.getConnector(childConfig);
+        
accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setConnector(childConnector);
+
+        
accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpTables();
+
+        accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpDao();
+        childDao = accumuloDualInstanceDriver.getChildDao();
+
+
+        // Update child config to include changes made from copy process
+        SecurityOperations childSecOps = 
accumuloDualInstanceDriver.getChildSecOps();
+        Authorizations newChildAuths = 
AccumuloRyaUtils.addUserAuths(childUser, childSecOps, PARENT_AUTH);
+        childSecOps.changeUserAuthorizations(childUser, newChildAuths);
+        String childAuthString = newChildAuths.toString();
+        List<String> duplicateKeys = 
MergeTool.DUPLICATE_KEY_MAP.get(MRUtils.AC_AUTH_PROP);
+        childConfig.set(MRUtils.AC_AUTH_PROP, childAuthString);
+        for (String key : duplicateKeys) {
+            childConfig.set(key, childAuthString);
+        }
+        AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, childConfig);
+        AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, childConfig);
+        AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig);
+
+        Scanner scanner = AccumuloRyaUtils.getScanner(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig);
+        Iterator<Entry<Key, Value>> iterator = scanner.iterator();
+        int count = 0;
+        while (iterator.hasNext()) {
+            iterator.next();
+            count++;
+        }
+        // Make sure we have all of them in the parent.
+        assertEquals(5, count);
+
+
+        assertStatementInChild("Child included statement that was out of time 
range", 0, ryaStatementOutOfTimeRange);
+
+        assertStatementInChild("Child missing statement 1 that was in parent", 
1, ryaStatementShouldCopy1);
+
+        assertStatementInChild("Child missing statement 2 that was in parent", 
1, ryaStatementShouldCopy2);
+
+        assertStatementInChild("Child missing statement 3 that was in parent", 
1, ryaStatementShouldCopy3);
+
+        assertStatementInChild("Child missing statement 4 that was in parent", 
1, ryaStatementShouldCopy4);
+
+        assertStatementInChild("Child included statement 1 that was not in 
parent", 0, ryaStatementDoesNotExist1);
+
+        assertStatementInChild("Child included statement 2 that was not in 
parent", 0, ryaStatementDoesNotExist2);
+
+        assertStatementInChild("Child included statement 3 that was not in 
parent", 0, ryaStatementDoesNotExist3);
+
+        // Check that it can be queried with child's visibility
+        assertStatementInChild("Child missing statement with child 
visibility", 1, ryaStatementVisibilityDifferent);
+
+        // Check that it can be queried with parent's visibility
+        childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, 
PARENT_AUTH);
+        SecurityOperations secOps = IS_MOCK ? 
accumuloDualInstanceDriver.getChildSecOps() : childSecOps;
+        newChildAuths = 
AccumuloRyaUtils.addUserAuths(accumuloDualInstanceDriver.getChildUser(), 
secOps, PARENT_AUTH);
+        
secOps.changeUserAuthorizations(accumuloDualInstanceDriver.getChildUser(), 
newChildAuths);
+        assertStatementInChild("Child missing statement with parent 
visibility", 1, ryaStatementVisibilityDifferent);
+
+        // Check that it can NOT be queried with some other visibility
+        childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, 
"bad_auth");
+        CloseableIteration<RyaStatement, RyaDAOException> iter = 
childDao.getQueryEngine().query(ryaStatementVisibilityDifferent, childConfig);
+        count = 0;
+        try {
+            while (iter.hasNext()) {
+                iter.next();
+                count++;
+            }
+        } catch (Exception e) {
+            // Expected
+            if (!(e.getCause() instanceof AccumuloSecurityException)) {
+                fail();
+            }
+        }
+        iter.close();
+        assertEquals(0, count);
+
+        // reset auth
+        childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, 
childAuthString);
+
+        log.info("DONE");
+    }
+
+    @Test
+    public void testImportDirectoryTool() throws Exception {
+        log.info("");
+        log.info("Setting up initial state of parent before importing 
directory to child...");
+        log.info("Adding data to parent...");
+
+        log.info("Starting import directory tool. Importing all data after the 
specified start time: " + YESTERDAY);
+        log.info("");
+
+        isImporting = true;
+        copyToolRun(YESTERDAY);
+
+
+        // Import Directory Tool made child instance so hook the tables and 
dao into the driver.
+        String childUser = accumuloDualInstanceDriver.getChildUser();
+        Connector childConnector = ConfigUtils.getConnector(childConfig);
+        
accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setConnector(childConnector);
+
+        
accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpTables();
+
+        accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpDao();
+
+
+        // Update child config to include changes made from import directory 
process
+        SecurityOperations childSecOps = 
accumuloDualInstanceDriver.getChildSecOps();
+        Authorizations newChildAuths = 
AccumuloRyaUtils.addUserAuths(childUser, childSecOps, PARENT_AUTH);
+        childSecOps.changeUserAuthorizations(childUser, newChildAuths);
+        String childAuthString = newChildAuths.toString();
+        List<String> duplicateKeys = 
MergeTool.DUPLICATE_KEY_MAP.get(MRUtils.AC_AUTH_PROP);
+        childConfig.set(MRUtils.AC_AUTH_PROP, childAuthString);
+        for (String key : duplicateKeys) {
+            childConfig.set(key, childAuthString);
+        }
+
+
+        //AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, childConfig);
+        //AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, childConfig);
+        AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig);
+
+        Scanner scanner = AccumuloRyaUtils.getScanner(CHILD_TABLE_PREFIX + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig);
+        Iterator<Entry<Key, Value>> iterator = scanner.iterator();
+        int count = 0;
+        while (iterator.hasNext()) {
+            iterator.next();
+            count++;
+        }
+        log.info("");
+        log.info("Total rows imported: " + count);
+        log.info("");
+
+        assertEquals(20, count);
+
+        log.info("DONE");
+    }
+}

Reply via email to