http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java new file mode 100644 index 0000000..c14a067 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/QueryRuleset.java @@ -0,0 +1,549 @@ +package mvm.rya.accumulo.mr.merge.util; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.BufferedReader; +import java.io.FileReader; +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import org.apache.log4j.Logger; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.OWL; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.model.vocabulary.RDFS; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.UnsupportedQueryLanguageException; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.ListMemberOperator; +import org.openrdf.query.algebra.Or; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.Union; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.ParsedTupleQuery; +import org.openrdf.query.parser.QueryParserUtil; +import org.openrdf.sail.SailException; + +import mvm.rya.accumulo.mr.merge.CopyTool; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; +import mvm.rya.rdftriplestore.inference.InferJoin; +import mvm.rya.rdftriplestore.inference.InferUnion; +import mvm.rya.rdftriplestore.inference.InferenceEngine; +import mvm.rya.rdftriplestore.inference.InverseOfVisitor; +import mvm.rya.rdftriplestore.inference.SameAsVisitor; +import mvm.rya.rdftriplestore.inference.SubClassOfVisitor; +import mvm.rya.rdftriplestore.inference.SubPropertyOfVisitor; +import mvm.rya.rdftriplestore.inference.SymmetricPropertyVisitor; +import 
mvm.rya.rdftriplestore.inference.TransitivePropertyVisitor; +import mvm.rya.rdftriplestore.utils.FixedStatementPattern; +import mvm.rya.rdftriplestore.utils.TransitivePropertySP; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * Represents a set of {@link CopyRule} instances derived from a query. The ruleset determines a logical + * subset of statements in Rya, such that statements selected by the ruleset are at least enough to answer + * the query. + */ +public class QueryRuleset { + private static final Logger log = Logger.getLogger(QueryRuleset.class); + + /** + * Represents an error attempting to convert a query to a set of rules. + */ + public static class QueryRulesetException extends Exception { + private static final long serialVersionUID = 1L; + public QueryRulesetException(String s) { + super(s); + } + public QueryRulesetException(String s, Throwable throwable) { + super(s, throwable); + } + } + + /** + * Takes in a parsed query tree and extracts the rules defining relevant statements. 
+ */ + private static class RulesetVisitor extends QueryModelVisitorBase<QueryRulesetException> { + List<CopyRule> rules = new LinkedList<>(); + private Set<Value> superclasses = new HashSet<>(); + private Set<Value> superproperties = new HashSet<>(); + private Set<Value> sameAs = new HashSet<>(); + private Set<Value> transitive = new HashSet<>(); + private Set<Value> schemaProperties = new HashSet<>(); + + @Override + public void meet(StatementPattern node) throws QueryRulesetException { + Var predVar = node.getPredicateVar(); + // If this is a transitive property node, just match all statements with that property + if (node instanceof TransitivePropertySP && predVar.hasValue()) { + node = new StatementPattern(new Var("transitiveSubject"), predVar, + new Var("transitiveObject"), node.getContextVar()); + // And make sure to grab the transitivity statement itself + transitive.add(predVar.getValue()); + } + rules.add(new CopyRule(node)); + } + + @Override + public void meet(Filter node) throws QueryRulesetException { + ValueExpr condition = node.getCondition(); + // If the condition is a function call, and we don't know about the function, don't try to test for it. + if (condition instanceof FunctionCall) { + String uri = ((FunctionCall) condition).getURI(); + if (FunctionRegistry.getInstance().get(uri) == null) { + // Just extract statement patterns from the child as if there were no filter. + node.getArg().visit(this); + } + } + // Otherwise, assume we can test for it: extract rules from below this node, and add the condition to each one. 
+ else { + RulesetVisitor childVisitor = new RulesetVisitor(); + node.getArg().visit(childVisitor); + for (CopyRule rule : childVisitor.rules) { + rule.addCondition(condition); + this.rules.add(rule); + } + this.superclasses.addAll(childVisitor.superclasses); + this.superproperties.addAll(childVisitor.superproperties); + } + } + + @Override + public void meet(Join node) throws QueryRulesetException { + TupleExpr left = node.getLeftArg(); + TupleExpr right = node.getRightArg(); + // If this join represents the application of inference logic, use its children to add the + // appropriate rules. + if (node instanceof InferJoin && left instanceof FixedStatementPattern) { + FixedStatementPattern fsp = (FixedStatementPattern) left; + Value predValue = fsp.getPredicateVar().getValue(); + // If this is a subClassOf relation, fetch all subClassOf and equivalentClass + // relations involving the relevant classes. + if (RDFS.SUBCLASSOF.equals(predValue) && right instanceof StatementPattern) { + StatementPattern dne = (StatementPattern) right; + // If a subClassOf b equivalentClass c subClassOf d, then fsp will contain a statement + // for each class in the hierarchy. If we match every subClassOf and equivalentClass + // relation to any of {a,b,c,d}, then the hierarchy can be reconstructed. + for (Statement st : fsp.statements) { + Value superclassVal = st.getSubject(); + // Rule to match the type assignment: + rules.add(new CopyRule(new StatementPattern(dne.getSubjectVar(), + dne.getPredicateVar(), + new Var(superclassVal.toString(), superclassVal), + dne.getContextVar()))); + // Add to the set of classes for which we need the hierarchy: + superclasses.add(superclassVal); + } + } + // If this is a subPropertyOf relation, fetch all subPropertyOf and equivalentProperty + // relations involving the relevant properties. 
+ else if (RDFS.SUBPROPERTYOF.equals(predValue) && right instanceof StatementPattern) { + StatementPattern dne = (StatementPattern) right; + // If p subPropertyOf q subPropertyOf r subPropertyOf s, then fsp will contain a statement + // for each property in the hierarchy. If we match every subPropertyOf and equivalentProperty + // relation to any of {p,q,r,s}, then the hierarchy can be reconstructed. + for (Statement st : fsp.statements) { + Value superpropVal = st.getSubject(); + // Rule to add the property: + rules.add(new CopyRule(new StatementPattern(dne.getSubjectVar(), + new Var(superpropVal.toString(), superpropVal), + dne.getObjectVar(), + dne.getContextVar()))); + // Add to the set of properties for which we need the hierarchy: + superproperties.add(superpropVal); + } + } + // If this is a sameAs expansion, it may have one or two levels + if (OWL.SAMEAS.equals(predValue)) { + StatementPattern stmt = null; + String replaceVar = fsp.getSubjectVar().getName(); + String replaceVarInner = null; + List<Value> replacements = new LinkedList<>(); + List<Value> replacementsInner = new LinkedList<>(); + for (Statement st : fsp.statements) { + replacements.add(st.getSubject()); + } + if (right instanceof StatementPattern) { + stmt = (StatementPattern) right; + } + else if (right instanceof InferJoin) { + // Add the second set of replacements if given + InferJoin inner = (InferJoin) right; + if (inner.getLeftArg() instanceof FixedStatementPattern + && inner.getRightArg() instanceof StatementPattern) { + stmt = (StatementPattern) inner.getRightArg(); + fsp = (FixedStatementPattern) inner.getLeftArg(); + replaceVarInner = fsp.getSubjectVar().getName(); + for (Statement st : fsp.statements) { + replacementsInner.add(st.getSubject()); + } + } + } + // Add different versions of the original statement: + if (stmt != null) { + for (Value replacementVal : replacements) { + if (replacementsInner.isEmpty()) { + StatementPattern transformed = stmt.clone(); + if 
(transformed.getSubjectVar().equals(replaceVar)) { + transformed.setSubjectVar(new Var(replaceVar, replacementVal)); + } + if (transformed.getObjectVar().equals(replaceVar)) { + transformed.setObjectVar(new Var(replaceVar, replacementVal)); + } + rules.add(new CopyRule(transformed)); + } + for (Value replacementValInner : replacementsInner) { + StatementPattern transformed = stmt.clone(); + if (transformed.getSubjectVar().equals(replaceVar)) { + transformed.setSubjectVar(new Var(replaceVar, replacementVal)); + } + else if (transformed.getSubjectVar().equals(replaceVarInner)) { + transformed.setSubjectVar(new Var(replaceVarInner, replacementValInner)); + } + if (transformed.getObjectVar().equals(replaceVar)) { + transformed.setObjectVar(new Var(replaceVar, replacementVal)); + } + else if (transformed.getObjectVar().equals(replaceVarInner)) { + transformed.setObjectVar(new Var(replaceVar, replacementValInner)); + } + rules.add(new CopyRule(transformed)); + } + } + } + // Add to the set of resources for which we need sameAs relations: + sameAs.addAll(replacements); + sameAs.addAll(replacementsInner); + } + } + // If it's a normal join, visit the children. + else { + super.meet(node); + } + } + + @Override + public void meet(Union node) throws QueryRulesetException { + node.visitChildren(this); + if (node instanceof InferUnion) { + // If this is the result of inference, search each tree for (non-standard) properties and add them + // to the set of properties for which to include schema information. 
+ QueryModelVisitorBase<QueryRulesetException> propertyVisitor = new QueryModelVisitorBase<QueryRulesetException>() { + @Override + public void meet(StatementPattern node) { + if (node.getPredicateVar().hasValue()) { + URI predValue = (URI) node.getPredicateVar().getValue(); + String ns = predValue.getNamespace(); + if (node instanceof FixedStatementPattern + && (RDFS.SUBPROPERTYOF.equals(predValue) || OWL.EQUIVALENTPROPERTY.equals(predValue))) { + // This FSP replaced a property, so find all the properties it entails + FixedStatementPattern fsp = (FixedStatementPattern) node; + for (Statement stmt : fsp.statements) { + schemaProperties.add(stmt.getSubject()); + } + } + else if (!(OWL.NAMESPACE.equals(ns) || RDFS.NAMESPACE.equals(ns) || RDF.NAMESPACE.equals(ns))) { + // This is a regular triple pattern; grab its predicate + schemaProperties.add(predValue); + } + } + } + }; + node.getLeftArg().visit(propertyVisitor); + node.getRightArg().visit(propertyVisitor); + } + } + + /** + * Add rules covering the portions of the schema that may be necessary to use inference + * with this query. 
+ */ + public void addSchema() throws QueryRulesetException { + // Combine the relevant portions of the class hierarchy into one subclass rule and one equivalent class rule: + if (!superclasses.isEmpty()) { + Var superClassVar = new Var("superClassVar"); + // Subclasses of the given classes: + addListRule(new Var("subClassVar"), null, RDFS.SUBCLASSOF, superClassVar, superclasses); + // Equivalent classes to the given classes (this might be stated in either direction): + addListRule(new Var("eqClassSubject"), superclasses, OWL.EQUIVALENTCLASS, new Var("eqClassObject"), superclasses); + } + + // Combine the relevant portions of the property hierarchy into one subproperty rule and one equivalent property rule: + if (!superproperties.isEmpty()) { + Var superPropertyVar = new Var("superPropertyVar"); + // Subproperties of the given properties: + addListRule(new Var("subPropertyVar"), null, RDFS.SUBPROPERTYOF, superPropertyVar, superproperties); + // Equivalent properties to the given properties (this might be stated in either direction): + addListRule(new Var("eqPropSubject"), superproperties, OWL.EQUIVALENTPROPERTY, new Var("eqPropObject"), superproperties); + } + + // Get the relevant portions of the owl:sameAs graph + if (!sameAs.isEmpty()) { + Var sameAsSubj = new Var("sameAsSubject"); + Var sameAsObj = new Var("sameAsObject"); + addListRule(sameAsSubj, sameAs, OWL.SAMEAS, sameAsObj, sameAs); + } + + // Get the potentially relevant owl:TransitiveProperty statements + if (!transitive.isEmpty()) { + Var transitiveVar = new Var(OWL.TRANSITIVEPROPERTY.toString(), OWL.TRANSITIVEPROPERTY); + addListRule(new Var("transitiveProp"), transitive, RDF.TYPE, transitiveVar, null); + } + + // Get any owl:SymmetricProperty and owl:inverseOf statements for relevant properties + if (!schemaProperties.isEmpty()) { + Var symmetricVar = new Var(OWL.SYMMETRICPROPERTY.toString(), OWL.SYMMETRICPROPERTY); + addListRule(new Var("symmetricProp"), schemaProperties, RDF.TYPE, symmetricVar, 
null); + addListRule(new Var("inverseSubject"), schemaProperties, OWL.INVERSEOF, new Var("inverseObject"), schemaProperties); + } + } + + /** + * Build and add a rule that matches triples having a specific predicate, where subject and object constraints + * are each defined using a Var and a set of Values, and each can represent one of: a constant value + * (Var has a value), an enumerated set of possible values, to be turned into a filter (Var has no + * Value and set of Values is non-null), or an unconstrained variable (Var has no value and set of + * Values is null). If both subject and object are variables with enumerated sets, only one part needs to + * match in order to accept the triple. + * @param subjVar Var corresponding to the subject. May have a specific value or represent a variable. + * @param subjValues Either null or a Set of Values that the subject variable can have, tested using a filter. + * @param predicate The URI for the predicate to match + * @param objVar Var corresponding to the object. May have a specific value or represent a variable. 
+ * @param objValues Either null or a Set of Values that the object variable can have, tested using a filter + * @throws QueryRulesetException if the rule can't be created + */ + private void addListRule(Var subjVar, Set<Value> subjValues, URI predicate, + Var objVar, Set<Value> objValues) throws QueryRulesetException { + ListMemberOperator subjCondition = null; + ListMemberOperator objCondition = null; + if (subjValues != null) { + subjCondition = new ListMemberOperator(); + subjCondition.addArgument(subjVar); + for (Value constant : subjValues) { + subjCondition.addArgument(new Var(constant.toString(), constant)); + } + } + if (objValues != null) { + objCondition = new ListMemberOperator(); + objCondition.addArgument(objVar); + for (Value constant : objValues) { + objCondition.addArgument(new Var(constant.toString(), constant)); + } + } + Var predVar = new Var(predicate.toString(), predicate); + CopyRule listRule = new CopyRule(new StatementPattern(subjVar, predVar, objVar)); + if (subjCondition != null && objCondition != null) { + listRule.addCondition(new Or(subjCondition, objCondition)); + } + else if (subjCondition != null) { + listRule.addCondition(subjCondition); + } + else if (objCondition != null) { + listRule.addCondition(objCondition); + } + rules.add(listRule); + } + } + + /** + * The rules themselves -- any statement satisfying any of these rules will be copied. + */ + protected Set<CopyRule> rules = new HashSet<>(); + + /** + * The SPARQL query that defines the ruleset. + */ + protected String query; + + /** + * A Rya configuration. + */ + protected RdfCloudTripleStoreConfiguration conf; + + /** + * Extract a set of rules from a query found in a Configuration. + * @param conf Configuration containing either the query string, or name of a file containing the query, plus inference parameters. 
+ * @throws QueryRulesetException if the query can't be read, parsed, and resolved to valid rules + */ + public QueryRuleset(RdfCloudTripleStoreConfiguration conf) throws QueryRulesetException { + this.conf = conf; + setQuery(); + setRules(); + } + + /** + * Extract a set of rules from a query. + * @param query A SPARQL query string + * @throws QueryRulesetException if the query can't be parsed and resolved to valid rules + */ + public QueryRuleset(String query) throws QueryRulesetException { + this.query = query; + setRules(); + } + + /** + * Get the query that was used to construct this ruleset. + * @return A SPARQL query + */ + public String getQuery() { + return query; + } + + /** + * Set this ruleset's defining query based on the configuration. Query can be + * specified directly or using a file; if it's read from a file, the query + * text will also be added to the configuration. + * @return SPARQL query + * @throws QueryRulesetException if there is no configuration, or if the query can't be found or read + */ + private void setQuery() throws QueryRulesetException { + if (conf == null) { + throw new QueryRulesetException("No Configuration given"); + } + query = conf.get(CopyTool.QUERY_STRING_PROP); + String queryFile = conf.get(CopyTool.QUERY_FILE_PROP); + if (query == null && queryFile != null) { + try { + FileReader fileReader = new FileReader(queryFile); + BufferedReader reader = new BufferedReader(fileReader); + StringBuilder builder = new StringBuilder(); + String line = reader.readLine(); + while (line != null) { + builder.append(line).append("\n"); + line = reader.readLine(); + } + query = builder.toString(); + reader.close(); + conf.set(CopyTool.QUERY_STRING_PROP, query); + } + catch (IOException e) { + throw new QueryRulesetException("Error loading query from file: " + queryFile, e); + } + } + else if (query == null) { + throw new QueryRulesetException("No query string or query file provided"); + } + } + + /** + * Extract the rules from the query 
string, applying inference rules if configured to. + * @throws QueryRulesetException if the parsed query can't be parsed and translated into valid rules. + */ + private void setRules() throws QueryRulesetException { + final ParsedTupleQuery ptq; + final TupleExpr te; + try { + ptq = QueryParserUtil.parseTupleQuery(QueryLanguage.SPARQL, query, null); + } + catch (UnsupportedQueryLanguageException | MalformedQueryException e) { + throw new QueryRulesetException("Error parsing query:\n" + query, e); + } + te = ptq.getTupleExpr(); + // Before converting to rules (and renaming variables), validate that no statement patterns + // consist of only variables (this would result in a rule that matches every triple). + // Needs to be done before inference, since inference rules may create such statement patterns + // that are OK because they won'd be converted to rules directly. + te.visit(new QueryModelVisitorBase<QueryRulesetException>() { + @Override + public void meet(StatementPattern node) throws QueryRulesetException { + if (!(node.getSubjectVar().hasValue() || node.getPredicateVar().hasValue() || node.getObjectVar().hasValue())) { + throw new QueryRulesetException("Statement pattern with no constants would match every statement:\n" + + node + "\nFrom parsed query:\n" + te); + } + } + }); + // Apply inference, if applicable + if (conf != null && conf.isInfer()) { + RdfCloudTripleStore store = null; + try { + log.info("Applying inference rules"); + store = (RdfCloudTripleStore) RyaSailFactory.getInstance(conf); + InferenceEngine inferenceEngine = store.getInferenceEngine(); + // Apply in same order as query evaluation: + te.visit(new TransitivePropertyVisitor(conf, inferenceEngine)); + te.visit(new SymmetricPropertyVisitor(conf, inferenceEngine)); + te.visit(new InverseOfVisitor(conf, inferenceEngine)); + te.visit(new SubPropertyOfVisitor(conf, inferenceEngine)); + te.visit(new SubClassOfVisitor(conf, inferenceEngine)); + te.visit(new SameAsVisitor(conf, 
inferenceEngine)); + log.info("Query after inference:\n"); + for (String line : te.toString().split("\n")) { + log.info("\t" + line); + } + } + catch (Exception e) { + throw new QueryRulesetException("Error applying inference to parsed query:\n" + te, e); + } + finally { + if (store != null) { + try { + store.shutDown(); + } catch (SailException e) { + log.error("Error shutting down Sail after applying inference", e); + } + } + } + } + // Extract the StatementPatterns and Filters and turn them into rules: + RulesetVisitor rv = new RulesetVisitor(); + try { + te.visit(rv); + rv.addSchema(); + } + catch (QueryRulesetException e) { + throw new QueryRulesetException("Error extracting rules from parsed query:\n" + te, e); + } + for (CopyRule candidateRule : rv.rules) { + boolean unique = true; + for (CopyRule otherRule : rv.rules) { + if (!candidateRule.equals(otherRule) && otherRule.isGeneralizationOf(candidateRule)) { + unique = false; + break; + } + } + if (unique) { + this.rules.add(candidateRule); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("Original Query:\n\n\t"); + sb.append(query.replace("\n", "\n\t")).append("\n\nRuleset:\n"); + for (CopyRule rule : rules) { + sb.append("\n\t").append(rule.toString().replace("\n", "\n\t")).append("\n"); + } + return sb.toString(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java new file mode 100644 index 0000000..8a9d3d2 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/TimeUtils.java @@ -0,0 +1,349 @@ +package mvm.rya.accumulo.mr.merge.util; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.UnknownHostException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.net.ntp.NTPUDPClient; +import org.apache.commons.net.ntp.TimeInfo; +import org.codehaus.plexus.util.StringUtils; +import org.mortbay.jetty.HttpMethods; + +import com.google.common.net.HttpHeaders; + +import twitter4j.Logger; + +/** + * Utility methods for time. 
+ */ +public final class TimeUtils { + private static final Logger log = Logger.getLogger(TimeUtils.class); + + /** + * The default host name of the time server to use. + * List of time servers: http://tf.nist.gov/tf-cgi/servers.cgi + * Do not query time server more than once every 4 seconds. + */ + public static final String DEFAULT_TIME_SERVER_HOST = "time.nist.gov"; + + private static final int NTP_SERVER_TIMEOUT_MS = 15000; + + /** + * Queries the default NTP Server for the time. + * Do not query time server more than once every 4 seconds. + * @return the NTP server {@link Date} or {@code null}. + * @throws IOException + */ + public static Date getDefaultNtpServerDate() throws IOException { + return getNtpServerDate(DEFAULT_TIME_SERVER_HOST); + } + + /** + * Queries the specified NTP Server for the time. + * Do not query time server more than once every 4 seconds. + * @param timeServerHost the time server host name. + * @return the NTP server {@link Date} or {@code null}. + * @throws IOException + */ + public static Date getNtpServerDate(String timeServerHost) throws IOException { + try { + TimeInfo timeInfo = null; + NTPUDPClient timeClient = new NTPUDPClient(); + timeClient.setDefaultTimeout(NTP_SERVER_TIMEOUT_MS); + InetAddress inetAddress = InetAddress.getByName(timeServerHost); + if (inetAddress != null) { + timeInfo = timeClient.getTime(inetAddress); + if (timeInfo != null) { + // TODO: which time to use? + long serverTime = timeInfo.getMessage().getTransmitTimeStamp().getTime(); + //long serverTime = timeInfo.getReturnTime(); + Date ntpDate = new Date(serverTime); + return ntpDate; + } + } + } catch (IOException e) { + throw new IOException("Unable to get NTP server time.", e); + } + return null; + } + + /** + * Gets the remote machine's system time by checking the DATE field in the header + * from a HTTP HEAD method response. + * @param urlString the URL string of the remote machine's web server to connect to. 
+ * @return the remote machine's system {@link Date} or {@code null}. + * @throws IOException + * @throws ParseException + */ + public static Date getRemoteMachineDate(String urlString) throws IOException, ParseException { + Date remoteDate = null; + HttpURLConnection conn = null; + try { + URL url = new URL(urlString); + + // Set up the initial connection + conn = (HttpURLConnection)url.openConnection(); + // Use HEAD instead of GET so content isn't returned. + conn.setRequestMethod(HttpMethods.HEAD); + conn.setDoOutput(false); + conn.setReadTimeout(10000); + + conn.connect(); + + Map<String, List<String>> header = conn.getHeaderFields(); + for (String key : header.keySet()) { + if (key != null && HttpHeaders.DATE.equals(key)) { + List<String> data = header.get(key); + String dateString = data.get(0); + SimpleDateFormat sdf = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss Z"); + remoteDate = sdf.parse(dateString); + break; + } + } + } finally { + // Close the connection + conn.disconnect(); + } + + return remoteDate; + } + + /** + * Gets the time difference between the 2 specified times from the NTP server time and the machine system time. + * @param ntpDate the {@link Date} from the NTP server host. + * @param machineDate the {@link Date} from the machine (either local or remote). + * @param isMachineLocal {@code true} if the {@code machineDate} from a local machine. {@code false} + * if it's from a remote machine. + * @return the difference between the NTP server time and the machine's system time. A positive value + * indicates that the machine's system time is ahead of the time server. A negative value indicates that + * the machine's system time is behind of the time server. + */ + public static Long getTimeDifference(Date ntpDate, Date machineDate, boolean isMachineLocal) { + Long diff = null; + if (ntpDate != null && machineDate != null) { + log.info("NTP Server Time: " + ntpDate); + String machineLabel = isMachineLocal ? 
"Local" : "Remote"; + log.info(machineLabel + " Machine Time: " + machineDate); + diff = machineDate.getTime() - ntpDate.getTime(); + + boolean isAhead = diff > 0; + String durationBreakdown = TimeUtils.getDurationBreakdown(diff, false); + log.info(machineLabel + " Machine time is " + (isAhead ? "ahead of" : "behind") + " NTP server time by " + durationBreakdown + "."); + } + + return diff; + } + + /** + * Gets the time difference between the NTP server and the local machine system time. + * @param timeServerHost the time server host name. + * @return the difference between the NTP server time and the local machine's system time. A positive value + * indicates that the local machine's system time is ahead of the time server. A negative value indicates that + * the local machine's system time is behind of the time server. + * @throws IOException + */ + public static Long getNtpServerAndLocalMachineTimeDifference(String timeServerHost) throws IOException { + log.info("Getting NTP Server time from " + timeServerHost + "..."); + Date ntpDate = getNtpServerDate(timeServerHost); + Long diff = null; + if (ntpDate != null) { + log.info("Getting Local Machine time..."); + Date machineDate = new Date(); + + diff = getTimeDifference(ntpDate, machineDate, true); + } + + return diff; + } + + /** + * Gets the time difference between the NTP server and the remote machine system time. + * @param timeServerHost the time server host name. + * @param remoteMachineUrlString the URL string of the remote machine's web server to connect to. + * @return the difference between the NTP server time and the remote machine's system time. A positive value + * indicates that the remote machine's system time is ahead the time server. A negative value indicates that + * the remote machine's system time is behind the time server. 
+ * @throws ParseException + * @throws IOException + */ + public static Long getNtpServerAndRemoteMachineTimeDifference(String timeServerHost, String remoteMachineUrlString) throws IOException, ParseException { + log.info("Getting NTP Server time from " + timeServerHost + "..."); + Date ntpDate = getNtpServerDate(timeServerHost); + Long diff = null; + if (ntpDate != null) { + log.info("Getting Remote Machine time from " + remoteMachineUrlString + "..."); + Date machineDate = getRemoteMachineDate(remoteMachineUrlString); + + diff = getTimeDifference(ntpDate, machineDate, false); + } + + return diff; + } + + /** + * Gets the time difference between the NTP server and the machine system time (either locally or remotely depending on the URL). + * @param timeServerHost the time server host name. + * @param machineUrlString the URL string of the machine's web server to connect to. The machine might be + * local or remote. + * @return the difference between the NTP server time and the machine's system time. A positive value + * indicates that the machine's system time is ahead of the time server. A negative value indicates that + * the machine's system time is behind the time server. + * @throws ParseException + * @throws IOException + */ + public static Long getNtpServerAndMachineTimeDifference(String timeServerHost, String machineUrlString) throws IOException, ParseException { + boolean isUrlLocalMachine = isUrlLocalMachine(machineUrlString); + + Long machineTimeOffset; + if (isUrlLocalMachine) { + machineTimeOffset = getNtpServerAndLocalMachineTimeDifference(timeServerHost); + } else { + machineTimeOffset = getNtpServerAndRemoteMachineTimeDifference(timeServerHost, machineUrlString); + } + + return machineTimeOffset; + } + + /** + * Gets the machine system time (either locally or remotely depending on the URL). + * @param urlString the URL string of the machine to check. + * @return the machine's system time. 
+ * @throws IOException + * @throws ParseException + */ + public static Date getMachineDate(String urlString) throws IOException, ParseException { + boolean isMachineLocal = isUrlLocalMachine(urlString); + + Date machineDate; + if (isMachineLocal) { + // Get local system machine time + machineDate = new Date(); + } else { + // Get remote machine time from HTTP HEAD response. Check hosted server web page on machine for time. + machineDate = getRemoteMachineDate(urlString); + } + + return machineDate; + } + + /** + * Checks to see if the URL provided is hosted on the local machine or not. + * @param urlString the URL string to check. + * @return {@code true} if the URL is hosted on the local machine. {@code false} + * if it's on a remote machine. + * @throws UnknownHostException + * @throws MalformedURLException + */ + public static boolean isUrlLocalMachine(String urlString) throws UnknownHostException, MalformedURLException { + String localAddress = InetAddress.getLocalHost().getHostAddress(); + String requestAddress = InetAddress.getByName(new URL(urlString).getHost()).getHostAddress(); + return localAddress != null && requestAddress != null && localAddress.equals(requestAddress); + } + + /** + * Convert a millisecond duration to a string format. + * @param durationMs A duration to convert to a string form. + * @return A string of the form "X Days Y Hours Z Minutes A Seconds B Milliseconds". + */ + public static String getDurationBreakdown(final long durationMs) { + return getDurationBreakdown(durationMs, true); + } + + /** + * Convert a millisecond duration to a string format. + * @param durationMs A duration to convert to a string form. + * @param showSign {@code true} to show if the duration is positive or negative. {@code false} + * to not display the sign. + * @return A string of the form "X Days Y Hours Z Minutes A Seconds B Milliseconds". 
+ */ + public static String getDurationBreakdown(final long durationMs, boolean showSign) { + long tempDurationMs = Math.abs(durationMs); + + long days = TimeUnit.MILLISECONDS.toDays(tempDurationMs); + tempDurationMs -= TimeUnit.DAYS.toMillis(days); + long hours = TimeUnit.MILLISECONDS.toHours(tempDurationMs); + tempDurationMs -= TimeUnit.HOURS.toMillis(hours); + long minutes = TimeUnit.MILLISECONDS.toMinutes(tempDurationMs); + tempDurationMs -= TimeUnit.MINUTES.toMillis(minutes); + long seconds = TimeUnit.MILLISECONDS.toSeconds(tempDurationMs); + tempDurationMs -= TimeUnit.SECONDS.toMillis(seconds); + long milliseconds = TimeUnit.MILLISECONDS.toMillis(tempDurationMs); + + StringBuilder sb = new StringBuilder(); + if (tempDurationMs != 0 && showSign) { + sb.append(tempDurationMs > 0 ? "+" : "-"); + } + if (days > 0) { + sb.append(days); + sb.append(days == 1 ? " Day " : " Days "); + } + if (hours > 0) { + sb.append(hours); + sb.append(hours == 1 ? " Hour " : " Hours "); + } + if (minutes > 0) { + sb.append(minutes); + sb.append(minutes == 1 ? " Minute " : " Minutes "); + } + if (seconds > 0) { + sb.append(seconds); + sb.append(seconds == 1 ? " Second " : " Seconds " ); + } + if (milliseconds > 0 || (!showSign && sb.length() == 0) || (showSign && sb.length() == 1)) { + // At least show the milliseconds if nothing else has been shown so far + sb.append(milliseconds); + sb.append(milliseconds == 1 ? " Millisecond" : " Milliseconds"); + } + + return StringUtils.trim(sb.toString()); + } + + /** + * Checks if a date is before another date or if they are equal. + * @param date1 the first {@link Date}. + * @param date2 the second {@link Date}. + * @return {@code true} if {@code date1} is before or equal to {@code date2}. {@code false} otherwise. + */ + public static boolean dateBeforeInclusive(Date date1, Date date2) { + return !date1.after(date2); + } + + /** + * Checks if a date is after another date or if they are equal. + * @param date1 the first {@link Date}. 
+ * @param date2 the second {@link Date}. + * @return {@code true} if {@code date1} is after or equal to {@code date2}. {@code false} otherwise. + */ + public static boolean dateAfterInclusive(Date date1, Date date2) { + return !date1.before(date2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java new file mode 100644 index 0000000..4e06166 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/ToolConfigUtils.java @@ -0,0 +1,136 @@ +package mvm.rya.accumulo.mr.merge.util; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 * #L%
 */

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.Logger;

/**
 * Utility methods for the merge tool and copy tool configuration files.
 */
public final class ToolConfigUtils {
    private static final Logger log = Logger.getLogger(ToolConfigUtils.class);

    /**
     * Private constructor to prevent instantiation.
     */
    private ToolConfigUtils() {
    }

    /**
     * Gets the set of user arguments from the user's config and/or their extra supplied
     * command line arguments. This weeds out all the automatically generated parameters created
     * from initializing a {@link Configuration} object and should only give back a set of arguments
     * provided directly by the user.
     * @param conf the {@link Configuration} provided.
     * @param args the extra arguments from the command line.
     * @return an unmodifiable {@link Set} of argument strings.
     * @throws IOException
     */
    public static Set<String> getUserArguments(Configuration conf, String[] args) throws IOException {
        String[] filteredArgs = new String[] {};
        if (Arrays.asList(args).contains("-conf")) {
            // parse args
            // NOTE: constructed purely for its side effect -- GenericOptionsParser
            // loads the "-conf" resource(s) into the passed-in Configuration.
            new GenericOptionsParser(conf, args);

            // Keep only the explicit "-Dname=value" overrides from the command line.
            List<String> commandLineArgs = new ArrayList<>();
            for (String arg : args) {
                if (arg.startsWith("-D")) {
                    commandLineArgs.add(arg);
                }
            }
            filteredArgs = commandLineArgs.toArray(new String[0]);
        } else {
            filteredArgs = args;
        }

        // Get the supplied config name from the resource string.
        // No real easy way of getting the name.
        // So, pulling it off the list of resource names in the Configuration's toString() method
        // where it should be the last one.
        // NOTE(review): this depends on the exact "Configuration: res1, res2, ..." format of
        // Configuration.toString(), which is not a stable API -- verify across Hadoop versions.
        String confString = conf.toString();
        String resourceString = StringUtils.removeStart(confString, "Configuration: ");
        List<String> resourceNames = Arrays.asList(StringUtils.split(resourceString, ", "));
        String configFilename = resourceNames.get(resourceNames.size() - 1);

        Set<String> toolArgsSet = new HashSet<>();
        File file = new File(configFilename);
        // Check that the last resource name is the actual user's config by seeing if it's a file
        // on the system, the other resources seem to be contained in jars and so should fail here which
        // should happen if no config is supplied.
        if (file.exists()) {
            XMLConfiguration configuration = null;
            try {
                configuration = new XMLConfiguration(configFilename);
                toolArgsSet.addAll(getConfigArguments(configuration));
            } catch (ConfigurationException e) {
                // Best-effort: a malformed config file is logged and skipped, not fatal.
                log.error("Unable to load configuration file.", e);
            }
        }

        // Command-line -D arguments are added last; the result is a set, so duplicates collapse.
        toolArgsSet.addAll(Arrays.asList(filteredArgs));
        return Collections.unmodifiableSet(toolArgsSet);
    }

    /**
     * Reads in the configuration file properties and values and converts them
     * into a set of argument strings.
     * @param configuration the {@link XMLConfiguration}.
     * @return the (sorted) set of argument strings.
     */
    public static Set<String> getConfigArguments(XMLConfiguration configuration) {
        // Iterate the <property><name>/<value> pairs by index.
        int size = configuration.getList("property.name").size();
        TreeSet<String> configArgs = new TreeSet<>();
        for (int i = 0; i < size; i++) {
            String propertyName = configuration.getString("property(" + i + ").name");
            String propertyValue = configuration.getString("property(" + i + ").value");
            String argument = makeArgument(propertyName, propertyValue);
            configArgs.add(argument);
        }
        return configArgs;
    }

    /**
     * Creates an argument string from the specified property name and value.
 * If the property name is "config.file" and value is "config.xml" then this will
 * create an argument string of "-Dconfig.file=config.xml"
 * @param propertyName the property name.
 * @param value the value.
 * @return the argument string.
 */
    public static String makeArgument(String propertyName, String value) {
        // Formats a Hadoop-style generic option: -D<name>=<value>.
        return "-D" + propertyName + "=" + value;
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java
new file mode 100644
index 0000000..0acfbd1
--- /dev/null
+++ b/extras/rya.merger/src/test/java/mvm/rya/accumulo/mr/merge/CopyToolTest.java
@@ -0,0 +1,339 @@
package mvm.rya.accumulo.mr.merge;

import static mvm.rya.accumulo.mr.merge.util.TestUtils.LAST_MONTH;
import static mvm.rya.accumulo.mr.merge.util.TestUtils.TODAY;
import static mvm.rya.accumulo.mr.merge.util.TestUtils.YESTERDAY;
import static mvm.rya.accumulo.mr.merge.util.TestUtils.createRyaStatement;
import static mvm.rya.accumulo.mr.merge.util.ToolConfigUtils.makeArgument;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.rfile.bcfile.Compression.Algorithm;
import
org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import info.aduna.iteration.CloseableIteration;
import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.AccumuloRyaDAO;
import mvm.rya.accumulo.mr.merge.common.InstanceType;
import mvm.rya.accumulo.mr.merge.driver.AccumuloDualInstanceDriver;
import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
import mvm.rya.accumulo.mr.merge.util.TestUtils;
import mvm.rya.accumulo.mr.merge.util.TimeUtils;
import mvm.rya.accumulo.mr.utils.MRUtils;
import mvm.rya.api.RdfCloudTripleStoreConfiguration;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.indexing.accumulo.ConfigUtils;

/**
 * Tests for {@link CopyTool}.
 */
public class CopyToolTest {
    private static final Logger log = Logger.getLogger(CopyToolTest.class);

    // Test-run switches: mock Accumulo instances, no NTP sync, map-reduce writes
    // straight to the child instance (no file output), and no interactive dialog.
    private static final boolean IS_MOCK = true;
    private static final boolean USE_TIME_SYNC = false;
    private static final boolean USE_COPY_FILE_OUTPUT = false;
    private static final boolean IS_START_TIME_DIALOG_ENABLED = false;

    private static final String CHILD_SUFFIX = MergeTool.CHILD_SUFFIX;

    private static final String PARENT_PASSWORD = AccumuloDualInstanceDriver.PARENT_PASSWORD;
    private static final String PARENT_INSTANCE = AccumuloDualInstanceDriver.PARENT_INSTANCE;
    private static final String PARENT_TABLE_PREFIX = AccumuloDualInstanceDriver.PARENT_TABLE_PREFIX;
    private static final String PARENT_AUTH = AccumuloDualInstanceDriver.PARENT_AUTH;
    private static final ColumnVisibility PARENT_COLUMN_VISIBILITY = new ColumnVisibility(PARENT_AUTH);
    private static final String PARENT_TOMCAT_URL = "http://rya-example-box:8080";

    private static final String CHILD_PASSWORD = AccumuloDualInstanceDriver.CHILD_PASSWORD;
    private static final String CHILD_INSTANCE = AccumuloDualInstanceDriver.CHILD_INSTANCE;
    private static final String CHILD_TABLE_PREFIX = AccumuloDualInstanceDriver.CHILD_TABLE_PREFIX;
    private static final String CHILD_TOMCAT_URL = "http://localhost:8080";

    private static AccumuloRyaDAO parentDao;
    private static AccumuloRyaDAO childDao;

    private static AccumuloRdfConfiguration parentConfig;
    private static AccumuloRdfConfiguration childConfig;

    private static AccumuloDualInstanceDriver accumuloDualInstanceDriver;
    private static CopyTool copyTool = null;
    // Per-test flag read by copyToolRun(): true = import from bulk files, false = direct copy.
    private boolean isImporting = false;

    @BeforeClass
    public static void setUp() throws Exception {
        // The child instance is created by the tool itself, so only the parent is fully set up here.
        accumuloDualInstanceDriver = new AccumuloDualInstanceDriver(IS_MOCK, true, true, false, false);
        accumuloDualInstanceDriver.setUpInstances();
    }

    @Before
    public void setUpPerTest() throws Exception {
        // Order matters: tables must exist before the DAOs that write to them.
        accumuloDualInstanceDriver.setUpTables();

        accumuloDualInstanceDriver.setUpDaos();

        accumuloDualInstanceDriver.setUpConfigs();

        parentConfig = accumuloDualInstanceDriver.getParentConfig();
        childConfig = accumuloDualInstanceDriver.getChildConfig();
        parentDao = accumuloDualInstanceDriver.getParentDao();
        childDao = accumuloDualInstanceDriver.getChildDao();
    }

    @After
    public void tearDownPerTest() throws Exception {
        log.info("tearDownPerTest(): tearing down now.");
        accumuloDualInstanceDriver.tearDownTables();
        accumuloDualInstanceDriver.tearDownDaos();
        if (copyTool != null) {
            copyTool.shutdown();
        }
    }

    @AfterClass
    public static void tearDownPerClass() throws Exception {
        log.info("tearDownPerClass(): tearing down now.");
        accumuloDualInstanceDriver.tearDown();
    }

    // Convenience wrapper: asserts the statement appears verifyResultCount times in the CHILD instance.
    private void assertStatementInChild(String description, int verifyResultCount, RyaStatement matchStatement) throws RyaDAOException {
        TestUtils.assertStatementInInstance(description, verifyResultCount, matchStatement, childDao, childConfig);
    }

    /**
     * Runs the copy tool against the parent instance, copying everything timestamped
     * after {@code startDate} into the (tool-created) child instance. Reads the
     * instance field {@code isImporting} to decide between direct copy and file import.
     */
    private void copyToolRun(Date startDate) throws AccumuloException, AccumuloSecurityException {
        copyTool = new CopyTool();
        copyTool.setupAndRun(new String[] {
                makeArgument(MRUtils.AC_MOCK_PROP, Boolean.toString(IS_MOCK)),
                makeArgument(MRUtils.AC_INSTANCE_PROP, PARENT_INSTANCE),
                makeArgument(MRUtils.AC_USERNAME_PROP, accumuloDualInstanceDriver.getParentUser()),
                makeArgument(MRUtils.AC_PWD_PROP, PARENT_PASSWORD),
                makeArgument(MRUtils.TABLE_PREFIX_PROPERTY, PARENT_TABLE_PREFIX),
                makeArgument(MRUtils.AC_AUTH_PROP, accumuloDualInstanceDriver.getParentAuths().toString()),
                makeArgument(MRUtils.AC_ZK_PROP, accumuloDualInstanceDriver.getParentZooKeepers()),
                makeArgument(CopyTool.PARENT_TOMCAT_URL_PROP, PARENT_TOMCAT_URL),
                makeArgument(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, Boolean.toString(IS_MOCK)),
                makeArgument(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, CHILD_INSTANCE),
                makeArgument(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, accumuloDualInstanceDriver.getChildUser()),
                makeArgument(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, CHILD_PASSWORD),
                makeArgument(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, CHILD_TABLE_PREFIX),
                makeArgument(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, accumuloDualInstanceDriver.getChildAuths() != null ? accumuloDualInstanceDriver.getChildAuths().toString() : null),
                makeArgument(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, accumuloDualInstanceDriver.getChildZooKeepers() != null ? accumuloDualInstanceDriver.getChildZooKeepers() : "localhost"),
                makeArgument(CopyTool.CHILD_TOMCAT_URL_PROP, CHILD_TOMCAT_URL),
                makeArgument(CopyTool.CREATE_CHILD_INSTANCE_TYPE_PROP, (IS_MOCK ? InstanceType.MOCK : InstanceType.MINI).toString()),
                makeArgument(CopyTool.NTP_SERVER_HOST_PROP, TimeUtils.DEFAULT_TIME_SERVER_HOST),
                makeArgument(CopyTool.USE_NTP_SERVER_PROP, Boolean.toString(USE_TIME_SYNC)),
                makeArgument(CopyTool.USE_COPY_FILE_OUTPUT, Boolean.toString(USE_COPY_FILE_OUTPUT)),
                makeArgument(CopyTool.COPY_FILE_OUTPUT_PATH, "/test/copy_tool_file_output/"),
                makeArgument(CopyTool.COPY_FILE_OUTPUT_COMPRESSION_TYPE, Algorithm.GZ.getName()),
                makeArgument(CopyTool.USE_COPY_FILE_OUTPUT_DIRECTORY_CLEAR, Boolean.toString(true)),
                makeArgument(CopyTool.COPY_FILE_IMPORT_DIRECTORY, "resources/test/copy_tool_file_output/"),
                makeArgument(CopyTool.USE_COPY_FILE_IMPORT, Boolean.toString(isImporting)),
                makeArgument(MergeTool.START_TIME_PROP, MergeTool.getStartTimeString(startDate, IS_START_TIME_DIALOG_ENABLED))
        });

        // Propagate the child zookeeper setting chosen by the tool back into the child config.
        Configuration toolConfig = copyTool.getConf();
        String zooKeepers = toolConfig.get(MRUtils.AC_ZK_PROP + CHILD_SUFFIX);
        MergeTool.setDuplicateKeysForProperty(childConfig, MRUtils.AC_ZK_PROP, zooKeepers);

        log.info("Finished running tool.");
    }

    @Test
    public void testCopyTool() throws Exception {
        RyaStatement ryaStatementOutOfTimeRange = createRyaStatement("coach", "called", "timeout", LAST_MONTH);

        RyaStatement ryaStatementShouldCopy1 = createRyaStatement("bob",
"catches", "ball", YESTERDAY); + RyaStatement ryaStatementShouldCopy2 = createRyaStatement("bill", "talks to", "john", YESTERDAY); + RyaStatement ryaStatementShouldCopy3 = createRyaStatement("susan", "eats", "burgers", TODAY); + RyaStatement ryaStatementShouldCopy4 = createRyaStatement("ronnie", "plays", "guitar", TODAY); + + RyaStatement ryaStatementDoesNotExist1 = createRyaStatement("nobody", "was", "here", LAST_MONTH); + RyaStatement ryaStatementDoesNotExist2 = createRyaStatement("statement", "not", "found", YESTERDAY); + RyaStatement ryaStatementDoesNotExist3 = createRyaStatement("key", "does not", "exist", TODAY); + + // This statement was modified by the child to change the column visibility. + // The parent should combine the child's visibility with its visibility. + RyaStatement ryaStatementVisibilityDifferent = createRyaStatement("I", "see", "you", YESTERDAY); + ryaStatementVisibilityDifferent.setColumnVisibility(PARENT_COLUMN_VISIBILITY.getExpression()); + + // Setup initial parent instance with 7 rows + // This is the state of the parent data (as it is today) before merging occurs which will use the specified start time of yesterday. + parentDao.add(ryaStatementOutOfTimeRange); // Process should NOT copy statement + parentDao.add(ryaStatementShouldCopy1); // Process should copy statement + parentDao.add(ryaStatementShouldCopy2); // Process should copy statement + parentDao.add(ryaStatementShouldCopy3); // Process should copy statement + parentDao.add(ryaStatementShouldCopy4); // Process should copy statement + parentDao.add(ryaStatementVisibilityDifferent); // Process should copy and update statement + + + AccumuloRyaUtils.printTable(PARENT_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, parentConfig); + //AccumuloRyaUtils.printTable(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); + + log.info("Starting copy tool. 
Copying all data after the specified start time: " + YESTERDAY); + + isImporting = false; + copyToolRun(YESTERDAY); + + + // Copy Tool made child instance so hook the tables and dao into the driver. + String childUser = accumuloDualInstanceDriver.getChildUser(); + Connector childConnector = ConfigUtils.getConnector(childConfig); + accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setConnector(childConnector); + + accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpTables(); + + accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpDao(); + childDao = accumuloDualInstanceDriver.getChildDao(); + + + // Update child config to include changes made from copy process + SecurityOperations childSecOps = accumuloDualInstanceDriver.getChildSecOps(); + Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, PARENT_AUTH); + childSecOps.changeUserAuthorizations(childUser, newChildAuths); + String childAuthString = newChildAuths.toString(); + List<String> duplicateKeys = MergeTool.DUPLICATE_KEY_MAP.get(MRUtils.AC_AUTH_PROP); + childConfig.set(MRUtils.AC_AUTH_PROP, childAuthString); + for (String key : duplicateKeys) { + childConfig.set(key, childAuthString); + } + AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, childConfig); + AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, childConfig); + AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); + + Scanner scanner = AccumuloRyaUtils.getScanner(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); + Iterator<Entry<Key, Value>> iterator = scanner.iterator(); + int count = 0; + while (iterator.hasNext()) { + iterator.next(); + count++; + } + // Make sure we have all of them in the parent. 
+ assertEquals(5, count); + + + assertStatementInChild("Child included statement that was out of time range", 0, ryaStatementOutOfTimeRange); + + assertStatementInChild("Child missing statement 1 that was in parent", 1, ryaStatementShouldCopy1); + + assertStatementInChild("Child missing statement 2 that was in parent", 1, ryaStatementShouldCopy2); + + assertStatementInChild("Child missing statement 3 that was in parent", 1, ryaStatementShouldCopy3); + + assertStatementInChild("Child missing statement 4 that was in parent", 1, ryaStatementShouldCopy4); + + assertStatementInChild("Child included statement 1 that was not in parent", 0, ryaStatementDoesNotExist1); + + assertStatementInChild("Child included statement 2 that was not in parent", 0, ryaStatementDoesNotExist2); + + assertStatementInChild("Child included statement 3 that was not in parent", 0, ryaStatementDoesNotExist3); + + // Check that it can be queried with child's visibility + assertStatementInChild("Child missing statement with child visibility", 1, ryaStatementVisibilityDifferent); + + // Check that it can be queried with parent's visibility + childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, PARENT_AUTH); + SecurityOperations secOps = IS_MOCK ? 
accumuloDualInstanceDriver.getChildSecOps() : childSecOps; + newChildAuths = AccumuloRyaUtils.addUserAuths(accumuloDualInstanceDriver.getChildUser(), secOps, PARENT_AUTH); + secOps.changeUserAuthorizations(accumuloDualInstanceDriver.getChildUser(), newChildAuths); + assertStatementInChild("Child missing statement with parent visibility", 1, ryaStatementVisibilityDifferent); + + // Check that it can NOT be queried with some other visibility + childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "bad_auth"); + CloseableIteration<RyaStatement, RyaDAOException> iter = childDao.getQueryEngine().query(ryaStatementVisibilityDifferent, childConfig); + count = 0; + try { + while (iter.hasNext()) { + iter.next(); + count++; + } + } catch (Exception e) { + // Expected + if (!(e.getCause() instanceof AccumuloSecurityException)) { + fail(); + } + } + iter.close(); + assertEquals(0, count); + + // reset auth + childConfig.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, childAuthString); + + log.info("DONE"); + } + + @Test + public void testImportDirectoryTool() throws Exception { + log.info(""); + log.info("Setting up initial state of parent before importing directory to child..."); + log.info("Adding data to parent..."); + + log.info("Starting import directory tool. Importing all data after the specified start time: " + YESTERDAY); + log.info(""); + + isImporting = true; + copyToolRun(YESTERDAY); + + + // Import Directory Tool made child instance so hook the tables and dao into the driver. 
+ String childUser = accumuloDualInstanceDriver.getChildUser(); + Connector childConnector = ConfigUtils.getConnector(childConfig); + accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setConnector(childConnector); + + accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpTables(); + + accumuloDualInstanceDriver.getChildAccumuloInstanceDriver().setUpDao(); + + + // Update child config to include changes made from import directory process + SecurityOperations childSecOps = accumuloDualInstanceDriver.getChildSecOps(); + Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, PARENT_AUTH); + childSecOps.changeUserAuthorizations(childUser, newChildAuths); + String childAuthString = newChildAuths.toString(); + List<String> duplicateKeys = MergeTool.DUPLICATE_KEY_MAP.get(MRUtils.AC_AUTH_PROP); + childConfig.set(MRUtils.AC_AUTH_PROP, childAuthString); + for (String key : duplicateKeys) { + childConfig.set(key, childAuthString); + } + + + //AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX, childConfig); + //AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX, childConfig); + AccumuloRyaUtils.printTablePretty(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); + + Scanner scanner = AccumuloRyaUtils.getScanner(CHILD_TABLE_PREFIX + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, childConfig); + Iterator<Entry<Key, Value>> iterator = scanner.iterator(); + int count = 0; + while (iterator.hasNext()) { + iterator.next(); + count++; + } + log.info(""); + log.info("Total rows imported: " + count); + log.info(""); + + assertEquals(20, count); + + log.info("DONE"); + } +}
