http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
new file mode 100644
index 0000000..510fc75
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/Schema.java
@@ -0,0 +1,538 @@
+package mvm.rya.reasoning;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.model.vocabulary.OWL;
+
+/**
+ * Hold on to facts about the schema (TBox/RBox) and perform what reasoning we
+ * can without instance data.
+ * <p>
+ * The Schema object, together with the OwlClass and OwlProperty objects it
+ * keeps track of, is responsible for schema reasoning, or the "Semantics of
+ * Schema Vocabulary" rules from the OWL RL/RDF specification. Some rules are
+ * handled dynamically, while the rest must be computed by calling closure()
+ * once the schema data has been read in.
+ * <p>
+ * Schema rules implemented in {@link OwlClass}:
+ *     scm-cls, scm-eqc1, scm-eqc2, scm-sco,
+ *     scm-hv, scm-svf2, scm-avf2
+ * <p>
+ * Schema rules implemented in {@link OwlProperty}:
+ *     scm-op, scm-dp, scm-eqp1, scm-eqp2, scm-spo, scm-dom1, scm-dom2,
+ *     scm-rng1, scm-rng2, scm-svf1, scm-avf1
+ * <p>
+ * TODO: scm-cls officially states owl:Nothing is a subclass of every class.
+ * Do we need to explicitly do something with this fact?
+ */
+public class Schema {
+    // Statements using these predicates are automatically relevant schema
+    // information.
+    private static final Set<URI> schemaPredicates = new HashSet<>();
+    private static final URI[] schemaPredicateURIs = {
+        RDFS.SUBCLASSOF,
+        RDFS.SUBPROPERTYOF,
+        RDFS.DOMAIN,
+        RDFS.RANGE,
+        OWL.EQUIVALENTCLASS,
+        OWL.EQUIVALENTPROPERTY,
+        OWL.INVERSEOF,
+        OWL.DISJOINTWITH,
+        OWL.COMPLEMENTOF,
+        OWL.ONPROPERTY,
+        OWL.SOMEVALUESFROM,
+        OWL.ALLVALUESFROM,
+        OWL.HASVALUE,
+        OWL.MAXCARDINALITY,
+        OWL2.MAXQUALIFIEDCARDINALITY,
+        OWL2.PROPERTYDISJOINTWITH,
+        OWL2.ONCLASS
+    };
+
+    // The fact that something is one of these types is schema information.
+ private static final Set<Resource> schemaTypes = new HashSet<>(); + private static final Resource[] schemaTypeURIs = { + OWL.TRANSITIVEPROPERTY, + OWL2.IRREFLEXIVEPROPERTY, + OWL.SYMMETRICPROPERTY, + OWL2.ASYMMETRICPROPERTY, + OWL.FUNCTIONALPROPERTY, + OWL.INVERSEFUNCTIONALPROPERTY + }; + + static { + for (URI uri : schemaPredicateURIs) { + schemaPredicates.add(uri); + } + for (Resource uri : schemaTypeURIs) { + schemaTypes.add(uri); + } + } + + /** + * Does this triple/statement encode potentially relevant schema + * information? + */ + public static boolean isSchemaTriple(Statement triple) { + URI pred = triple.getPredicate(); + // Triples with certain predicates are schema triples, + if (schemaPredicates.contains(pred)) { + return true; + } + // And certain type assertions are schema triples. + else if (pred.equals(RDF.TYPE)) { + if (schemaTypes.contains(triple.getObject())) { + return true; + } + } + return false; + } + + /** + * Map URIs to schema information about a property + */ + protected Map<URI, OwlProperty> properties = new HashMap<>(); + + /** + * Map Resources to schema information about a class/restriction + */ + protected Map<Resource, OwlClass> classes = new HashMap<>(); + + /** + * Get schema information for a class, for reading and writing. + * Instantiates OwlClass if it doesn't yet exist. + */ + public OwlClass getClass(Resource c) { + if (!classes.containsKey(c)) { + classes.put(c, new OwlClass(c)); + } + return classes.get(c); + } + + /** + * Get schema information for a class, for reading and writing. + * Assumes this Value refers to a class Resource. + */ + public OwlClass getClass(Value c) { + return getClass((Resource) c); + } + + /** + * Get schema information for a property, for reading and writing. + * Instantiates OwlProperty if it doesn't yet exist. + */ + public OwlProperty getProperty(URI p) { + if (!properties.containsKey(p)) { + properties.put(p, new OwlProperty(p)); + } + return properties.get(p); + } + + /** + * Get schema information for a property, for reading and writing. + * Assumes this Value refers to a property URI. + */ + public OwlProperty getProperty(Value p) { + return getProperty((URI) p); + } + + /** + * Return whether this resource corresponds to a property. + */ + public boolean hasProperty(URI r) { + return properties.containsKey(r); + } + + /** + * Return whether this resource corresponds to a class. + */ + public boolean hasClass(Resource r) { + return classes.containsKey(r); + } + + /** + * Return whether this resource corresponds to a property restriction. + */ + public boolean hasRestriction(Resource r) { + return classes.containsKey(r) && !classes.get(r).getOnProperty().isEmpty(); + } + + public Schema() { + } + + /** + * Incorporate a new triple into the schema. + */ + public void processTriple(Statement triple) { + Resource s = triple.getSubject(); + URI p = triple.getPredicate(); + Value o = triple.getObject(); + if (isSchemaTriple(triple)) { + // For a type statement to be schema information, it must yield + // some boolean information about a property. 
+ if (p.equals(RDF.TYPE)) { + if (schemaTypes.contains(o)) { + addPropertyType((URI) s, (Resource) o); + } + } + + // Domain/range + else if (p.equals(RDFS.DOMAIN)) { + // Don't add trivial domain owl:Thing + if (!o.equals(OWL.THING)) { + getProperty(s).addDomain(getClass(o)); + } + } + else if (p.equals(RDFS.RANGE)) { + // Don't add trivial range owl:Thing + if (!o.equals(OWL.THING)) { + getProperty(s).addRange(getClass(o)); + } + } + + // Sub/super relations + else if (p.equals(RDFS.SUBCLASSOF)) { + // Everything is a subclass of owl#Thing, we don't need to + // store that information + if (!o.equals(OWL.THING)) { + getClass(s).addSuperClass(getClass(o)); + } + } + else if (p.equals(RDFS.SUBPROPERTYOF)) { + getProperty(s).addSuperProperty(getProperty(o)); + } + + // Equivalence relations + else if (p.equals(OWL.EQUIVALENTCLASS)) { + getClass(s).addEquivalentClass(getClass(o)); + } + else if (p.equals(OWL.EQUIVALENTPROPERTY)) { + getProperty(s).addEquivalentProperty(getProperty(o)); + } + + // Inverse properties + else if (p.equals(OWL.INVERSEOF)) { + getProperty(s).addInverse(getProperty(o)); + getProperty(o).addInverse(getProperty(s)); + } + + // Complementary classes + else if (p.equals(OWL.COMPLEMENTOF)) { + getClass(s).addComplement(getClass(o)); + getClass(o).addComplement(getClass(s)); + } + + // Disjoint classes and properties + else if (p.equals(OWL.DISJOINTWITH)) { + getClass(s).addDisjoint(getClass(o)); + getClass(o).addDisjoint(getClass(s)); + } + else if (p.equals(OWL2.PROPERTYDISJOINTWITH)) { + getProperty(s).addDisjoint(getProperty(o)); + getProperty(o).addDisjoint(getProperty(s)); + } + + // Property restriction info + else if (p.equals(OWL.ONPROPERTY)) { + getClass(s).addProperty(getProperty(o)); + } + else if (p.equals(OWL.SOMEVALUESFROM)) { + getClass(s).addSvf(getClass(o)); + } + else if (p.equals(OWL.ALLVALUESFROM)) { + getClass(s).addAvf(getClass(o)); + } + else if (p.equals(OWL2.ONCLASS)) { + getClass(s).addClass(getClass(o)); + } + else if (p.equals(OWL.HASVALUE)) { + getClass(s).addValue(o); + } + else if (p.equals(OWL.MAXCARDINALITY)) { + getClass(s).setMaxCardinality(o); + } + else if (p.equals(OWL2.MAXQUALIFIEDCARDINALITY)) { + getClass(s).setMaxQualifiedCardinality(o); + } + } + } + + /** + * Add a particular characteristic to a property. + */ + private void addPropertyType(URI p, Resource t) { + OwlProperty prop = getProperty(p); + if (t.equals(OWL.TRANSITIVEPROPERTY)) { + prop.setTransitive(); + } + else if (t.equals(OWL.SYMMETRICPROPERTY)) { + prop.setSymmetric(); + } + else if (t.equals(OWL2.ASYMMETRICPROPERTY)) { + prop.setAsymmetric(); + } + else if (t.equals(OWL.FUNCTIONALPROPERTY)) { + prop.setFunctional(); + } + else if (t.equals(OWL.INVERSEFUNCTIONALPROPERTY)) { + prop.setInverseFunctional(); + } + else if (t.equals(OWL2.IRREFLEXIVEPROPERTY)) { + prop.setIrreflexive(); + } + } + + /** + * Perform schema-level reasoning to compute the closure of statements + * already represented in this schema. This includes things like subClassOf + * transitivity and applying domain/range to subclasses. + */ + public void closure() { + // RL rule scm-spo: subPropertyOf transitivity + // (takes in subproperty info; yields subproperty info) + for (OwlProperty subprop : properties.values()) { + subprop.computeSuperProperties(); + } + + // RL rules scm-hv, scm-svf2, scm-avf2: restrictions & subproperties + // (take in subproperty info & prop. 
restrictions; yield subclass info) + for (OwlClass c1 : classes.values()) { + for (OwlClass c2 : classes.values()) { + c1.compareRestrictions(c2); + } + } + + // The following two steps can affect each other, so repeat the block + // as many times as necessary. + boolean repeat; + do { + // RL rule scm-sco: subClassOf transitivity + // (takes in subclass info; yields subclass info) + // (This traverses the complete hierarchy, so we don't need to loop + // again if changes are only made in this step) + for (OwlClass subclass : classes.values()) { + subclass.computeSuperClasses(); + } + // RL rules scm-svf1, scm-avf1: property restrictions & subclasses + // (take in subclass info & prop. restrictions; yield subclass info) + // (If changes are made here, loop through both steps again) + repeat = false; + for (OwlProperty prop : properties.values()) { + repeat = prop.compareRestrictions() || repeat; + } + } while (repeat); + + // Apply RL rules scm-dom1, scm-rng1, scm-dom2, scm-rng2: + // (take in subclass/subproperty & domain/range; yield domain/range) + for (OwlProperty prop : properties.values()) { + prop.inheritDomainRange(); + } + } + + /** + * Determine whether a fact is contained in the Schema object + * relationships or implied by schema rules. + * @return True if this schema contains the semantics of the triple + */ + public boolean containsTriple(Statement triple) { + // The schema certainly doesn't contain it if it's not a + // schema-relevant triple at all. + if (isSchemaTriple(triple)) { + Resource s = triple.getSubject(); + URI p = triple.getPredicate(); + Value o = triple.getObject(); + // If this is telling us something about a property: + if (properties.containsKey(s)) { + OwlProperty prop = properties.get(s); + // Property types: + if (p.equals(RDF.TYPE)) { + if ((o.equals(OWL.TRANSITIVEPROPERTY) + && prop.isTransitive()) + || (o.equals(OWL2.IRREFLEXIVEPROPERTY) + && prop.isIrreflexive()) + || (o.equals(OWL.SYMMETRICPROPERTY) + && prop.isSymmetric()) + || (o.equals(OWL2.ASYMMETRICPROPERTY) + && prop.isAsymmetric()) + || (o.equals(OWL.FUNCTIONALPROPERTY) + && prop.isFunctional()) + || (o.equals(OWL.INVERSEFUNCTIONALPROPERTY) + && prop.isInverseFunctional())) { + return true; + } + } + // Relationships with other properties: + if ((p.equals(RDFS.SUBPROPERTYOF) + && prop.getSuperProperties().contains(o)) + || (p.equals(OWL2.PROPERTYDISJOINTWITH) + && prop.getDisjointProperties().contains(o)) + || (p.equals(OWL.EQUIVALENTPROPERTY) + && prop.getEquivalentProperties().contains(o)) + || (p.equals(OWL.INVERSEOF) + && prop.getInverseProperties().contains(o))) { + return true; + } + // Relationships with classes: + if ((p.equals(RDFS.DOMAIN) + && prop.getDomain().contains(o)) + || (p.equals(RDFS.RANGE) + && prop.getRange().contains(o))) { + return true; + } + } + // If this is about a class relationship: + if (classes.containsKey(s)) { + OwlClass subject = classes.get(s); + if ((p.equals(OWL.EQUIVALENTCLASS) + && (subject.getEquivalentClasses().contains(o))) + || (p.equals(OWL.DISJOINTWITH) + && (subject.getDisjointClasses().contains(o))) + || (p.equals(OWL.COMPLEMENTOF) + && (subject.getComplementaryClasses().contains(o))) + || (p.equals(RDFS.SUBCLASSOF) + && (subject.getSuperClasses().contains(o)))) { + return true; + } + } + } + return false; + } + + /** + * Collect and return counts of different kinds of schema constructs + */ + public String getSummary() { + int nRestrictions = 0; + for (Resource r : classes.keySet()) { + OwlClass c = classes.get(r); + if 
(!c.getOnProperty().isEmpty()) { + nRestrictions++; + } + } + int nClasses = classes.size(); + int nProperties = properties.size(); + String[] pTypes = { "Transitive", "Symmetric", "Asymmetric", + "Functional", "Inverse Functional", "Irreflexive" }; + String[] rTypes = { "someValuesFrom", "allValuesFrom", "hasValue", + "maxCardinality==0", "maxCardinality>0", + "maxQualifiedCardinality==0", "maxQualifiedCardinality>0", }; + String[] edgeTypes = { "Superclass", "Disjoint class", "Complement", + "Superproperty", "Disjoint property", "Inverse property", + "Domain", "Range", + "Equivalent class", "Equivalent property"}; + int[] pTotals = { 0, 0, 0, 0, 0, 0 }; + int[] rTotals = { 0, 0, 0, 0, 0, 0, 0 }; + int[] edgeTotals = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; + for (OwlClass c : classes.values()) { + edgeTotals[0] += c.getSuperClasses().size() - 2; + edgeTotals[1] += c.getDisjointClasses().size(); + edgeTotals[2] += c.getComplementaryClasses().size(); + edgeTotals[8] += c.getEquivalentClasses().size() - 1; + if (!c.someValuesFrom().isEmpty()) rTotals[0]++; + if (!c.allValuesFrom().isEmpty()) rTotals[1]++; + if (!c.hasValue().isEmpty()) rTotals[2]++; + if (c.getMaxCardinality() == 0) rTotals[3]++; + if (c.getMaxCardinality() > 0) rTotals[4]++; + if (c.getMaxQualifiedCardinality() == 0) rTotals[5]++; + if (c.getMaxQualifiedCardinality() > 0) rTotals[6]++; + } + for (OwlProperty p : properties.values()) { + if (p.isTransitive()) pTotals[0]++; + if (p.isSymmetric()) pTotals[1]++; + if (p.isAsymmetric()) pTotals[2]++; + if (p.isFunctional()) pTotals[3]++; + if (p.isInverseFunctional()) pTotals[4]++; + if (p.isIrreflexive()) pTotals[5]++; + edgeTotals[3] += p.getSuperProperties().size() - 1; + edgeTotals[4] += p.getDisjointProperties().size(); + edgeTotals[5] += p.getInverseProperties().size(); + edgeTotals[6] += p.getDomain().size(); + edgeTotals[7] += p.getRange().size(); + edgeTotals[9] += p.getEquivalentProperties().size(); + } + StringBuilder sb = new StringBuilder(); + sb.append("Schema summary:"); + sb.append("\n\tClasses: " + nClasses); + sb.append("\n\t\tProperty Restrictions: ").append(nRestrictions); + for (int i = 0; i < rTypes.length; i++) { + sb.append("\n\t\t\t"); + sb.append(rTypes[i]).append(": ").append(rTotals[i]); + } + sb.append("\n\t\tOther: ").append(nClasses-nRestrictions); + sb.append("\n\tProperties: ").append(nProperties); + for (int i = 0; i < pTypes.length; i++) { + sb.append("\n\t\t"); + sb.append(pTypes[i]).append(": ").append(pTotals[i]); + } + sb.append("\n\tConnections:"); + for (int i = 0; i < edgeTypes.length; i++) { + sb.append("\n\t\t"); + sb.append(edgeTypes[i]).append(": ").append(edgeTotals[i]); + } + return sb.toString(); + } + + /** + * Assuming a given resource corresponds to a property restriction, + * describe the restriction. 
+     */
+    public String explainRestriction(Resource type) {
+        StringBuilder sb = new StringBuilder();
+        if (classes.containsKey(type)) {
+            OwlClass pr = classes.get(type);
+            sb.append("owl:Restriction");
+            for (URI p : pr.getOnProperty()) {
+                sb.append(" (owl:onProperty ").append(p.toString()).append(")");
+            }
+            for (Value v : pr.hasValue()) {
+                sb.append(" (owl:hasValue ").append(v.toString()).append(")");
+            }
+            for (Resource c : pr.someValuesFrom()) {
+                sb.append(" (owl:someValuesFrom ").append(c.toString()).append(")");
+            }
+            for (Resource c : pr.allValuesFrom()) {
+                sb.append(" (owl:allValuesFrom ").append(c.toString()).append(")");
+            }
+            int mc = pr.getMaxCardinality();
+            int mqc = pr.getMaxQualifiedCardinality();
+            if (mc >= 0) {
+                sb.append(" (owl:maxCardinality ").append(mc).append(")");
+            }
+            if (mqc >= 0) {
+                sb.append(" (owl:maxQualifiedCardinality ").append(mqc);
+            }
+            for (Resource c : pr.onClass()) {
+                sb.append(" owl:onClass ").append(c.toString()).append(")");
+            }
+        }
+        return sb.toString();
+    }
+}
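A minimal sketch of how the Schema class above might be driven (not part of the patch): isSchemaTriple() filters TBox/RBox statements, processTriple() accumulates them, and closure() must run before containsTriple() reflects derived facts such as subClassOf transitivity. The class name SchemaUsageSketch and the urn:ex: URIs are hypothetical; the ValueFactoryImpl calls are standard Sesame 2.x API.

    import org.openrdf.model.Statement;
    import org.openrdf.model.ValueFactory;
    import org.openrdf.model.impl.ValueFactoryImpl;
    import org.openrdf.model.vocabulary.RDFS;

    import mvm.rya.reasoning.Schema;

    public class SchemaUsageSketch {
        public static void main(String[] args) {
            ValueFactory vf = ValueFactoryImpl.getInstance();
            // Hypothetical TBox: Dog subClassOf Mammal, Mammal subClassOf Animal
            Statement[] tbox = {
                vf.createStatement(vf.createURI("urn:ex:Dog"),
                        RDFS.SUBCLASSOF, vf.createURI("urn:ex:Mammal")),
                vf.createStatement(vf.createURI("urn:ex:Mammal"),
                        RDFS.SUBCLASSOF, vf.createURI("urn:ex:Animal"))
            };
            Schema schema = new Schema();
            for (Statement st : tbox) {
                if (Schema.isSchemaTriple(st)) {  // keep only schema-relevant triples
                    schema.processTriple(st);
                }
            }
            schema.closure();  // apply the scm-* rules (e.g. scm-sco transitivity)
            // Dog subClassOf Animal is never asserted, but should now be implied:
            Statement derived = vf.createStatement(vf.createURI("urn:ex:Dog"),
                    RDFS.SUBCLASSOF, vf.createURI("urn:ex:Animal"));
            System.out.println(schema.containsTriple(derived));  // expected: true
        }
    }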
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java new file mode 100644 index 0000000..1d1ac8f --- /dev/null +++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/TypeReasoner.java @@ -0,0 +1,234 @@ +package mvm.rya.reasoning; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.LinkedList; +import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; + +import org.openrdf.model.vocabulary.OWL; +import org.openrdf.model.vocabulary.RDF; + +/** + * Keep track of a single node's types and do reasoning about its types. + */ +public class TypeReasoner extends AbstractReasoner { + // This node's types, whether asserted or derived + Map<Resource, Fact> knownTypes = new HashMap<>(); + + // Inferences waiting for particular types to be discovered + Map<Resource, List<Fact>> possibleInferences = new HashMap<>(); + Map<Resource, List<Derivation>> possibleInconsistencies = new HashMap<>(); + + /** + * Constructor. + * @param node Conduct reasoning about/around this node + * @param schema Global schema (class/property) information + * @param t Iteration # of new triples + * @param tSchema Iteration # of latest schema change + */ + public TypeReasoner(Resource node, Schema schema, int t, int tSchema) { + super(node, schema, t, tSchema); + } + + /** + * Process a type (class) assignment from the input. It may have been + * inferred during a previous iteration. + * @param typeFact An assertion about one of this node's types + */ + void processType(Fact typeFact) { + Resource type = (Resource) typeFact.getObject(); + boolean newType = !knownTypes.containsKey(type); + int t = typeFact.getIteration(); + // Save the type in memory unless older knowledge takes precedence + if (newType || t < knownTypes.get(type).getIteration()) { + knownTypes.put(type, typeFact); + // Perform further inference + typeInference(typeFact); + } + } + + /** + * Produce and process a type derivation from this iteration. + * TODO: how to implement rules that would produce "literal rdf:type ?x" + * @param type The type itself + * @param rule The generating rule + * @param source The source of the derivation + */ + void processType(Resource type, OwlRule rule, Fact source) { + processType(triple(node, RDF.TYPE, type, rule, source)); + } + + /** + * Infer additional information from a type assertion. 
+ */ + void typeInference(Fact typeFact) { + Resource type = (Resource) typeFact.getObject(); + OwlClass c = schema.getClass(type); + // RL rule cls-nothing2: Inconsistent if type owl:Nothing + if (OWL.NOTHING.equals(type) && frontier(typeFact)) { + // Skip if this isn't a new fact + collectInconsistency(inconsistency(OwlRule.CLS_NOTHING2, typeFact)); + } + // RL rule cax-dw: type shouldn't be disjointWith a previous type + Set<Resource> disjoint = c.getDisjointClasses(); + disjoint.retainAll(knownTypes.keySet()); + for (Resource other : disjoint) { + Fact otherTypeFact = knownTypes.get(other); + Derivation inc = inconsistency(OwlRule.CAX_DW, typeFact); + inc.addSource(otherTypeFact); + collectInconsistency(inc); + } + // RL rule cls-com: type shouldn't be complementOf a previous type + Set<Resource> complementary = c.getComplementaryClasses(); + complementary.retainAll(knownTypes.keySet()); + for (Resource other : complementary) { + Fact otherTypeFact = knownTypes.get(other); + Derivation inc = inconsistency(OwlRule.CLS_COM, typeFact); + inc.addSource(otherTypeFact); + collectInconsistency(inc); + } + // RL rule cax-sco: subClassOf semantics (derive superclasses) + if (!typeFact.hasRule(OwlRule.CAX_SCO) + && frontier(typeFact)) { + // Skip if typeFact itself came from this rule, and/or if typeFact + // generally shouldn't be the sole source of new information + for (Resource supertype : c.getSuperClasses()) { + // If the supertype isn't trivial, assert it + if (!supertype.equals(type) + && !(supertype.equals(OWL.THING))) { + processType(supertype, OwlRule.CAX_SCO, typeFact); + } + } + } + // Apply property restriction rules: + for (URI prop : c.getOnProperty()) { + // RL rule cls-hv1: if type is an owl:hasValue restriction + for (Value val : c.hasValue()) { + collect(triple(node, prop, val, OwlRule.CLS_HV1, typeFact)); + } + } + // Derive any facts whose explicit condition is this type assignment + if (possibleInferences.containsKey(type)) { + for (Fact fact : possibleInferences.get(type)) { + Fact join = fact.clone(); + join.addSource(typeFact); + collect(join); + } + } + // Derive any inconsistencies whose explicit condition is this type + if (possibleInconsistencies.containsKey(type)) { + for (Derivation d : possibleInconsistencies.get(type)) { + Derivation inc = d.clone(); + inc.addSource(typeFact); + collectInconsistency(inc); + } + } + } + + /** + * Assert an arbitrary fact if and when this node is determined to have + * a particular type. Facilitates join rules specifically concerning type. + */ + void onType(Resource type, Fact fact) { + if (!possibleInferences.containsKey(type)) { + possibleInferences.put(type, new LinkedList<Fact>()); + } + possibleInferences.get(type).add(fact); + // If we already know the type, assert the fact right away. + if (knownTypes.containsKey(type)) { + Fact join = fact.clone(); + join.addSource(knownTypes.get(type)); + collect(join); + } + } + + /** + * Assert an inconsistency if and when this node is determined to have + * a particular type. Facilitates join rules specifically concerning type. + */ + void inconsistentOnType(Resource type, Derivation fact) { + if (!possibleInconsistencies.containsKey(type)) { + possibleInconsistencies.put(type, new LinkedList<Derivation>()); + } + possibleInconsistencies.get(type).add(fact); + // If we already know the type, assert the fact right away. 
+ if (knownTypes.containsKey(type)) { + Derivation d = fact.clone(); + d.addSource(knownTypes.get(type)); + collectInconsistency(d); + } + } + + /** + * Collect all the type knowledge into the output, if it represents new + * information. + */ + void collectTypes() { + for (Resource type : knownTypes.keySet()) { + collect(knownTypes.get(type)); + } + } + + /** + * Get info about types derived and potential inferences. + */ + @Override + public String getDiagnostics() { + int total = 0; + int incTotal = 0; + for (Resource uri : possibleInferences.keySet()) { + total += possibleInferences.get(uri).size(); + } + for (Resource uri : possibleInconsistencies.keySet()) { + incTotal += possibleInconsistencies.get(uri).size(); + } + StringBuilder sb = new StringBuilder(); + sb.append(knownTypes.size()).append(" total types known\n"); + sb.append("Watching for ").append(possibleInferences.size()); + sb.append(" distinct types to trigger any of ").append(total); + sb.append(" possible inferences"); + sb.append("Watching for ").append(possibleInconsistencies.size()); + sb.append(" distinct types to trigger any of ").append(incTotal); + sb.append(" possible inconsistencies"); + return sb.toString(); + } + + /** + * Get the total number of input facts cached. + */ + @Override + public int getNumStored() { + int total = knownTypes.size(); + for (List<Fact> l : possibleInferences.values()) { + total += l.size(); + } + for (List<Derivation> l : possibleInconsistencies.values()) { + total += l.size(); + } + return total; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java new file mode 100644 index 0000000..dde83c6 --- /dev/null +++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/AbstractReasoningTool.java @@ -0,0 +1,492 @@ +package mvm.rya.reasoning.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; + +import mvm.rya.accumulo.mr.RyaStatementWritable; +import mvm.rya.accumulo.mr.fileinput.RdfFileInputFormat; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.reasoning.Derivation; +import mvm.rya.reasoning.Fact; +import mvm.rya.reasoning.Schema; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskCounter; +import org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.Tool; +import org.openrdf.rio.RDFFormat; + +/** + * Contains common functionality for MapReduce jobs involved in reasoning. A + * subclass should implement configureReasoningJob and its own mappers and + * reducers. + */ +abstract public class AbstractReasoningTool extends Configured implements Tool { + // Keep track of statistics about the input + protected static enum COUNTERS { ABOX, TBOX, USEFUL }; + + // MapReduce job, to be configured by subclasses + protected Job job; + + /** + * Configure the job's inputs, outputs, mappers, and reducers. + */ + abstract protected void configureReasoningJob(String[] args) throws Exception; + + /** + * Configure and run a MapReduce job. + */ + @Override + public int run(String[] args) throws Exception { + Configuration conf = getConf(); + job = Job.getInstance(conf); + job.setJobName(getJobName()); + job.setJarByClass(this.getClass()); + configureReasoningJob(args); + boolean success = job.waitForCompletion(!MRReasoningUtils.stats(conf)); + if (success) { + return 0; + } + else { + return 1; + } + } + + /** + * Cumulative CPU time taken by all mappers/reducers. + */ + public long getCumulativeTime() throws IOException { + return getCounter(TaskCounter.CPU_MILLISECONDS); + } + + /** + * Default name for the MapReduce job: + */ + protected String getJobName() { + return "Rya reasoning, pass " + MRReasoningUtils.getCurrentIteration(getConf()) + + ": " + this.getClass().getSimpleName() + "_" + System.currentTimeMillis(); + } + + /** + * Number of inconsistencies detected by this job. + */ + public long getNumInconsistencies() throws IOException { + return getCounter(MultipleOutputs.class.getName(), + MRReasoningUtils.INCONSISTENT_OUT); + } + + /** + * Number of new schema triples derived during this job. 
+ */ + public long getNumSchemaTriples() throws IOException { + return getCounter(MultipleOutputs.class.getName(), + MRReasoningUtils.SCHEMA_OUT); + } + + /** + * Number of new instance triples that might be used for future reasoning + */ + public long getNumUsefulOutput() throws IOException { + return getCounter(MultipleOutputs.class.getName(), + MRReasoningUtils.INTERMEDIATE_OUT); + } + + /** + * Number of new instance triples that will not be used for future reasoning + */ + public long getNumTerminalOutput() throws IOException { + return getCounter(MultipleOutputs.class.getName(), + MRReasoningUtils.TERMINAL_OUT); + } + + /** + * Total number of new instance triples derived during this job. + */ + public long getNumInstanceTriples() throws IOException { + return getNumUsefulOutput() + getNumTerminalOutput(); + } + + /** + * Number of instance triples seen as input during this job. + */ + public long getNumInstanceInput() throws IOException { + return getCounter(COUNTERS.ABOX); + } + + /** + * Number of schema triples seen as input during this job. + */ + public long getNumSchemaInput() throws IOException { + return getCounter(COUNTERS.TBOX); + } + + /** + * Increment the schema or instance triple counter, as appropriate. + */ + protected static void countInput(boolean schema, TaskAttemptContext context) { + if (schema) { + context.getCounter(COUNTERS.TBOX).increment(1); + } + else { + context.getCounter(COUNTERS.ABOX).increment(1); + } + } + + /** + * Add the schema file (TBox) to the distributed cache for the current job. + */ + protected void distributeSchema() { + Path schemaPath = MRReasoningUtils.getSchemaPath(job.getConfiguration()); + job.addCacheFile(schemaPath.toUri()); + } + + /** + * Set up the MapReduce job to use as inputs both an Accumulo table and the + * files containing previously derived information, excluding + * inconsistencies. Looks for a file for every iteration number so far, + * preferring final cleaned up output from that iteration but falling back + * on intermediate data if necessary. + * @param tableMapper Mapper class to use for database input + * @param rdfMapper Mapper class to use for direct RDF input + * @param fileMapper Mapper class to use for derived triples input + * @param filter True to exclude previously derived data that couldn't be + * used to derive anything new at this point. + */ + protected void configureMultipleInput( + Class<? extends Mapper<Key, Value, ?, ?>> tableMapper, + Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper, + Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper, + boolean filter) throws IOException, AccumuloSecurityException { + Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration()); + if (inputPath != null) { + configureRdfInput(inputPath, rdfMapper); + } + else { + configureAccumuloInput(tableMapper); + } + configureFileInput(fileMapper, filter); + } + + /** + * Set up the MapReduce job to use as inputs both an Accumulo table and the + * files containing previously derived information. Looks for a file for + * every iteration number so far, preferring final cleaned up output from + * that iteration but falling back on intermediate data if necessary. 
+ * @param tableMapper Mapper class to use for database input + * @param rdfMapper Mapper class to use for direct RDF input + * @param fileMapper Mapper class to use for derived triples input + * @param incMapper Mapper class to use for derived inconsistencies input + * @param filter True to exclude previously derived data that couldn't be + * used to derive anything new at this point. + */ + protected void configureMultipleInput( + Class<? extends Mapper<Key, Value, ?, ?>> tableMapper, + Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper, + Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper, + Class<? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper, + boolean filter) + throws IOException, AccumuloSecurityException { + Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration()); + if (inputPath != null) { + configureRdfInput(inputPath, rdfMapper); + } + else { + configureAccumuloInput(tableMapper); + } + configureFileInput(fileMapper, incMapper, filter); + } + + /** + * Set up the MapReduce job to use file inputs from previous iterations, + * excluding inconsistencies found. + * @param fileMapper Mapper class to use for generated triples + * @param filter Exclude facts that aren't helpful for inference + */ + protected void configureFileInput( + Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper, + final boolean filter) throws IOException { + configureFileInput(fileMapper, null, filter); + } + + /** + * Set up the MapReduce job to use file inputs from previous iterations. + * @param fileMapper Mapper class for generated triples + * @param incMapper Mapper class for generated inconsistenies + * @param filter Exclude facts that aren't helpful for inference + */ + protected void configureFileInput( + Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper, + Class <? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper, + final boolean filter) throws IOException { + // Set up file input for all iterations up to this one + Configuration conf = job.getConfiguration(); + FileSystem fs = FileSystem.get(conf); + Path inputPath; + int iteration = MRReasoningUtils.getCurrentIteration(conf); + // Set min/max split, if not already provided: + long blocksize = Long.parseLong(conf.get("dfs.blocksize")); + String minSplitProp = "mapreduce.input.fileinputformat.split.minsize"; + String maxSplitProp = "mapreduce.input.fileinputformat.split.maxsize"; + conf.set(minSplitProp, conf.get(minSplitProp, String.valueOf(blocksize))); + conf.set(maxSplitProp, conf.get(maxSplitProp, String.valueOf(blocksize*8))); + for (int i = 1; i <= iteration; i++) { + // Prefer cleaned output... + inputPath = MRReasoningUtils.getOutputPath(conf, + MRReasoningUtils.OUTPUT_BASE + i); + // But if there isn't any, try intermediate data: + if (!fs.isDirectory(inputPath)) { + inputPath = MRReasoningUtils.getOutputPath(conf, + MRReasoningUtils.OUTPUT_BASE + i + + MRReasoningUtils.TEMP_SUFFIX); + } + // And only proceed if we found one or the other. + if (fs.isDirectory(inputPath)) { + // Never include debug output. If filter is true, select only + // intermediate and schema data, otherwise include everything. 
+ PathFilter f = new PathFilter() { + public boolean accept(Path path) { + String s = path.getName(); + if (s.startsWith(MRReasoningUtils.DEBUG_OUT)) { + return false; + } + else { + return !filter + || s.startsWith(MRReasoningUtils.INTERMEDIATE_OUT) + || s.startsWith(MRReasoningUtils.SCHEMA_OUT); + } + } + }; + for (FileStatus status : fs.listStatus(inputPath, f)) { + if (status.getLen() > 0) { + Path p = status.getPath(); + String s = p.getName(); + if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)) { + if (incMapper != null) { + MultipleInputs.addInputPath(job, p, + CombineSequenceFileInputFormat.class, incMapper); + } + } + else { + MultipleInputs.addInputPath(job, status.getPath(), + CombineSequenceFileInputFormat.class, fileMapper); + } + } + } + } + } + } + + /** + * Set up the MapReduce job to use Accumulo as an input. + * @param tableMapper Mapper class to use + */ + protected void configureAccumuloInput(Class<? extends Mapper<Key,Value,?,?>> tableMapper) + throws AccumuloSecurityException { + MRReasoningUtils.configureAccumuloInput(job); + MultipleInputs.addInputPath(job, new Path("/tmp/input"), + AccumuloInputFormat.class, tableMapper); + } + + /** + * Set up the MapReduce job to use an RDF file as an input. + * @param rdfMapper class to use + */ + protected void configureRdfInput(Path inputPath, + Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper) { + Configuration conf = job.getConfiguration(); + String format = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName()); + conf.set(MRUtils.FORMAT_PROP, format); + MultipleInputs.addInputPath(job, inputPath, + RdfFileInputFormat.class, rdfMapper); + } + + /** + * Set up the MapReduce job to output a schema (TBox). + */ + protected void configureSchemaOutput() { + Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration()); + SequenceFileOutputFormat.setOutputPath(job, outPath); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(SchemaWritable.class); + LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class); + MultipleOutputs.addNamedOutput(job, "schemaobj", + SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT, + TextOutputFormat.class, Text.class, Text.class); + MultipleOutputs.setCountersEnabled(job, true); + } + + /** + * Set up the MapReduce job to output newly derived triples. Outputs to + * directory [base]-[iteration]. + */ + protected void configureDerivationOutput() { + configureDerivationOutput(false); + } + + /** + * Set up a MapReduce job to output newly derived triples. + * @param intermediate True if this is intermediate data. Outputs + * to [base]-[iteration]-[temp]. 
+ */ + protected void configureDerivationOutput(boolean intermediate) { + Path outPath; + Configuration conf = job.getConfiguration(); + int iteration = MRReasoningUtils.getCurrentIteration(conf); + if (intermediate) { + outPath = MRReasoningUtils.getOutputPath(conf, + MRReasoningUtils.OUTPUT_BASE + iteration + + MRReasoningUtils.TEMP_SUFFIX); + } + else { + outPath = MRReasoningUtils.getOutputPath(conf, + MRReasoningUtils.OUTPUT_BASE + iteration); + } + SequenceFileOutputFormat.setOutputPath(job, outPath); + LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT, + SequenceFileOutputFormat.class, Fact.class, NullWritable.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT, + SequenceFileOutputFormat.class, Fact.class, NullWritable.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT, + SequenceFileOutputFormat.class, Fact.class, NullWritable.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT, + SequenceFileOutputFormat.class, Derivation.class, NullWritable.class); + MultipleOutputs.setCountersEnabled(job, true); + // Set up an output for diagnostic info, if needed + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT, + TextOutputFormat.class, Text.class, Text.class); + } + + /** + * Set up a MapReduce job to output human-readable text. + */ + protected void configureTextOutput(String destination) { + Path outPath; + outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination); + TextOutputFormat.setOutputPath(job, outPath); + LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT, + TextOutputFormat.class, NullWritable.class, Text.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT, + TextOutputFormat.class, NullWritable.class, Text.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT, + TextOutputFormat.class, NullWritable.class, Text.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT, + TextOutputFormat.class, NullWritable.class, Text.class); + MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT, + TextOutputFormat.class, Text.class, Text.class); + MultipleOutputs.setCountersEnabled(job, true); + } + + /** + * Get the name of the output to send an inconsistency to. + * @return The name of the output file(s) to send inconsistencies to + */ + protected static String getOutputName(Derivation inconsistency) { + return MRReasoningUtils.INCONSISTENT_OUT; + } + + /** + * Get the name of the output to send a fact to. + * @param fact The fact itself + * @param finalOut True if this is for final output, not intermediate + * @return The name of the output file(s) to send this fact to + */ + protected static String getOutputName(Fact fact, boolean finalOut) { + if (Schema.isSchemaTriple(fact.getTriple())) { + return MRReasoningUtils.SCHEMA_OUT; + } + else if (!finalOut && fact.isUseful()) { + return MRReasoningUtils.INTERMEDIATE_OUT; + } + else { + return MRReasoningUtils.TERMINAL_OUT; + } + } + + /** + * Get the name of the output to send a fact to. + */ + protected static String getOutputName(Fact fact) { + return getOutputName(fact, false); + } + + /** + * Retrieve an arbitrary counter's value. 
+ * @param group Counter's group name + * @param counter Name of the counter itself + */ + public long getCounter(String group, String counter) throws IOException { + return job.getCounters().findCounter(group, counter).getValue(); + } + + /** + * Retrieve an arbitrary counter's value. + * @param key The Enum tied to this counter + */ + public long getCounter(Enum<?> key) throws IOException { + return job.getCounters().findCounter(key).getValue(); + } + + /** + * Get the current iteration according to this job's configuration. + */ + public int getIteration() { + return MRReasoningUtils.getCurrentIteration(getConf()); + } + + /** + * Get the job's JobID. + */ + public JobID getJobID() { + return job.getJobID(); + } + + /** + * Get the elapsed wall-clock time, assuming the job is done. + */ + public long getElapsedTime() throws IOException, InterruptedException { + return job.getFinishTime() - job.getStartTime(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java new file mode 100644 index 0000000..02cce66 --- /dev/null +++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ConformanceTest.java @@ -0,0 +1,436 @@ +package mvm.rya.reasoning.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringReader; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.reasoning.Fact; +import mvm.rya.reasoning.Schema; + +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.openrdf.OpenRDFException; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.model.vocabulary.OWL; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.repository.Repository; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.helpers.RDFHandlerBase; +import org.openrdf.rio.ntriples.NTriplesParser; +import org.openrdf.rio.rdfxml.RDFXMLParser; +import org.openrdf.sail.memory.MemoryStore; + +/** + * Test the reasoner against Owl conformance tests in the database. + */ +public class ConformanceTest extends Configured implements Tool { + static String TYPE = RDF.TYPE.stringValue(); + static String TEST = "http://www.w3.org/2007/OWL/testOntology#"; + static String TEST_CONSISTENCY = TEST + "ConsistencyTest"; + static String TEST_INCONSISTENCY = TEST + "InconsistencyTest"; + static String TEST_ENTAILMENT = TEST + "PositiveEntailmentTest"; + static String TEST_NONENTAILMENT = TEST + "NegativeEntailmentTest"; + static String TEST_ID = TEST + "identifier"; + static String TEST_DESC = TEST + "description"; + static String TEST_PROFILE = TEST + "profile"; + static String TEST_PREMISE = TEST + "rdfXmlPremiseOntology"; + static String TEST_CONCLUSION = TEST + "rdfXmlConclusionOntology"; + static String TEST_NONCONCLUSION = TEST + "rdfXmlNonConclusionOntology"; + static String TEST_RL = TEST + "RL"; + static String TEST_SEMANTICS = TEST + "semantics"; + static String TEST_RDFBASED = TEST + "RDF-BASED"; + + private static class OwlTest extends RDFHandlerBase { + Value uri; + String name; + String description; + String premise; + String compareTo; + Set<String> types = new HashSet<>(); + boolean success; + Set<Statement> expected = new HashSet<>(); + Set<Statement> unexpected = new HashSet<>(); + Set<Statement> inferred = new HashSet<>(); + Set<Statement> error = new HashSet<>(); + @Override + public void handleStatement(Statement st) { + if (types.contains(TEST_ENTAILMENT)) { + expected.add(st); + } + else if (types.contains(TEST_NONENTAILMENT)) { + unexpected.add(st); + } + } + String type() { + StringBuilder sb = new StringBuilder(); + if (types.contains(TEST_CONSISTENCY)) { + sb.append("{Consistency}"); + } + if (types.contains(TEST_INCONSISTENCY)) { + sb.append("{Inconsistency}"); + } + if (types.contains(TEST_ENTAILMENT)) { + sb.append("{Entailment}"); + } + if 
(types.contains(TEST_NONENTAILMENT)) { + sb.append("{Nonentailment}"); + } + return sb.toString(); + } + } + + private static class OutputCollector extends RDFHandlerBase { + Set<Statement> triples = new HashSet<>(); + @Override + public void handleStatement(Statement st) { + triples.add(st); + } + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new ConformanceTest(), args); + } + + @Override + public int run(String[] args) throws Exception { + // Validate command + if (args.length < 1 || args.length > 2) { + System.out.println("Usage:\n"); + System.out.println("\tConformanceTest [configuration options] " + + "<test-file> <temp-dir>\n"); + System.out.println("to load test data from an RDF file " + + "(configuration property " + MRUtils.FORMAT_PROP + + " specifies the format, default RDF/XML); or\n"); + System.out.println("\tConformanceTest [configuration options] <temp-dir>\n"); + System.out.println("to load test data from a Rya instance (specified " + + "using standard configuration properties).\n"); + System.out.println("For each test given, run the reasoner over the " + + "premise ontology using a temporary Mini Accumulo instance " + + "at <temp-dir>, then report conformance results."); + System.exit(1); + } + + Set<Value> conformanceTestURIs = new HashSet<>(); + Collection<OwlTest> conformanceTests = new LinkedList<>(); + List<OwlTest> successes = new LinkedList<>(); + List<OwlTest> failures = new LinkedList<>(); + Configuration conf = getConf(); + Repository repo; + File workingDir; + + // If tests are in a file, stick them in a repository for querying + if (args.length == 2) { + workingDir = new File(args[1]); + RDFFormat inputFormat= RDFFormat.RDFXML; + String formatString = conf.get(MRUtils.FORMAT_PROP); + if (formatString != null) { + inputFormat = RDFFormat.valueOf(formatString); + } + repo = new SailRepository(new MemoryStore()); + repo.initialize(); + RepositoryConnection conn = repo.getConnection(); + conn.add(new FileInputStream(args[0]), "", inputFormat); + conn.close(); + } + // Otherwise, get a Rya repository + else { + workingDir = new File(args[0]); + repo = MRReasoningUtils.getRepository(conf); + repo.initialize(); + } + + // Query for the tests we're interested in + RepositoryConnection conn = repo.getConnection(); + conformanceTestURIs.addAll(getTestURIs(conn, TEST_INCONSISTENCY)); + conformanceTestURIs.addAll(getTestURIs(conn, TEST_CONSISTENCY)); + conformanceTestURIs.addAll(getTestURIs(conn, TEST_ENTAILMENT)); + conformanceTestURIs.addAll(getTestURIs(conn, TEST_NONENTAILMENT)); + conformanceTests = getTests(conn, conformanceTestURIs); + conn.close(); + repo.shutDown(); + + // Set up a MiniAccumulo cluster and set up conf to connect to it + String username = "root"; + String password = "root"; + MiniAccumuloCluster mini = new MiniAccumuloCluster(workingDir, password); + mini.start(); + conf.set(MRUtils.AC_INSTANCE_PROP, mini.getInstanceName()); + conf.set(MRUtils.AC_ZK_PROP, mini.getZooKeepers()); + conf.set(MRUtils.AC_USERNAME_PROP, username); + conf.set(MRUtils.AC_PWD_PROP, password); + conf.setBoolean(MRUtils.AC_MOCK_PROP, false); + conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "temp_"); + // Run the conformance tests + int result; + for (OwlTest test : conformanceTests) { + System.out.println(test.uri); + result = runTest(conf, args, test); + if (result != 0) { + return result; + } + if (test.success) { + successes.add(test); + System.out.println("(SUCCESS)"); + } + else { + failures.add(test); + System.out.println("(FAIL)"); + } + } + 
mini.stop(); + + System.out.println("\n" + successes.size() + " successful tests:"); + for (OwlTest test : successes) { + System.out.println("\t[SUCCESS] " + test.type() + " " + test.name); + } + System.out.println("\n" + failures.size() + " failed tests:"); + for (OwlTest test : failures) { + System.out.println("\t[FAIL] " + test.type() + " " + test.name); + System.out.println("\t\t(" + test.description + ")"); + for (Statement triple : test.error) { + if (test.types.contains(TEST_ENTAILMENT)) { + System.out.println("\t\tExpected: " + triple); + } + else if (test.types.contains(TEST_NONENTAILMENT)) { + System.out.println("\t\tUnexpected: " + triple); + } + } + } + return 0; + } + + /** + * Verify that we can infer the correct triples or detect an inconsistency. + * @param conf Specifies working directory, etc. + * @param OwlTest Contains premise/conclusion graphs, will store result + * @return Return value of the MapReduce job + */ + int runTest(Configuration conf, String[] args, OwlTest test) + throws Exception { + conf.setInt(MRReasoningUtils.STEP_PROP, 0); + conf.setInt(MRReasoningUtils.SCHEMA_UPDATE_PROP, 0); + conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, true); + conf.setBoolean(MRReasoningUtils.OUTPUT_FLAG, true); + // Connect to MiniAccumulo and load the test + Repository repo = MRReasoningUtils.getRepository(conf); + repo.initialize(); + RepositoryConnection conn = repo.getConnection(); + conn.clear(); + conn.add(new StringReader(test.premise), "", RDFFormat.RDFXML); + conn.close(); + repo.shutDown(); + // Run the reasoner + ReasoningDriver reasoner = new ReasoningDriver(); + int result = ToolRunner.run(conf, reasoner, args); + test.success = (result == 0); + // Inconsistency test: successful if determined inconsistent + if (test.types.contains(TEST_INCONSISTENCY)) { + test.success = test.success && reasoner.hasInconsistencies(); + } + // Consistency test: successful if determined consistent + if (test.types.contains(TEST_CONSISTENCY)) { + test.success = test.success && !reasoner.hasInconsistencies(); + } + // Other types: we'll need to look at the inferred triples/schema + if (test.types.contains(TEST_NONENTAILMENT) + || test.types.contains(TEST_ENTAILMENT)) { + System.out.println("Reading inferred triples..."); + // Read in the inferred triples from HDFS: + Schema schema = MRReasoningUtils.loadSchema(conf); + FileSystem fs = FileSystem.get(conf); + Path path = MRReasoningUtils.getOutputPath(conf, "final"); + OutputCollector inferred = new OutputCollector(); + NTriplesParser parser = new NTriplesParser(); + parser.setRDFHandler(inferred); + if (fs.isDirectory(path)) { + for (FileStatus status : fs.listStatus(path)) { + String s = status.getPath().getName(); + if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT) + || s.startsWith(MRReasoningUtils.DEBUG_OUT)) { + continue; + } + BufferedReader br = new BufferedReader( + new InputStreamReader(fs.open(status.getPath()))); + parser.parse(br, ""); + br.close(); + } + } + MRReasoningUtils.deleteIfExists(conf, "final"); + test.inferred.addAll(inferred.triples); + // Entailment test: successful if expected triples were inferred + if (test.types.contains(TEST_ENTAILMENT)) { + // Check expected inferences against the inferred triples and + // the schema reasoner + for (Statement st : test.expected) { + Fact fact = new Fact(st); + if (!test.inferred.contains(st) + && !triviallyTrue(fact.getTriple(), schema) + && !schema.containsTriple(fact.getTriple())) { + test.error.add(st); + } + } + } + // Non-entailment test: failure if non-expected 
triples inferred + if (test.types.contains(TEST_NONENTAILMENT)) { + for (Statement st : test.unexpected) { + Fact fact = new Fact(st); + if (test.inferred.contains(st) + || schema.containsTriple(fact.getTriple())) { + test.error.add(st); + } + } + } + test.success = test.success && test.error.isEmpty(); + } + conf.setBoolean(MRReasoningUtils.DEBUG_FLAG, false); + MRReasoningUtils.clean(conf); + return result; + } + + /** + * Query a connection for conformance tests matching a particular + * test type. + */ + Set<Value> getTestURIs(RepositoryConnection conn, String testType) + throws IOException, OpenRDFException { + Set<Value> testURIs = new HashSet<>(); + TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL, + "select ?test where { " + + "?test <" + TYPE + "> <" + testType + "> .\n" + + "?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" + + "?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" + + "}"); + TupleQueryResult queryResult = query.evaluate(); + while (queryResult.hasNext()) { + BindingSet bindings = queryResult.next(); + testURIs.add(bindings.getValue("test")); + } + queryResult.close(); + return testURIs; + } + + /** + * Query a connection for conformance test details. + */ + Collection<OwlTest> getTests(RepositoryConnection conn, Set<Value> testURIs) + throws IOException, OpenRDFException { + Map<Value, OwlTest> tests = new HashMap<>(); + TupleQuery query = conn.prepareTupleQuery(QueryLanguage.SPARQL, + "select * where { " + + "?test <" + TYPE + "> ?testType .\n" + + "?test <" + TEST_PREMISE + "> ?graph .\n" + + "?test <" + TEST_ID + "> ?name .\n" + + "?test <" + TEST_DESC + "> ?description .\n" + + "?test <" + TEST_PROFILE + "> <" + TEST_RL + "> .\n" + + "?test <" + TEST_SEMANTICS + "> <" + TEST_RDFBASED + "> .\n" + + "OPTIONAL {?test <" + TEST_CONCLUSION + "> ?conclusion .}\n" + + "OPTIONAL {?test <" + TEST_NONCONCLUSION + "> ?nonentailed .}\n" + + "}"); + TupleQueryResult queryResult = query.evaluate(); + while (queryResult.hasNext()) { + BindingSet bindings = queryResult.next(); + Value uri = bindings.getValue("test"); + if (testURIs.contains(uri)) { + OwlTest test; + if (tests.containsKey(uri)) { + test = tests.get(uri); + } + else { + test = new OwlTest(); + test.uri = uri; + test.name = bindings.getValue("name").stringValue(); + test.description = bindings.getValue("description").stringValue(); + test.premise = bindings.getValue("graph").stringValue(); + if (bindings.hasBinding("conclusion")) { + test.compareTo = bindings.getValue("conclusion").stringValue(); + } + if (bindings.hasBinding("nonentailed")) { + test.compareTo = bindings.getValue("nonentailed").stringValue(); + } + tests.put(uri, test); + } + test.types.add(bindings.getValue("testType").stringValue()); + } + } + for (OwlTest test : tests.values()) { + if (test.compareTo != null) { + RDFXMLParser parser = new RDFXMLParser(); + parser.setRDFHandler(test); + parser.parse(new StringReader(test.compareTo), ""); + } + } + queryResult.close(); + return tests.values(); + } + + /** + * Determine that a statement is trivially true for purposes of entailment + * tests, such as an implicit "[bnode] type Ontology" triple or a + * "[class] type Class" triple as long as the class exists. 
+ */ + boolean triviallyTrue(Statement triple, Schema schema) { + Resource s = triple.getSubject(); + URI p = triple.getPredicate(); + Value o = triple.getObject(); + if (p.equals(RDF.TYPE)) { + if (o.equals(OWL.ONTOLOGY)) { + return true; + } + else if (o.equals(OWL.CLASS)) { + return schema.hasClass(s); + } + else if ((o.equals(OWL.OBJECTPROPERTY) + || o.equals(OWL.DATATYPEPROPERTY)) + && s instanceof URI) { + // Distinction not maintained, irrelevant to RL rules + return schema.hasProperty((URI) s); + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java new file mode 100644 index 0000000..3e54367 --- /dev/null +++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/DuplicateElimination.java @@ -0,0 +1,225 @@ +package mvm.rya.reasoning.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; + +import mvm.rya.accumulo.mr.RyaStatementWritable; +import mvm.rya.reasoning.Derivation; +import mvm.rya.reasoning.Fact; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; + +public class DuplicateElimination extends AbstractReasoningTool { + public static void main(String[] args) throws Exception { + System.exit(ToolRunner.run(new DuplicateElimination(), args)); + } + + @Override + protected void configureReasoningJob(String[] args) throws Exception { + configureMultipleInput(DuplicateTableMapper.class, + DuplicateRdfMapper.class, DuplicateFileMapper.class, + InconsistencyMapper.class, false); + job.setMapOutputKeyClass(Fact.class); + job.setMapOutputValueClass(Derivation.class); + job.setReducerClass(DuplicateEliminationReducer.class); + configureDerivationOutput(); + } + + public static class DuplicateEliminationMapper<K, V> extends Mapper<K, V, + Fact, Derivation> { + private MultipleOutputs<?, ?> debugOut; + private boolean debug; + private Text debugK = new Text(); + private Text debugV = new Text(); + private Fact emptyFact = new Fact(); + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + debug = MRReasoningUtils.debug(conf); + if (debug) { + debugOut = new MultipleOutputs<>(context); + } + } + @Override + public void cleanup(Context context) throws IOException, + InterruptedException { + if (debugOut != null) { + debugOut.close(); + } + } + protected void process(Context context, Fact fact, Derivation d, + String source) throws IOException, InterruptedException { + context.write(fact, d); + } + + protected void process(Context context, Fact fact, + String source) throws IOException, InterruptedException { + if (debug) { + debugK.set("MAP_" + source + ": " + fact.explain(false)); + debugV.set("iteration=" + fact.getIteration() + + ", size=" + fact.span()); + debugOut.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV); + } + Derivation d = fact.unsetDerivation(); + process(context, fact, d, source); + } + + protected void process(Context context, Derivation d, + String source) throws IOException, InterruptedException { + if (debug) { + debugK.set("MAP_" + source + ": inconsistency : " + + d.explain(false)); + debugV.set("iteration=" + d.getIteration() + + ", size=" + d.span()); + debugOut.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV); + } + emptyFact.setDerivation(d); + process(context, emptyFact, d, source); + } + } + + public static class DuplicateTableMapper extends DuplicateEliminationMapper< + Key, Value> { + private Fact inputTriple = new Fact(); + @Override + public void map(Key row, Value data, Context context) + throws IOException, InterruptedException { + inputTriple.setTriple(MRReasoningUtils.getStatement(row, data, + context.getConfiguration())); + process(context, inputTriple, "TABLE"); + } + } + + public static class DuplicateFileMapper extends DuplicateEliminationMapper< + Fact, NullWritable> { + @Override + public void map(Fact key, NullWritable value, Context context) + throws IOException, InterruptedException { + process(context, key, "STEP-" + 
key.getIteration()); + } + } + + public static class DuplicateRdfMapper extends DuplicateEliminationMapper< + LongWritable, RyaStatementWritable> { + private Fact inputTriple = new Fact(); + @Override + public void map(LongWritable key, RyaStatementWritable value, + Context context) throws IOException, InterruptedException { + inputTriple.setTriple(value.getRyaStatement()); + process(context, inputTriple, "RDF"); + } + } + + public static class InconsistencyMapper extends DuplicateEliminationMapper< + Derivation, NullWritable> { + @Override + public void map(Derivation key, NullWritable value, Context context) + throws IOException, InterruptedException { + process(context, key, "INCONSISTENCY-STEP-" + key.getIteration()); + } + } + + public static class DuplicateEliminationReducer extends Reducer< + Fact, Derivation, Fact, NullWritable> { + protected MultipleOutputs<?, ?> mout; + protected int current; + protected boolean debug; + protected Logger log = Logger.getLogger(DuplicateEliminationReducer.class); + protected long totalInput = 0; + protected long totalFacts = 0; + protected long totalOutput = 0; + @Override + public void setup(Context context) { + Configuration conf = context.getConfiguration(); + mout = new MultipleOutputs<>(context); + current = MRReasoningUtils.getCurrentIteration(conf); + debug = MRReasoningUtils.debug(conf); + } + @Override + public void cleanup(Context context) throws IOException, + InterruptedException { + mout.close(); + log.info("Input records processed: " + totalInput); + log.info("Distinct facts: " + totalFacts); + log.info("Output facts: " + totalOutput); + } + @Override + public void reduce(Fact fact, Iterable<Derivation> derivations, + Context context) throws IOException, InterruptedException { + log.debug(fact.toString() + ":"); + totalFacts++; + // We only need to output this fact if it hasn't been derived + // before this step (and wasn't in the original data, marked + // as iteration 0). If we do want to output it, prefer the simplest + // derivation. + Derivation best = null; + boolean newFact = true; + int count = 0; + for (Derivation derivation : derivations) { + count++; + if (newFact) { + if (derivation.getIteration() >= current) { + // Valid so far; check if this is the best derivation: + if (best == null || best.span() > derivation.span()) { + best = derivation.clone(); + } + } + else if (debug) { + newFact = false; + } + else { + return; + } + } + if (debug) { + mout.write(MRReasoningUtils.DEBUG_OUT, + new Text("DE " + fact.toString() + derivation.explain(false)), + new Text(Integer.toString(count) + "\t" + newFact)); + } + } + totalInput += count; + if (newFact) { + totalOutput++; + if (fact.isEmpty()) { + // If there's no triple, it must be an inconsistency + mout.write(getOutputName(best), best, NullWritable.get()); + } + else { + // Output a triple + fact.setDerivation(best); + mout.write(getOutputName(fact), fact, NullWritable.get()); + } + } + log.debug(totalFacts + " facts, " + totalInput + " input records, " + + totalOutput + " output records"); + } + } +}
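
The heart of DuplicateEliminationReducer.reduce() above is a single rule: a fact is emitted only if every derivation of it is new to the current iteration (iteration 0 marks original input data), and when it is emitted, the derivation with the smallest span is kept. The following standalone sketch restates that selection rule outside of the MapReduce plumbing; the class and field names here (IterationRecord, span, iteration) are hypothetical stand-ins for the patch's Derivation, assumed only to expose an iteration number and a derivation-tree size as the patch does.

// Sketch only: distills the duplicate-elimination selection rule.
// Hypothetical types; not part of the patch above.
import java.util.Arrays;
import java.util.List;

public class DerivationSelectionSketch {
    static class IterationRecord {
        final int iteration;   // step at which the fact was derived (0 = original input)
        final int span;        // size of the derivation tree; smaller is simpler
        IterationRecord(int iteration, int span) {
            this.iteration = iteration;
            this.span = span;
        }
        int getIteration() { return iteration; }
        int span() { return span; }
    }

    /**
     * Return the simplest derivation if every derivation of the fact is new
     * to the current step, or null if the fact was already known (derived in
     * an earlier step or present in the original data).
     */
    static IterationRecord select(List<IterationRecord> derivations, int currentStep) {
        IterationRecord best = null;
        for (IterationRecord d : derivations) {
            if (d.getIteration() < currentStep) {
                return null;            // already known: suppress the duplicate
            }
            if (best == null || best.span() > d.span()) {
                best = d;               // prefer the smallest derivation tree
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<IterationRecord> fresh = Arrays.asList(
                new IterationRecord(3, 5), new IterationRecord(3, 2));
        List<IterationRecord> stale = Arrays.asList(
                new IterationRecord(3, 5), new IterationRecord(1, 2));
        System.out.println(select(fresh, 3).span());   // 2: simplest new derivation kept
        System.out.println(select(stale, 3));          // null: fact was derived in an earlier step
    }
}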

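For reference, ConformanceTest.runTest() earlier in this patch classifies a result differently for each of the four test kinds: consistency and inconsistency tests look only at whether the reasoner reported an inconsistency, while entailment and non-entailment tests compare the conclusion (or non-conclusion) graph against the inferred triples plus whatever the schema or triviallyTrue() can account for. The sketch below summarizes that classification; TestKind and the string-set parameters are hypothetical simplifications, not types from the patch, which instead keys off OwlTest.types and ReasoningDriver.hasInconsistencies().

// Sketch only: pass/fail classification per conformance test kind,
// under the simplifying assumption that triples are compared as strings.
import java.util.Set;

public class ConformanceOutcomeSketch {
    enum TestKind { CONSISTENCY, INCONSISTENCY, ENTAILMENT, NONENTAILMENT }

    static boolean passes(TestKind kind,
                          boolean foundInconsistency,
                          Set<String> derivable,    // inferred triples plus schema/trivially-true ones
                          Set<String> expected,     // conclusion graph (entailment tests)
                          Set<String> unexpected) { // non-conclusion graph (non-entailment tests)
        switch (kind) {
            case CONSISTENCY:
                return !foundInconsistency;               // must not flag an inconsistency
            case INCONSISTENCY:
                return foundInconsistency;                // must flag an inconsistency
            case ENTAILMENT:
                return derivable.containsAll(expected);   // every expected triple accounted for
            case NONENTAILMENT:
                for (String st : unexpected) {
                    if (derivable.contains(st)) {
                        return false;                     // derived something it should not have
                    }
                }
                return true;
            default:
                return false;
        }
    }
}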