http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
new file mode 100644
index 0000000..714ad1c
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ForwardChain.java
@@ -0,0 +1,278 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.mr.RyaStatementWritable;
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.LocalReasoner;
+import mvm.rya.reasoning.LocalReasoner.Relevance;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.openrdf.model.Resource;
+
+/**
+ * MapReduce tool for one pass of forward-chaining reasoning. Mappers key
+ * each relevant fact by the resource(s) it is relevant to, and
+ * {@link ReasoningReducer} runs one {@link LocalReasoner} per resource over
+ * the facts grouped under it.
+ */
+public class ForwardChain extends AbstractReasoningTool {
+    /**
+     * Wire up inputs, map output types, reducer, comparators and outputs.
+     * distributeSchema(), configureMultipleInput() and
+     * configureDerivationOutput() are inherited helpers whose behavior is
+     * not visible in this file.
+     */
+    @Override
+    protected void configureReasoningJob(String[] args) throws Exception {
+        distributeSchema();
+        Configuration conf = job.getConfiguration();
+        // We can ignore irrelevant triples, unless the schema has just changed
+        // and therefore we can't rely on previous determinations of relevance.
+        configureMultipleInput(TableMapper.class, RdfMapper.class,
+            FileMapper.class, !MRReasoningUtils.isSchemaNew(conf));
+        job.setMapOutputKeyClass(ResourceWritable.class);
+        job.setMapOutputValueClass(Fact.class);
+        job.setReducerClass(ReasoningReducer.class);
+        // Sort on (node, sort key) but group on node alone, so that one
+        // reduce call receives every fact for a node, ordered by sort key.
+        job.setSortComparatorClass(ResourceWritable.SecondaryComparator.class);
+        job.setGroupingComparatorClass(ResourceWritable.PrimaryComparator.class);
+        configureDerivationOutput(true);
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new ForwardChain(), args));
+    }
+
+    /**
+     * Decide whether to output facts and with what keys. Subclasses handle
+     * different sources of input.
+     */
+    public static class ForwardChainMapper<K, V> extends Mapper<K, V,
+            ResourceWritable, Fact> {
+        protected Schema schema;
+        // Reused output key; overwritten before every write.
+        protected ResourceWritable node = new ResourceWritable();
+        protected MultipleOutputs<?, ?> debugOut;
+        protected boolean debug;
+        private Text debugKey = new Text();
+        private Text debugValue = new Text();
+
+        /**
+         * Use a pre-built schema instead of loading one during setup.
+         */
+        public ForwardChainMapper(Schema s) {
+            this.schema = s;
+        }
+        public ForwardChainMapper() {}
+
+        /**
+         * Open the debug output, load the schema unless one was supplied to
+         * the constructor, and read the debug flag.
+         */
+        @Override
+        protected void setup(Context context) {
+            debugOut = new MultipleOutputs<>(context);
+            Configuration conf = context.getConfiguration();
+            if (schema == null) {
+                schema = MRReasoningUtils.loadSchema(conf);
+            }
+            debug = MRReasoningUtils.debug(conf);
+        }
+
+        /**
+         * Close the debug output if it was opened.
+         */
+        @Override
+        public void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (debugOut != null) {
+                debugOut.close();
+            }
+        }
+
+        /**
+         * Emit the fact once for each end (subject and/or object) that the
+         * reasoner considers relevant. Subject-keyed copies carry sort key 1
+         * and object-keyed copies carry sort key -1.
+         */
+        protected void process(Context context, Fact inputTriple)
+                throws IOException, InterruptedException {
+            Relevance rel = LocalReasoner.relevantFact(inputTriple, schema);
+            if (rel.subject()) {
+                emit(context, inputTriple.getSubject(), 1, inputTriple);
+            }
+            if (rel.object()) {
+                // NOTE(review): assumes an object reported as relevant is
+                // always a Resource rather than a literal -- confirm against
+                // LocalReasoner.relevantFact.
+                emit(context, (Resource) inputTriple.getObject(), -1,
+                    inputTriple);
+            }
+        }
+
+        /**
+         * Write one (node, fact) pair and, in debug mode, a matching record
+         * to the debug named output.
+         */
+        private void emit(Context context, Resource resource, int sortKey,
+                Fact fact) throws IOException, InterruptedException {
+            node.set(resource, sortKey);
+            context.write(node, fact);
+            if (debug) {
+                int i = fact.getIteration();
+                debugKey.set("MAP_OUT" + node.toString());
+                debugValue.set(fact.explain(false) + "[" + i + "]");
+                debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey,
+                    debugValue);
+            }
+        }
+    }
+
+    /**
+     * Get input data from the database
+     */
+    public static class TableMapper extends ForwardChainMapper<Key, Value> {
+        // Reused for every row to avoid allocating a Fact per record.
+        private Fact inputTriple = new Fact();
+        public TableMapper() { super(); }
+        public TableMapper(Schema s) { super(s); }
+        @Override
+        public void map(Key row, Value data, Context context)
+                throws IOException, InterruptedException {
+            inputTriple.setTriple(MRReasoningUtils.getStatement(row, data,
+                context.getConfiguration()));
+            process(context, inputTriple);
+        }
+    }
+
+    /**
+     * Get intermediate data from a sequence file
+     */
+    public static class FileMapper extends ForwardChainMapper<Fact,
+            NullWritable> {
+        public FileMapper() { super(); }
+        public FileMapper(Schema s) { super(s); }
+        @Override
+        public void map(Fact inputTriple, NullWritable nw,
+                Context context) throws IOException, InterruptedException {
+            process(context, inputTriple);
+        }
+    }
+
+    /**
+     * Get input data from an RDF file
+     */
+    public static class RdfMapper extends ForwardChainMapper<LongWritable,
+            RyaStatementWritable> {
+        // Reused for every record to avoid allocating a Fact per record.
+        private Fact inputTriple = new Fact();
+        public RdfMapper() { super(); }
+        public RdfMapper(Schema s) { super(s); }
+        @Override
+        public void map(LongWritable key, RyaStatementWritable rsw,
+                Context context) throws IOException, InterruptedException {
+            inputTriple.setTriple(rsw.getRyaStatement());
+            process(context, inputTriple);
+        }
+    }
+
+    /**
+     * Runs a LocalReasoner over all facts grouped under one node, writing
+     * derived facts and inconsistencies to named outputs.
+     */
+    public static class ReasoningReducer extends Reducer<ResourceWritable,
+            Fact, Fact, NullWritable> {
+        // Emit progress diagnostics every this many input facts.
+        private static final int LOG_INTERVAL = 5000;
+        private Logger log = Logger.getLogger(ReasoningReducer.class);
+        private MultipleOutputs<?, ?> mout;
+        private Schema schema;
+        private boolean debug;
+        private Text debugK = new Text();
+        private Text debugV = new Text();
+        // Largest getNumStored() seen for any node, and that node's label.
+        private int maxStored = 0;
+        private String maxNode = "";
+
+        /**
+         * Use a pre-built schema instead of loading one during setup.
+         */
+        public ReasoningReducer(Schema s) {
+            this.schema = s;
+        }
+        public ReasoningReducer() {}
+
+        /**
+         * Open the named outputs, load the schema unless one was supplied to
+         * the constructor, and read the debug flag.
+         */
+        @Override
+        public void setup(Context context) {
+            mout = new MultipleOutputs<>(context);
+            Configuration conf = context.getConfiguration();
+            if (schema == null) {
+                schema = MRReasoningUtils.loadSchema(conf);
+            }
+            debug = MRReasoningUtils.debug(conf);
+        }
+
+        /**
+         * Close the named outputs and log the peak number of stored facts.
+         */
+        @Override
+        public void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (mout != null) {
+                mout.close();
+            }
+            log.info("Most input triples stored at one time by any reasoner: "
+                + maxStored + " (reasoner for node: " + maxNode + ")");
+        }
+
+        /**
+         * Feed every fact for this node to a fresh reasoner, writing results
+         * after each fact and once more after the final getTypes() call.
+         */
+        @Override
+        public void reduce(ResourceWritable key, Iterable<Fact> facts,
+                Context context) throws IOException, InterruptedException {
+            log.debug("Reasoning for node " + key.toString());
+            // If the schema was just updated, all facts are potentially
+            // meaningful again. Otherwise, any new derivation must use at
+            // least one fact from the previous (or this) iteration.
+            Configuration conf = context.getConfiguration();
+            LocalReasoner reasoner = new LocalReasoner(key.get(), schema,
+                MRReasoningUtils.getCurrentIteration(conf),
+                MRReasoningUtils.lastSchemaUpdate(conf));
+            long numInput = 0;
+            long numOutput = 0;
+            for (Fact fact : facts) {
+                if (debug) {
+                    writeDebug("INPUT", key.get().stringValue(),
+                        fact.toString());
+                }
+                // We actually need separate fact objects, as the reasoner might
+                // store them (default is to reuse the same object each time)
+                reasoner.processFact(fact.clone());
+                numInput++;
+                numOutput += handleResults(reasoner, context);
+                if (numInput % LOG_INTERVAL == 0) {
+                    log.debug(reasoner.getDiagnostics());
+                    log.debug(numInput + " input triples so far");
+                    log.debug(numOutput + " output triples/inconsistencies so far");
+                }
+            }
+            reasoner.getTypes();
+            numOutput += handleResults(reasoner, context);
+            int numStored = reasoner.getNumStored();
+            if (numStored > maxStored) {
+                maxStored = numStored;
+                maxNode = key.toString();
+            }
+            log.debug("..." + numStored + " input facts stored in memory");
+        }
+
+        /**
+         * Process any new results from a reasoner.
+         *
+         * @return the number of facts plus inconsistencies written
+         */
+        private long handleResults(LocalReasoner reasoner, Context context)
+                throws IOException, InterruptedException {
+            long numOutput = 0;
+            if (reasoner.hasNewFacts()) {
+                for (Fact fact : reasoner.getFacts()) {
+                    mout.write(getOutputName(fact), fact, NullWritable.get());
+                    numOutput++;
+                    if (debug) {
+                        writeDebug("OUTPUT",
+                            reasoner.getNode().stringValue(),
+                            fact.explain(false));
+                    }
+                }
+            }
+            if (reasoner.hasInconsistencies()) {
+                for (Derivation inconsistency : reasoner.getInconsistencies()) {
+                    mout.write(getOutputName(inconsistency), inconsistency,
+                        NullWritable.get());
+                    numOutput++;
+                    if (debug) {
+                        writeDebug("OUTPUT",
+                            inconsistency.getNode().stringValue(),
+                            inconsistency.explain(false));
+                    }
+                }
+            }
+            return numOutput;
+        }
+
+        /**
+         * Write one record, keyed "LABEL<node>", to the debug named output.
+         */
+        private void writeDebug(String label, String nodeValue, String text)
+                throws IOException, InterruptedException {
+            debugK.set(label + "<" + nodeValue + ">");
+            debugV.set(text);
+            mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
new file mode 100644
index 0000000..3bed4ca
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/MRReasoningUtils.java
@@ -0,0 +1,336 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+import mvm.rya.rdftriplestore.RdfCloudTripleStore;
+import mvm.rya.rdftriplestore.RyaSailRepository;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapreduce.Job;
+
+/**
+ * Convenience methods for MapReduce reasoning tasks and options.
+ */
+public class MRReasoningUtils {
+    // Configuration variables
+    public static final String WORKING_DIR = "reasoning.workingDir";
+    public static final String LOCAL_INPUT = "reasoning.inputLocal";
+    public static final String DEBUG_FLAG = "reasoning.debug";
+    public static final String OUTPUT_FLAG = "reasoning.output";
+    public static final String STATS_FLAG = "reasoning.stats";
+
+    // Variables used to pass information from drivers to jobs
+    public static final String STEP_PROP = "reasoning.step";
+    public static final String SCHEMA_UPDATE_PROP = "reasoning.schemaUpdate";
+
+    // Used to construct input/output directories
+    static final String OUTPUT_BASE = "step-";
+    static final String SCHEMA_BASE = "schema-";
+    static final String TEMP_SUFFIX = "a";
+    // Named outputs for different kinds of facts
+    static final String SCHEMA_OUT = "schema";
+    static final String INCONSISTENT_OUT = "inconsistencies";
+    static final String TERMINAL_OUT = "instance";
+    static final String INTERMEDIATE_OUT = "intermediate";
+    static final String DEBUG_OUT = "debug";
+
+    /**
+     * Load serialized schema information from a file.
+     *
+     * If the schema path is a directory, the first non-empty regular file
+     * whose name does not start with the debug prefix is read; if no entry
+     * qualifies, the last entry listed is attempted. Any IOException is
+     * printed and the schema is returned as populated so far (possibly
+     * empty), so callers never see the failure directly.
+     *
+     * @param conf job configuration used to locate the file system and path
+     * @return the schema object, never null
+     */
+    public static Schema loadSchema(Configuration conf) {
+        SchemaWritable schema = new SchemaWritable();
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            Path schemaPath = getSchemaPath(conf);
+            if (fs.isDirectory(schemaPath)) {
+                for (FileStatus status : fs.listStatus(schemaPath)) {
+                    schemaPath = status.getPath();
+                    if (status.isFile() && status.getLen() > 0
+                        && !schemaPath.getName().startsWith(DEBUG_OUT)) {
+                        break;
+                    }
+                }
+            }
+            // try-with-resources: the reader is closed even if next() throws.
+            try (SequenceFile.Reader in = new SequenceFile.Reader(conf,
+                    SequenceFile.Reader.file(schemaPath))) {
+                NullWritable key = NullWritable.get();
+                in.next(key, schema);
+            }
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        return schema;
+    }
+
+    /**
+     * Record that the schema was updated at this iteration.
+     */
+    static void schemaUpdated(Configuration conf) {
+        conf.setInt(SCHEMA_UPDATE_PROP, getCurrentIteration(conf));
+    }
+
+    /**
+     * Mark the beginning of the next iteration.
+     */
+    static void nextIteration(Configuration conf) {
+        conf.setInt(STEP_PROP, getCurrentIteration(conf)+1);
+    }
+
+    /**
+     * Convert an Accumulo row to a RyaStatement. The row is always decoded
+     * using the SPO table layout.
+     *
+     * @return the statement, or null if the row could not be resolved
+     *         (TripleRowResolverException); the error is printed to stderr
+     * @throws IllegalArgumentException rethrown after printing the row
+     */
+    static RyaStatement getStatement(Key row, Value data, Configuration conf) {
+        try {
+            RyaTripleContext ryaContext = RyaTripleContext.getInstance(
+                new AccumuloRdfConfiguration(conf));
+            RyaStatement ryaStatement = ryaContext.deserializeTriple(
+                RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
+                new TripleRow(row.getRow().getBytes(), row.getColumnFamily().getBytes(),
+                    row.getColumnQualifier().getBytes(), row.getTimestamp(),
+                    row.getColumnVisibility().getBytes(), data.get()));
+            return ryaStatement;
+        }
+        catch (TripleRowResolverException e) {
+            e.printStackTrace();
+            System.err.println("row: " + row);
+            return null;
+        }
+        catch (IllegalArgumentException e) {
+            e.printStackTrace();
+            System.err.println("row: " + row);
+            throw e;
+        }
+    }
+
+    /**
+     * Clean up intermediate data, unless debug=true. Removes the step,
+     * temporary-step and schema directories for every iteration from 0
+     * through the current one, plus the uploaded "input" directory.
+     */
+    static void clean(Configuration conf) throws IOException {
+        if (!debug(conf)) {
+            int iteration = getCurrentIteration(conf);
+            for (int i = 0; i <= iteration; i++) {
+                deleteIfExists(conf, OUTPUT_BASE + i);
+                deleteIfExists(conf, OUTPUT_BASE + i + TEMP_SUFFIX);
+                deleteIfExists(conf, SCHEMA_BASE + i);
+            }
+            deleteIfExists(conf, "input");
+        }
+    }
+
+    /**
+     * If a local input path was given, upload it to HDFS and configure file
+     * input. Useful for automating tests against small inputs.
+     *
+     * @return true if a local input was configured and uploaded
+     */
+    static boolean uploadIfNecessary(Configuration conf)
+            throws IOException {
+        String local = conf.get(LOCAL_INPUT);
+        if (local == null) {
+            return false;
+        }
+        FileSystem fs = FileSystem.get(conf);
+        // The local path is resolved against the current working directory.
+        String current = new File("").getAbsolutePath();
+        Path sourcePath = new Path(current, local);
+        Path destPath = getOutputPath(conf, "input");
+        // Keep the local source (delSrc=false), overwrite the destination.
+        fs.copyFromLocalFile(false, true, sourcePath, destPath);
+        conf.set(MRUtils.INPUT_PATH, destPath.toString());
+        return true;
+    }
+
+    /**
+     * Delete an HDFS directory if it exists
+     *
+     * @param rel path relative to the working directory
+     */
+    static void deleteIfExists(Configuration conf, String rel)
+            throws IOException {
+        FileSystem fs = FileSystem.get(conf);
+        Path path = getOutputPath(conf, rel);
+        if (fs.isDirectory(path) || fs.isFile(path)) {
+            fs.delete(path, true);
+        }
+    }
+
+    /**
+     * Get a Repository from the configuration variables
+     */
+    static RyaSailRepository getRepository(Configuration conf)
+            throws AccumuloException, AccumuloSecurityException {
+        boolean mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, false);
+        String instance = conf.get(MRUtils.AC_INSTANCE_PROP, "instance");
+        String username = conf.get(MRUtils.AC_USERNAME_PROP, "root");
+        // NOTE(review): default password here is "root" but "" in
+        // configureAccumuloInput -- confirm which is intended.
+        String password = conf.get(MRUtils.AC_PWD_PROP, "root");
+        Instance accumulo;
+        if (mock) {
+            accumulo = new MockInstance(instance);
+        }
+        else {
+            String zookeepers = conf.get(MRUtils.AC_ZK_PROP, "zoo");
+            accumulo = new ZooKeeperInstance(instance, zookeepers);
+        }
+        Connector connector = accumulo.getConnector(username, new PasswordToken(password));
+        AccumuloRdfConfiguration aconf = new AccumuloRdfConfiguration(conf);
+        aconf.setTablePrefix(conf.get(MRUtils.TABLE_PREFIX_PROPERTY,
+            RdfCloudTripleStoreConstants.TBL_PRFX_DEF));
+        AccumuloRyaDAO dao = new AccumuloRyaDAO();
+        dao.setConnector(connector);
+        dao.setConf(aconf);
+        RdfCloudTripleStore store = new RdfCloudTripleStore();
+        store.setRyaDAO(dao);
+        return new RyaSailRepository(store);
+    }
+
+    /**
+     * Set up a MapReduce Job to use Accumulo as input. Scan authorizations
+     * come from the comma-separated auth property when it is set, otherwise
+     * AccumuloRdfConstants.ALL_AUTHORIZATIONS is used.
+     */
+    static void configureAccumuloInput(Job job)
+            throws AccumuloSecurityException {
+        Configuration conf = job.getConfiguration();
+        String username = conf.get(MRUtils.AC_USERNAME_PROP, "root");
+        String password = conf.get(MRUtils.AC_PWD_PROP, "");
+        String instance = conf.get(MRUtils.AC_INSTANCE_PROP, "instance");
+        String zookeepers = conf.get(MRUtils.AC_ZK_PROP, "zoo");
+        Authorizations auths;
+        String auth = conf.get(MRUtils.AC_AUTH_PROP);
+        if (auth != null) {
+            auths = new Authorizations(auth.split(","));
+        }
+        else {
+            auths = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        AccumuloInputFormat.setZooKeeperInstance(job,
+            ClientConfiguration.loadDefault()
+            .withInstance(instance).withZkHosts(zookeepers));
+        AccumuloInputFormat.setConnectorInfo(job, username, new PasswordToken(password));
+        AccumuloInputFormat.setInputTableName(job, getTableName(conf));
+        AccumuloInputFormat.setScanAuthorizations(job, auths);
+    }
+
+    /**
+     * Get the table name that will be used for Accumulo input. The layout
+     * defaults to SPO and the prefix to the triple store's default prefix.
+     */
+    static String getTableName(Configuration conf) {
+        String layout = conf.get(MRUtils.TABLE_LAYOUT_PROP,
+            RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO.toString());
+        String prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY,
+            RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
+        return RdfCloudTripleStoreUtils.layoutPrefixToTable(
+            RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(layout), prefix);
+    }
+
+    /**
+     * Whether we should output the final inferences. Defaults to true.
+     */
+    static boolean shouldOutput(Configuration conf) {
+        return conf.getBoolean(OUTPUT_FLAG, true);
+    }
+
+    /**
+     * Return whether debug flag is on. Defaults to false.
+     */
+    static boolean debug(Configuration conf) {
+        return conf.getBoolean(DEBUG_FLAG, false);
+    }
+
+    /**
+     * Return whether detailed statistics should be printed. Defaults to
+     * false.
+     */
+    static boolean stats(Configuration conf) {
+        return conf.getBoolean(STATS_FLAG, false);
+    }
+
+    /**
+     * Get the Path for RDF file input, or null if not given.
+     */
+    static Path getInputPath(Configuration conf) {
+        String in = conf.get(MRUtils.INPUT_PATH);
+        if (in == null) {
+            return null;
+        }
+        return new Path(in);
+    }
+
+    /**
+     * Get the full output path for a configuration and relative pathname.
+     * The working directory defaults to "tmp/reasoning".
+     */
+    static Path getOutputPath(Configuration conf, String name) {
+        String root = conf.get(WORKING_DIR, "tmp/reasoning");
+        return new Path(root + "/" + name);
+    }
+
+    /**
+     * Get the path to the Schema, named after the iteration of the last
+     * schema update.
+     */
+    static Path getSchemaPath(Configuration conf) {
+        int iteration = lastSchemaUpdate(conf);
+        return getOutputPath(conf, SCHEMA_BASE + iteration);
+    }
+
+    /**
+     * Get the current iteration, useful for keeping track of when facts were
+     * generated. Defaults to 0 when unset.
+     */
+    public static int getCurrentIteration(Configuration conf) {
+        return conf.getInt(STEP_PROP, 0);
+    }
+
+    /**
+     * Get the time of the last change to the schema. Defaults to 0 when
+     * unset.
+     */
+    static int lastSchemaUpdate(Configuration conf) {
+        return conf.getInt(SCHEMA_UPDATE_PROP, 0);
+    }
+
+    /**
+     * True if the schema was just updated on the last pass.
+     */
+    public static boolean isSchemaNew(Configuration conf) {
+        return lastSchemaUpdate(conf) == getCurrentIteration(conf) - 1;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
new file mode 100644
index 0000000..527b887
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/OutputTool.java
@@ -0,0 +1,110 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import mvm.rya.reasoning.Derivation;
+import mvm.rya.reasoning.Fact;
+import mvm.rya.reasoning.Schema;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Collect inferred triples and detected inconsistencies as text.
+ */
+public class OutputTool extends AbstractReasoningTool {
+    /**
+     * Remove any previous "final" output, then configure a single-reducer
+     * job that maps facts and inconsistencies to text. configureFileInput()
+     * and configureTextOutput() are inherited helpers whose behavior is not
+     * visible in this file.
+     */
+    @Override
+    protected void configureReasoningJob(String[] args) throws Exception {
+        MRReasoningUtils.deleteIfExists(job.getConfiguration(), "final");
+        configureFileInput(FactMapper.class, InconsistencyMapper.class, false);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setNumReduceTasks(1);
+        job.setReducerClass(OutputReducer.class);
+        configureTextOutput("final");
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.exit(ToolRunner.run(new OutputTool(), args));
+    }
+
+    /**
+     * Emits each fact's text keyed by its output name and, in debug mode, a
+     * second record with the fact's explanation keyed by the debug output.
+     */
+    public static class FactMapper extends Mapper<Fact, NullWritable,
+            Text, Text> {
+        // Reused output key/value; overwritten before every write.
+        Text k = new Text();
+        Text v = new Text();
+        private boolean debug = false;
+        @Override
+        public void setup(Context context) {
+            debug = MRReasoningUtils.debug(context.getConfiguration());
+        }
+        @Override
+        public void map(Fact fact, NullWritable nw, Context context)
+                throws IOException, InterruptedException {
+            k.set(getOutputName(fact, true));
+            v.set(fact.toString());
+            context.write(k, v);
+            if (debug) {
+                k.set(MRReasoningUtils.DEBUG_OUT);
+                v.set(fact.explain(true));
+                context.write(k, v);
+            }
+        }
+    }
+
+    /**
+     * Emits a textual explanation of each inconsistency, keyed by its
+     * output name. The schema is loaded once per task and passed to
+     * explain().
+     */
+    public static class InconsistencyMapper extends Mapper<Derivation,
+            NullWritable, Text, Text> {
+        // Reused output key/value; overwritten before every write.
+        Text k = new Text();
+        Text v = new Text();
+        Schema schema;
+        @Override
+        public void setup(Context context) {
+            schema = MRReasoningUtils.loadSchema(context.getConfiguration());
+        }
+        @Override
+        public void map(Derivation inconsistency, NullWritable nw, Context context)
+                throws IOException, InterruptedException {
+            k.set(getOutputName(inconsistency));
+            v.set("Inconsistency:\n" + inconsistency.explain(true, schema) + "\n");
+            context.write(k, v);
+        }
+    }
+
+    /**
+     * Routes every value to the named output given by its key.
+     */
+    public static class OutputReducer extends Reducer<Text, Text, NullWritable, Text> {
+        private MultipleOutputs<NullWritable, Text> mout;
+        @Override
+        public void setup(Context context) {
+            mout = new MultipleOutputs<>(context);
+        }
+        @Override
+        public void reduce(Text key, Iterable<Text> values, Context context)
+                throws IOException, InterruptedException {
+            String out = key.toString();
+            for (Text value : values) {
+                mout.write(out, NullWritable.get(), value);
+            }
+        }
+        /**
+         * Close the named outputs. MultipleOutputs owns its own record
+         * writers, so they must be closed explicitly here.
+         */
+        @Override
+        public void cleanup(Context context) throws IOException,
+                InterruptedException {
+            if (mout != null) {
+                mout.close();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
new file mode 100644
index 0000000..cb2c2d0
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ReasoningDriver.java
@@ -0,0 +1,154 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Runs a forward-chaining reasoner until no new facts can be derived.
+ *
+ * Each iteration runs ForwardChain, then DuplicateElimination if anything
+ * other than schema triples was produced, then SchemaFilter again if new
+ * schema triples were derived. The loop ends when an iteration yields no
+ * useful output and no new schema triples, or when any job returns a
+ * non-zero status.
+ */
+public class ReasoningDriver extends Configured implements Tool {
+    public static void main(String[] args) throws Exception {
+        int result = ToolRunner.run(new ReasoningDriver(), args);
+        System.exit(result);
+    }
+
+    // When true, the collected run statistics are printed at the end
+    // instead of a per-iteration summary.
+    private boolean reportStats = false;
+    // Running total of inconsistencies reported across all iterations.
+    long numInconsistencies = 0;
+
+    /**
+     * Execute the full reasoning pipeline.
+     *
+     * @param args passed through unchanged to every sub-tool
+     * @return the exit status of the last tool that was run (0 on success)
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Configuration conf = getConf();
+        reportStats = MRReasoningUtils.stats(conf);
+        int iteration = 0;
+        long newInconsistencies;
+        long newInstance;
+        long newSchema;
+        long usefulOutput;
+        int result = 0;
+        boolean productive = true;
+        boolean findings = false;
+        SchemaFilter filter;
+        ForwardChain fc;
+        DuplicateElimination de;
+        RunStatistics runStats = new RunStatistics(MRReasoningUtils.getTableName(conf));
+
+        // If running against a local file, upload it
+        MRReasoningUtils.uploadIfNecessary(conf);
+
+        // Extract schema information from the database and save it to a file,
+        // unless the file already exists
+        Path schemaPath = MRReasoningUtils.getSchemaPath(conf);
+        if (!FileSystem.get(conf).isDirectory(schemaPath)) {
+            filter = new SchemaFilter();
+            result = ToolRunner.run(conf, filter, args);
+            if (result != 0) {
+                // Schema extraction failed: skip the reasoning loop entirely.
+                productive = false;
+            }
+            // Record basic information about the run
+            runStats.collect(filter, "SchemaFilter");
+        }
+
+        // Perform forward-chaining reasoning:
+        while (productive) {
+            MRReasoningUtils.nextIteration(conf);
+            // Attempt to derive new information
+            fc = new ForwardChain();
+            result = ToolRunner.run(conf, fc, args);
+            runStats.collect(fc, "ForwardChain");
+            if (result != 0) {
+                break;
+            }
+
+            // Only keep unique, newly generated facts
+            newInstance = fc.getNumInstanceTriples();
+            newSchema = fc.getNumSchemaTriples();
+            newInconsistencies = fc.getNumInconsistencies();
+            usefulOutput = fc.getNumUsefulOutput();
+            if (newInstance + newInconsistencies > 0) {
+                de = new DuplicateElimination();
+                result = ToolRunner.run(conf, de, args);
+                runStats.collect(de, "DuplicateElimination");
+                if (result != 0) {
+                    break;
+                }
+                // The de-duplicated counts supersede ForwardChain's counts.
+                newInstance = de.getNumInstanceTriples();
+                newSchema = de.getNumSchemaTriples();
+                newInconsistencies = de.getNumInconsistencies();
+                usefulOutput = de.getNumUsefulOutput();
+            }
+
+            // If schema triples were just deduced, regenerate the whole schema
+            if (newSchema > 0) {
+                MRReasoningUtils.schemaUpdated(conf);
+                filter = new SchemaFilter();
+                result = ToolRunner.run(conf, filter, args);
+                runStats.collect(filter, "SchemaFilter");
+                if (result != 0) {
+                    break;
+                }
+            }
+
+            iteration = MRReasoningUtils.getCurrentIteration(conf);
+            // Per-iteration summary is printed only when the detailed
+            // statistics report is disabled.
+            if (!reportStats) {
+                System.out.println("Iteration " + iteration + ":");
+                System.out.println("\t" + newInstance + " new instance triples (" +
+                    usefulOutput + " useful for reasoning)");
+                System.out.println("\t" + newSchema + " new schema triples");
+                System.out.println("\t" + newInconsistencies + " new inconsistencies");
+            }
+            if (newInconsistencies + newInstance + newSchema > 0) {
+                findings = true;
+            }
+            numInconsistencies += newInconsistencies;
+            // Repeat if we're still generating information
+            productive = usefulOutput + newSchema > 0;
+        }
+
+        // Generate final output, if appropriate
+        if (result == 0 && findings && MRReasoningUtils.shouldOutput(conf)) {
+            OutputTool out = new OutputTool();
+            result = ToolRunner.run(conf, out, args);
+            runStats.collect(out, "OutputTool");
+        }
+
+        // Clean up intermediate data, if appropriate
+        // (this runs whether or not the jobs above succeeded)
+        MRReasoningUtils.clean(conf);
+
+        // Print stats, if specified
+        if (reportStats) {
+            System.out.println(runStats.report());
+        }
+
+        return result;
+    }
+
+    /**
+     * True if we've detected at least one inconsistency.
+     */
+    boolean hasInconsistencies() {
+        return numInconsistencies > 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
----------------------------------------------------------------------
diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
new file mode 100644
index 0000000..986d8a2
--- /dev/null
+++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/ResourceWritable.java
@@ -0,0 +1,145 @@
+package mvm.rya.reasoning.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.openrdf.model.Resource;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+/**
+ * Allows us to use a URI or bnode for a key.
+ */
public class ResourceWritable implements WritableComparable<ResourceWritable> {
    /**
     * Prefix that marks a serialized blank node. {@link #readFields} strips
     * exactly these characters to recover the node ID.
     */
    private static final String BNODE_PREFIX = "_:";

    private Resource val;
    private int key = 0; // Allows for secondary sort

    /**
     * @return The wrapped resource; null if none has been set or if an empty
     *      value was deserialized.
     */
    public Resource get() {
        return val;
    }

    /**
     * Replace the wrapped resource, leaving the secondary sort key untouched.
     */
    public void set(Resource val) {
        this.val = val;
    }

    /**
     * Replace both the wrapped resource and the secondary sort key.
     */
    public void set(Resource val, int sortKey) {
        this.val = val;
        this.key = sortKey;
    }

    /**
     * Replace the secondary sort key, leaving the resource untouched.
     */
    public void setSortKey(int key) {
        this.key = key;
    }

    /**
     * Serialize as the resource's toString() form (empty string if there is
     * no resource) followed by the integer sort key.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        if (val == null) {
            out.writeUTF("");
        }
        else {
            out.writeUTF(val.toString());
        }
        out.writeInt(key);
    }

    /**
     * Inverse of {@link #write}: an empty string yields a null resource, a
     * string with the blank node prefix yields a bnode, and anything else is
     * treated as a URI.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        String s = in.readUTF();
        if (s.isEmpty()) {
            // Writable instances get reused between records, so the previous
            // resource must be cleared explicitly rather than left in place.
            val = null;
        }
        else if (s.startsWith(BNODE_PREFIX)) {
            val = ValueFactoryImpl.getInstance().createBNode(
                    s.substring(BNODE_PREFIX.length()));
        }
        else {
            val = ValueFactoryImpl.getInstance().createURI(s);
        }
        key = in.readInt();
    }

    /**
     * Order by the string value of the resource only (the sort key is
     * ignored here; see {@link SecondaryComparator}). A missing resource
     * sorts before any present one.
     */
    @Override
    public int compareTo(ResourceWritable other) {
        String mine = stringValueOf(this.val);
        String theirs = stringValueOf(other.val);
        if (mine == null) {
            return theirs == null ? 0 : -1;
        }
        else if (theirs == null) {
            return 1;
        }
        return mine.compareTo(theirs);
    }

    @Override
    public String toString() {
        // String concatenation renders a missing value as "null" instead of
        // failing with a NullPointerException.
        return "<" + stringValueOf(val) + ">";
    }

    /**
     * Two instances are equal if their resources have the same string value;
     * the sort key does not participate.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        else if (o == null || this.getClass() != o.getClass()) {
            return false;
        }
        String mine = stringValueOf(this.val);
        String theirs = stringValueOf(((ResourceWritable) o).val);
        return mine == null ? theirs == null : mine.equals(theirs);
    }

    @Override
    public int hashCode() {
        String s = stringValueOf(val);
        return s != null ? s.hashCode() : 0;
    }

    /**
     * Null-safe access to a resource's string value, shared by compareTo,
     * equals, hashCode and toString so that they always agree.
     */
    private static String stringValueOf(Resource r) {
        return r == null ? null : r.stringValue();
    }

    /**
     * Compares keys by resource only, ignoring the secondary sort key.
     */
    public static class PrimaryComparator extends WritableComparator {
        PrimaryComparator() {
            super(ResourceWritable.class, true);
        }
        @Override
        @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
        public int compare(WritableComparable wc1, WritableComparable wc2) {
            ResourceWritable node1 = (ResourceWritable) wc1;
            ResourceWritable node2 = (ResourceWritable) wc2;
            return node1.compareTo(node2);
        }
    }

    /**
     * Compares keys by resource first and breaks ties with the secondary
     * sort key.
     */
    public static class SecondaryComparator extends WritableComparator {
        SecondaryComparator() {
            super(ResourceWritable.class, true);
        }
        @Override
        @SuppressWarnings("rawtypes") // signature is dictated by WritableComparator
        public int compare(WritableComparable wc1, WritableComparable wc2) {
            ResourceWritable node1 = (ResourceWritable) wc1;
            ResourceWritable node2 = (ResourceWritable) wc2;
            int result = node1.compareTo(node2);
            if (result == 0) {
                // Integer.compare instead of subtraction: the difference of
                // two arbitrary ints can overflow and flip the sign.
                result = Integer.compare(node1.key, node2.key);
            }
            return result;
        }
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java new file mode 100644 index 0000000..38fbe60 --- /dev/null +++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/RunStatistics.java @@ -0,0 +1,253 @@ +package mvm.rya.reasoning.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.FileSystemCounter; +import org.apache.hadoop.mapreduce.JobCounter; +import org.apache.hadoop.mapreduce.TaskCounter; +
/**
 * Collect and report a variety of statistics for a run. Prints MapReduce
 * metrics for each individual job, and overall totals, using Hadoop counters.
 */
public class RunStatistics {
    // Filled in by the Stat constructors: which Hadoop counter, if any, backs
    // each statistic.
    static final Map<Stat, TaskCounter> taskCounters = new HashMap<>();
    static final Map<Stat, JobCounter> jobCounters = new HashMap<>();

    /**
     * Every column of the report, in output order. A constant built with a
     * JobCounter or TaskCounter registers itself in the corresponding map so
     * that collect() can read it generically.
     */
    private enum Stat {
        TABLE("tableName"),
        RUN("run"),
        ITERATION("iteration"),
        JOB("job"),

        ELAPSED_TIME("elapsed (ms)"),

        TBOX_IN("tbox in"),
        ABOX_IN("abox in"),
        INCONSISTENCIES_OUT("inconsistencies out"),
        TRIPLES_OUT("triples out"),

        MAP_INPUT_RECORDS("map input records", TaskCounter.MAP_INPUT_RECORDS),
        MAP_OUTPUT_RECORDS("map output records", TaskCounter.MAP_OUTPUT_RECORDS),
        REDUCE_INPUT_GROUPS("reduce input groups", TaskCounter.REDUCE_INPUT_GROUPS),

        MAPS("maps", JobCounter.TOTAL_LAUNCHED_MAPS),
        REDUCES("reduces", JobCounter.TOTAL_LAUNCHED_REDUCES),
        MAP_TIME("map time (ms)", JobCounter.MILLIS_MAPS),

        REDUCE_TIME("reduce time (ms)", JobCounter.MILLIS_REDUCES),
        MAP_TIME_VCORES("map time (ms) * cores", JobCounter.VCORES_MILLIS_MAPS),
        REDUCE_TIME_VCORES("reduce time (ms) * cores", JobCounter.VCORES_MILLIS_REDUCES),

        GC_TIME("gc time (ms)", TaskCounter.GC_TIME_MILLIS),
        CPU_TIME("total cpu time (ms)", TaskCounter.CPU_MILLISECONDS),

        MAP_TIME_MB("map time (ms) * memory (mb)", JobCounter.MB_MILLIS_MAPS),
        REDUCE_TIME_MB("reduce time (ms) * memory (mb)", JobCounter.MB_MILLIS_REDUCES),
        PHYSICAL_MEMORY_BYTES("physical memory (bytes)", TaskCounter.PHYSICAL_MEMORY_BYTES),
        VIRTUAL_MEMORY_BYTES("virtual memory (bytes)", TaskCounter.VIRTUAL_MEMORY_BYTES),

        DATA_LOCAL_MAPS("data-local maps", JobCounter.DATA_LOCAL_MAPS),
        MAP_OUTPUT_BYTES("map output bytes", TaskCounter.MAP_OUTPUT_BYTES),

        FILE_BYTES_READ("file bytes read"),
        HDFS_BYTES_READ("hdfs bytes read"),
        FILE_BYTES_WRITTEN("file bytes written"),
        HDFS_BYTES_WRITTEN("hdfs bytes written"),

        FRACTION_TIME_GC("proportion time in gc"),
        FRACTION_MEMORY_USAGE("proportion allocated memory used"),
        FRACTION_CPU_USAGE("proportion allocated cpu used");

        /** Column header used in the report. */
        final String name;

        Stat(String name) {
            this.name = name;
        }
        Stat(String key, JobCounter jc) {
            this.name = key;
            jobCounters.put(this, jc);
        }
        Stat(String key, TaskCounter tc) {
            this.name = key;
            taskCounters.put(this, tc);
        }
    }

    /**
     * One row of the report: textual columns in {@code info}, numeric
     * columns in {@code stats}. Also used for running totals.
     */
    private static class JobResult {
        Map<Stat, String> info = new HashMap<>();
        Map<Stat, Long> stats = new HashMap<>();

        /**
         * Add every numeric statistic of another result to this one. A
         * statistic this result doesn't have yet starts from the other's
         * value.
         */
        void add(JobResult other) {
            for (Map.Entry<Stat, Long> entry : other.stats.entrySet()) {
                Long current = stats.get(entry.getKey());
                if (current == null) {
                    stats.put(entry.getKey(), entry.getValue());
                }
                else {
                    stats.put(entry.getKey(), current + entry.getValue());
                }
            }
        }

        /**
         * Derive the proportion columns (GC time, memory usage, CPU usage)
         * from the raw counters. Does nothing if any required counter is
         * missing, e.g. for totals to which no job has been added; those
         * columns are then reported as missing instead of failing.
         */
        void computeMetrics() {
            Stat[] required = {
                Stat.MAP_TIME, Stat.REDUCE_TIME, Stat.PHYSICAL_MEMORY_BYTES,
                Stat.MAP_TIME_MB, Stat.REDUCE_TIME_MB, Stat.MAP_TIME_VCORES,
                Stat.REDUCE_TIME_VCORES, Stat.GC_TIME, Stat.CPU_TIME,
                Stat.MAPS, Stat.REDUCES
            };
            for (Stat input : required) {
                if (stats.get(input) == null) {
                    return;
                }
            }
            long t = stats.get(Stat.MAP_TIME) + stats.get(Stat.REDUCE_TIME);
            long b = stats.get(Stat.PHYSICAL_MEMORY_BYTES);
            long timeMbAllocated = stats.get(Stat.MAP_TIME_MB) + stats.get(Stat.REDUCE_TIME_MB);
            long timeVcores = stats.get(Stat.MAP_TIME_VCORES) + stats.get(Stat.REDUCE_TIME_VCORES);
            long gcTime = stats.get(Stat.GC_TIME);
            long cpuTime = stats.get(Stat.CPU_TIME);
            long tasks = stats.get(Stat.MAPS) + stats.get(Stat.REDUCES);
            double mb = b / 1024.0 / 1024.0;
            double avgMb = mb / tasks;
            double timeMbUsed = t * avgMb;
            // All divisions are floating-point, so a zero denominator shows
            // up as NaN or Infinity in the report rather than an exception.
            info.put(Stat.FRACTION_TIME_GC, String.valueOf((double) gcTime / t));
            info.put(Stat.FRACTION_MEMORY_USAGE, String.valueOf(timeMbUsed / timeMbAllocated));
            info.put(Stat.FRACTION_CPU_USAGE, String.valueOf((double) cpuTime / timeVcores));
        }

        /**
         * Render as one comma-separated line with a column for every Stat,
         * in declaration order; "--" stands for a value we don't have.
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            Stat[] columns = Stat.values();
            for (int i = 0; i < columns.length; i++) {
                Stat s = columns[i];
                if (i > 0) {
                    sb.append(",");
                }
                if (info.containsKey(s)) {
                    sb.append(info.get(s));
                }
                else if (stats.containsKey(s)) {
                    sb.append(stats.get(s));
                }
                else {
                    sb.append("--");
                }
            }
            return sb.toString();
        }
    }

    List<JobResult> jobResults = new LinkedList<>();
    Map<String, JobResult> jobTypeResults = new HashMap<>();
    JobResult totals = new JobResult();

    String runId;
    String tableName;

    /**
     * Instantiate a RunStatistics object with respect to an overall run
     * (which can consist of many jobs). Runs are identified by their first
     * job.
     * @param tableName Name of the input table
     */
    RunStatistics(String tableName) {
        this.tableName = tableName;
        totals.info.put(Stat.TABLE, tableName);
        totals.info.put(Stat.ITERATION, "total");
        totals.info.put(Stat.JOB, "all");
    }

    /**
     * Collect all the statistics we're interested in for a single job, and
     * add them to the running totals for its job type and for the whole run.
     * @param tool The tool whose job has just been run
     * @param name Name of job type (ForwardChain, etc.)
     */
    void collect(AbstractReasoningTool tool, String name) throws IOException,
            InterruptedException {
        // ID is ID of the first job run
        if (runId == null) {
            runId = tool.getJobID().toString();
            totals.info.put(Stat.RUN, runId);
        }
        JobResult jobValues = new JobResult();
        jobValues.info.put(Stat.TABLE, tableName);
        jobValues.info.put(Stat.RUN, runId);
        jobValues.info.put(Stat.ITERATION, String.valueOf(tool.getIteration()));
        jobValues.info.put(Stat.JOB, name);

        jobValues.stats.put(Stat.ELAPSED_TIME, tool.getElapsedTime());
        for (Map.Entry<Stat, TaskCounter> entry : taskCounters.entrySet()) {
            jobValues.stats.put(entry.getKey(), tool.getCounter(entry.getValue()));
        }
        for (Map.Entry<Stat, JobCounter> entry : jobCounters.entrySet()) {
            jobValues.stats.put(entry.getKey(), tool.getCounter(entry.getValue()));
        }
        jobValues.stats.put(Stat.TBOX_IN, tool.getNumSchemaInput());
        jobValues.stats.put(Stat.ABOX_IN, tool.getNumInstanceInput());
        jobValues.stats.put(Stat.INCONSISTENCIES_OUT,
            tool.getNumInconsistencies());
        jobValues.stats.put(Stat.TRIPLES_OUT, tool.getNumSchemaTriples()
            + tool.getNumInstanceTriples());
        // File system counters are looked up by group and counter name
        String fsGroup = FileSystemCounter.class.getName();
        jobValues.stats.put(Stat.FILE_BYTES_READ,
            tool.getCounter(fsGroup, "FILE_BYTES_READ"));
        jobValues.stats.put(Stat.FILE_BYTES_WRITTEN,
            tool.getCounter(fsGroup, "FILE_BYTES_WRITTEN"));
        jobValues.stats.put(Stat.HDFS_BYTES_READ,
            tool.getCounter(fsGroup, "HDFS_BYTES_READ"));
        jobValues.stats.put(Stat.HDFS_BYTES_WRITTEN,
            tool.getCounter(fsGroup, "HDFS_BYTES_WRITTEN"));
        jobResults.add(jobValues);
        // Add to the running total for this job type (initialize if needed)
        JobResult typeResult = jobTypeResults.get(name);
        if (typeResult == null) {
            typeResult = new JobResult();
            typeResult.info.put(Stat.TABLE, tableName);
            typeResult.info.put(Stat.RUN, runId);
            typeResult.info.put(Stat.ITERATION, "total");
            typeResult.info.put(Stat.JOB, name);
            jobTypeResults.put(name, typeResult);
        }
        typeResult.add(jobValues);
        totals.add(jobValues);
    }

    /**
     * Report statistics for all jobs: a header line, one line per collected
     * job, aggregate lines for the ForwardChain and DuplicateElimination job
     * types (if any such jobs were collected), and a line of overall totals.
     */
    String report() {
        StringBuilder sb = new StringBuilder();
        // Header
        Stat[] columns = Stat.values();
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(columns[i].name);
        }
        // One line per job
        for (JobResult result : jobResults) {
            result.computeMetrics();
            sb.append("\n").append(result);
        }
        // Include aggregates for jobs and overall
        for (String jobType : new String[] { "ForwardChain", "DuplicateElimination" }) {
            JobResult typeTotal = jobTypeResults.get(jobType);
            if (typeTotal != null) {
                typeTotal.computeMetrics();
                sb.append("\n").append(typeTotal);
            }
        }
        totals.computeMetrics();
        sb.append("\n").append(totals);
        return sb.toString();
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java new file mode 100644 index 0000000..3455e8e --- /dev/null +++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaFilter.java @@ -0,0 +1,165 @@ +package mvm.rya.reasoning.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; + +import mvm.rya.accumulo.mr.RyaStatementWritable; +import mvm.rya.reasoning.Fact; +import mvm.rya.reasoning.Schema; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; +import org.apache.hadoop.util.ToolRunner; +import org.apache.log4j.Logger; +
/**
 * Collects the schema information stored in the table and outputs the schema
 * (TBox) to a file.
 */
public class SchemaFilter extends AbstractReasoningTool {
    /**
     * Set up one mapper per input source, all emitting schema facts under a
     * single null key, and a single reducer that assembles them into one
     * schema object.
     */
    @Override
    protected void configureReasoningJob(String[] args) throws Exception {
        configureMultipleInput(SchemaTableMapper.class, SchemaRdfMapper.class,
            SchemaFileMapper.class, true);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Fact.class);
        job.setReducerClass(SchemaFilterReducer.class);
        // Everything must reach the same reducer to end up in the same schema
        job.setNumReduceTasks(1);
        configureSchemaOutput();
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SchemaFilter(), args));
    }

    /**
     * Reads triples from table rows.
     */
    public static class SchemaTableMapper extends Mapper<Key, Value,
            NullWritable, Fact> {
        // Reused for every record
        private Fact fact = new Fact();
        /**
         * Output a triple if it is schema information.
         */
        @Override
        public void map(Key row, Value data, Context context)
                throws IOException, InterruptedException {
            fact.setTriple(MRReasoningUtils.getStatement(row, data,
                context.getConfiguration()));
            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
            if (isSchemaTriple) {
                context.write(NullWritable.get(), fact);
            }
            countInput(isSchemaTriple, context);
        }
    }

    /**
     * Reads facts that are already in Fact form. Unlike the other two
     * mappers, this one does not call countInput.
     */
    public static class SchemaFileMapper extends Mapper<Fact,
            NullWritable, NullWritable, Fact> {
        /**
         * For a given fact, output it if it's a schema triple.
         */
        @Override
        public void map(Fact fact, NullWritable nw, Context context)
                throws IOException, InterruptedException {
            if (Schema.isSchemaTriple(fact.getTriple())) {
                context.write(NullWritable.get(), fact);
            }
        }
    }

    /**
     * Reads triples supplied as RyaStatementWritable values.
     */
    public static class SchemaRdfMapper extends Mapper<LongWritable,
            RyaStatementWritable, NullWritable, Fact> {
        // Reused for every record
        private Fact fact = new Fact();
        /**
         * For a given fact, output it if it's a schema triple.
         */
        @Override
        public void map(LongWritable key, RyaStatementWritable rsw, Context context)
                throws IOException, InterruptedException {
            fact.setTriple(rsw.getRyaStatement());
            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
            if (isSchemaTriple) {
                context.write(NullWritable.get(), fact);
            }
            countInput(isSchemaTriple, context);
        }
    }

    /**
     * Accumulates every schema triple into one schema and writes it out once,
     * when the task finishes.
     */
    public static class SchemaFilterReducer extends Reducer<NullWritable,
            Fact, NullWritable, SchemaWritable> {
        private static final Logger log = Logger.getLogger(SchemaFilterReducer.class);
        // Log progress after every this many triples
        private static final int LOG_INTERVAL = 1000;

        private SchemaWritable schema;
        private boolean debug = false;
        private MultipleOutputs<?, ?> debugOut;
        private Text debugKey = new Text();
        private Text debugValue = new Text();

        @Override
        protected void setup(Context context) {
            schema = new SchemaWritable();
            debug = MRReasoningUtils.debug(context.getConfiguration());
            debugOut = new MultipleOutputs<>(context);
        }

        /**
         * Collect all schema information into a Schema object, use it to derive
         * as much additional schema information as we can, and serialize it to
         * an HDFS file.
         */
        @Override
        protected void reduce(NullWritable key, Iterable<Fact> triples,
                Context context) throws IOException, InterruptedException {
            long count = 0;
            for (Fact fact : triples) {
                schema.processTriple(fact.getTriple());
                count++;
                // Only build the summary if it is actually going to be logged
                if (count % LOG_INTERVAL == 0 && log.isDebugEnabled()) {
                    log.debug("After " + count + " schema triples...");
                    log.debug(schema.getSummary());
                }
                if (debug) {
                    debugKey.set("SCHEMA TRIPLE " + count);
                    debugValue.set(fact.explain(false));
                    debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey, debugValue);
                }
            }
            if (log.isDebugEnabled()) {
                log.debug("Total: " + count + " schema triples");
                log.debug(schema.getSummary());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            if (debugOut != null) {
                debugOut.close();
            }
            // Perform schema-level reasoning
            schema.closure();
            // Output the complete schema
            context.write(NullWritable.get(), schema);
        }
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java new file mode 100644 index 0000000..c423a59 --- /dev/null +++ b/extras/rya.reasoning/src/main/java/mvm/rya/reasoning/mr/SchemaWritable.java @@ -0,0 +1,76 @@ +package mvm.rya.reasoning.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; + +import mvm.rya.reasoning.OwlClass; +import mvm.rya.reasoning.OwlProperty; +import mvm.rya.reasoning.Schema; + +import org.apache.hadoop.io.Writable; +
/**
 * A Schema that can be written to and read from a Hadoop data stream. The
 * properties and classes are stored with Java object serialization, wrapped
 * in a length-prefixed byte array.
 */
public class SchemaWritable extends Schema implements Writable {
    /**
     * Write the number of serialized bytes, followed by the bytes themselves:
     * the list of properties and then the list of classes.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        ArrayList<OwlProperty> propList = new ArrayList<>(properties.values());
        ArrayList<OwlClass> classList = new ArrayList<>(classes.values());
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream stream = new ObjectOutputStream(bytes)) {
            stream.writeObject(propList);
            stream.writeObject(classList);
        }
        // Take the bytes only after the object stream has been closed, so
        // nothing it may still have been buffering is left out.
        byte[] arr = bytes.toByteArray();
        out.writeInt(arr.length);
        out.write(arr);
    }

    /**
     * Read what {@link #write} produced and add every property and class to
     * this schema, keyed by URI. Entries already in the schema are kept
     * unless a deserialized entry has the same URI.
     * @throws IOException if the data can't be read, the length is invalid,
     *      or a serialized class can't be found
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        int size = in.readInt();
        if (size < 0) {
            throw new IOException("Invalid serialized schema length: " + size);
        }
        byte[] bytes = new byte[size];
        in.readFully(bytes);
        try (ObjectInputStream stream = new ObjectInputStream(
                new ByteArrayInputStream(bytes))) {
            Iterable<?> propList = (Iterable<?>) stream.readObject();
            Iterable<?> classList = (Iterable<?>) stream.readObject();
            for (Object p : propList) {
                OwlProperty prop = (OwlProperty) p;
                properties.put(prop.getURI(), prop);
            }
            for (Object c : classList) {
                OwlClass owlClass = (OwlClass) c;
                classes.put(owlClass.getURI(), owlClass);
            }
        }
        catch (ClassNotFoundException e) {
            // Report the failure to the caller: continuing with a schema that
            // was silently left unloaded would produce wrong results.
            throw new IOException("Unable to deserialize schema", e);
        }
    }
}
 http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/01489efb/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java b/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java new file mode 100644 index 0000000..2084e05 --- /dev/null +++ b/extras/rya.reasoning/src/test/java/mvm/rya/reasoning/LocalReasonerTest.java @@ -0,0 +1,512 @@ +package mvm.rya.reasoning; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.vocabulary.RDF;
import org.openrdf.model.vocabulary.RDFS;
import org.openrdf.model.vocabulary.OWL;
import org.openrdf.model.vocabulary.SKOS;

/**
 * Unit tests for {@link LocalReasoner}. Each test feeds schema triples to a
 * {@link Schema}, feeds instance facts to a reasoner built around
 * {@code TestUtils.NODE}, and then checks the reasoner's known types, derived
 * facts, or inconsistency flag. The rule name in each comment is the label the
 * test was written against.
 */
public class LocalReasonerTest {
    private LocalReasoner reasoner;
    private Schema schema;

    /**
     * Start every test with a new, empty schema and a reasoner for
     * {@code TestUtils.NODE} that uses it.
     */
    @Before
    public void loadSchema() {
        schema = new Schema();
        reasoner = new LocalReasoner(TestUtils.NODE, schema, 1, 0);
    }

    /**
     * Report whether the reasoner currently holds a fact whose subject,
     * predicate and object are equal to the given terms.
     */
    private boolean hasFact(Object subject, Object predicate, Object object) {
        for (Fact candidate : reasoner.getFacts()) {
            if (!candidate.getSubject().equals(subject)) {
                continue;
            }
            if (!candidate.getPredicate().equals(predicate)) {
                continue;
            }
            if (candidate.getObject().equals(object)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Report whether the given class is among the reasoner's known types.
     */
    private boolean hasKnownType(URI type) {
        return reasoner.types.knownTypes.containsKey(type);
    }

    /**
     * cax-sco: a type fact plus a subclass axiom should yield the superclass.
     */
    @Test
    public void testInferSuperclass() throws Exception {
        URI professor = TestUtils.uri("Professor");
        URI faculty = TestUtils.uri("Faculty");
        schema.processTriple(
            TestUtils.statement(professor, RDFS.SUBCLASSOF, faculty));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, professor));
        Assert.assertTrue("Type not derived from subclass",
            hasKnownType(faculty));
    }

    /**
     * prp-dom: using a property as subject should yield the property's domain.
     */
    @Test
    public void testInferDomain() throws Exception {
        URI hasAlumnus = TestUtils.uri("hasAlumnus");
        URI university = TestUtils.uri("University");
        schema.processTriple(
            TestUtils.statement(hasAlumnus, RDFS.DOMAIN, university));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, hasAlumnus,
            TestUtils.uri("John Doe")));
        Assert.assertTrue("Type not derived from rdfs:domain",
            hasKnownType(university));
    }

    /**
     * prp-rng: being the object of a property should yield the property's range.
     */
    @Test
    public void testInferRange() throws Exception {
        URI advisor = TestUtils.uri("advisor");
        URI professor = TestUtils.uri("Professor");
        schema.processTriple(
            TestUtils.statement(advisor, RDFS.RANGE, professor));
        reasoner.processFact(TestUtils.fact(TestUtils.uri("John Doe"), advisor,
            TestUtils.NODE));
        Assert.assertTrue("Type not derived from rdfs:range",
            hasKnownType(professor));
    }

    /**
     * cls-nothing2: membership in owl:Nothing is an inconsistency.
     */
    @Test
    public void testNothingInconsistent() throws Exception {
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, RDF.TYPE, OWL.NOTHING));
        Assert.assertTrue("rdf:type owl:Nothing should be inconsistent",
            reasoner.hasInconsistencies());
    }

    /**
     * prp-inv1: a triple using the left-hand property of owl:inverseOf should
     * produce the reversed triple with the right-hand property.
     */
    @Test
    public void testInverseProperty1() throws Exception {
        URI memberOf = TestUtils.uri("memberOf");
        URI member = TestUtils.uri("member");
        URI y = TestUtils.uri("y");
        schema.processTriple(
            TestUtils.statement(memberOf, OWL.INVERSEOF, member));
        reasoner.processFact(TestUtils.fact(y, memberOf, TestUtils.NODE));
        Assert.assertTrue("Should have derived inverse triple",
            hasFact(TestUtils.NODE, member, y));
    }

    /**
     * prp-inv2: a triple using the right-hand property of owl:inverseOf should
     * produce the reversed triple with the left-hand property.
     */
    @Test
    public void testInverseProperty2() throws Exception {
        URI memberOf = TestUtils.uri("memberOf");
        URI member = TestUtils.uri("member");
        URI y = TestUtils.uri("y");
        schema.processTriple(
            TestUtils.statement(memberOf, OWL.INVERSEOF, member));
        reasoner.processFact(TestUtils.fact(y, member, TestUtils.NODE));
        Assert.assertTrue("Should have derived inverse triple",
            hasFact(TestUtils.NODE, memberOf, y));
    }

    /**
     * prp-spo1: a triple using a subproperty should be repeated with the
     * superproperty.
     */
    @Test
    public void testInferSuperproperty() throws Exception {
        URI headOf = TestUtils.uri("headOf");
        URI worksFor = TestUtils.uri("worksFor");
        URI org = TestUtils.uri("org");
        schema.processTriple(
            TestUtils.statement(headOf, RDFS.SUBPROPERTYOF, worksFor));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, headOf, org));
        Assert.assertTrue("Superproperty not inferred from subproperty",
            hasFact(TestUtils.NODE, worksFor, org));
    }

    /**
     * prp-symp: a triple using a symmetric property should be mirrored.
     */
    @Test
    public void testSymmetry() throws Exception {
        URI y = TestUtils.uri("y");
        schema.processTriple(TestUtils.statement(SKOS.RELATED, RDF.TYPE,
            OWL.SYMMETRICPROPERTY));
        reasoner.processFact(TestUtils.fact(y, SKOS.RELATED, TestUtils.NODE));
        Assert.assertTrue("Symmetric property not inferred",
            hasFact(TestUtils.NODE, SKOS.RELATED, y));
    }

    /**
     * cls-com: belonging to a class and to its complement is an inconsistency.
     */
    @Test
    public void testComplementaryClasses() throws Exception {
        URI a = TestUtils.uri("A");
        URI notA = TestUtils.uri("NotA");
        schema.processTriple(TestUtils.statement(a, OWL.COMPLEMENTOF, notA));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, a));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, notA));
        Assert.assertTrue("Complementary class membership not detected",
            reasoner.hasInconsistencies());
    }

    /**
     * cax-dw: belonging to two disjoint classes is an inconsistency.
     */
    @Test
    public void testDisjointClasses() throws Exception {
        schema.processTriple(TestUtils.statement(SKOS.CONCEPT,
            OWL.DISJOINTWITH, SKOS.COLLECTION));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, RDF.TYPE, SKOS.CONCEPT));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, RDF.TYPE, SKOS.COLLECTION));
        Assert.assertTrue("Disjoint class membership not detected",
            reasoner.hasInconsistencies());
    }

    /**
     * prp-trp: with the incoming edge processed before the outgoing edge, the
     * transitive link across the node is expected among the facts.
     */
    @Test
    public void testTransitivePropertyIncomingOutgoing() throws Exception {
        URI subOrganizationOf = TestUtils.uri("subOrganizationOf");
        URI y = TestUtils.uri("y");
        URI z = TestUtils.uri("z");
        schema.processTriple(TestUtils.statement(subOrganizationOf, RDF.TYPE,
            OWL.TRANSITIVEPROPERTY));
        reasoner.processFact(
            TestUtils.fact(y, subOrganizationOf, TestUtils.NODE));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, subOrganizationOf, z));
        Assert.assertTrue(
            "Transitive relation not inferred (received incoming edge first)",
            hasFact(y, subOrganizationOf, z));
    }

    /**
     * prp-trp, opposite arrival order: with the outgoing edge processed first,
     * the transitive link is expected to be absent from the facts.
     */
    @Test
    public void testTransitivePropertyOutgoingIncoming() throws Exception {
        URI subOrganizationOf = TestUtils.uri("subOrganizationOf");
        URI y = TestUtils.uri("y");
        URI z = TestUtils.uri("z");
        schema.processTriple(TestUtils.statement(subOrganizationOf, RDF.TYPE,
            OWL.TRANSITIVEPROPERTY));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, subOrganizationOf, z));
        reasoner.processFact(
            TestUtils.fact(y, subOrganizationOf, TestUtils.NODE));
        Assert.assertFalse("Transitive relation should not be inferred "
            + "(received outgoing edge first)",
            hasFact(y, subOrganizationOf, z));
    }

    /**
     * prp-irp: relating a node to itself through an irreflexive property is an
     * inconsistency.
     */
    @Test
    public void testIrreflexiveProperty() throws Exception {
        URI hasParent = TestUtils.uri("hasParent");
        schema.processTriple(TestUtils.statement(hasParent, RDF.TYPE,
            OWL2.IRREFLEXIVEPROPERTY));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, hasParent, TestUtils.NODE));
        Assert.assertTrue(
            "Relation to self via irreflexive property not detected",
            reasoner.hasInconsistencies());
    }

    /**
     * prp-asyp: a two-way relation over an asymmetric property is expected to
     * be flagged when the incoming edge is processed first.
     */
    @Test
    public void testAsymmetricPropertyIncomingOutgoing() throws Exception {
        URI hasChild = TestUtils.uri("hasChild");
        URI y = TestUtils.uri("y");
        schema.processTriple(TestUtils.statement(hasChild, RDF.TYPE,
            OWL2.ASYMMETRICPROPERTY));
        reasoner.processFact(TestUtils.fact(y, hasChild, TestUtils.NODE));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, hasChild, y));
        Assert.assertTrue(
            "Symmetric relation with asymmetric property not detected"
            + " (receiving incoming edge first)",
            reasoner.hasInconsistencies());
    }

    /**
     * prp-asyp, opposite arrival order: with the outgoing edge processed
     * first, no inconsistency is expected to be flagged.
     */
    @Test
    public void testAsymmetricPropertyReverseOutgoingIncoming() throws Exception {
        URI hasChild = TestUtils.uri("hasChild");
        URI y = TestUtils.uri("y");
        schema.processTriple(TestUtils.statement(hasChild, RDF.TYPE,
            OWL2.ASYMMETRICPROPERTY));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, hasChild, y));
        reasoner.processFact(TestUtils.fact(y, hasChild, TestUtils.NODE));
        Assert.assertFalse(
            "Symmetric relation with asymmetric property should"
            + " not be detected when receiving outgoing edge first",
            reasoner.hasInconsistencies());
    }

    /**
     * prp-asyp, self-loop: relating a node to itself through an asymmetric
     * property is an inconsistency.
     */
    @Test
    public void testAsymmetricPropertyReflexive() throws Exception {
        URI hasChild = TestUtils.uri("hasChild");
        schema.processTriple(TestUtils.statement(hasChild, RDF.TYPE,
            OWL2.ASYMMETRICPROPERTY));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, hasChild, TestUtils.NODE));
        Assert.assertTrue(
            "Self-referential relation with asymmetric property"
            + " not detected",
            reasoner.hasInconsistencies());
    }

    /**
     * prp-pdw: the same subject/object pair linked by two disjoint properties
     * is an inconsistency (facts arrive in the order the axiom lists them).
     */
    @Test
    public void testDisjointProperties() throws Exception {
        URI p1 = TestUtils.uri("p1");
        URI p2 = TestUtils.uri("p2");
        URI y = TestUtils.uri("y");
        schema.processTriple(
            TestUtils.statement(p1, OWL2.PROPERTYDISJOINTWITH, p2));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, p1, y));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, p2, y));
        Assert.assertTrue(
            "Disjoint property usage not detected (left-hand side"
            + " of propertyDisjointWith received first)",
            reasoner.hasInconsistencies());
    }

    /**
     * prp-pdw with the axiom written the other way round; the facts arrive in
     * the same order as in {@link #testDisjointProperties()}.
     */
    @Test
    public void testDisjointPropertiesReverse() throws Exception {
        URI p1 = TestUtils.uri("p1");
        URI p2 = TestUtils.uri("p2");
        URI y = TestUtils.uri("y");
        schema.processTriple(
            TestUtils.statement(p2, OWL2.PROPERTYDISJOINTWITH, p1));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, p1, y));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, p2, y));
        Assert.assertTrue(
            "Disjoint property usage not detected (right-hand side"
            + " of propertyDisjointWith received first)",
            reasoner.hasInconsistencies());
    }

    /**
     * cls-svf1: the node's type arrives first, then the incoming edge.
     */
    @Test
    public void testSomeValuesFromMembership() throws Exception {
        URI x = TestUtils.uri("x");
        URI department = TestUtils.uri("Department");
        URI headOf = TestUtils.uri("headOf");
        URI johnDoe = TestUtils.uri("John Doe");
        schema.processTriple(
            TestUtils.statement(x, OWL.SOMEVALUESFROM, department));
        schema.processTriple(TestUtils.statement(x, OWL.ONPROPERTY, headOf));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, RDF.TYPE, department));
        reasoner.processFact(TestUtils.fact(johnDoe, headOf, TestUtils.NODE));
        reasoner.getTypes();
        Assert.assertTrue(
            "If x is a property restriction [owl:someValuesFrom c]"
            + " on property p; (z type c) then (y p z) should imply"
            + " (y type x).",
            hasFact(johnDoe, RDF.TYPE, x));
    }

    /**
     * cls-svf1: the incoming edge arrives first, then the node's type.
     */
    @Test
    public void testSomeValuesFromMembershipReverse() throws Exception {
        URI x = TestUtils.uri("x");
        URI department = TestUtils.uri("Department");
        URI headOf = TestUtils.uri("headOf");
        URI johnDoe = TestUtils.uri("John Doe");
        schema.processTriple(
            TestUtils.statement(x, OWL.SOMEVALUESFROM, department));
        schema.processTriple(TestUtils.statement(x, OWL.ONPROPERTY, headOf));
        reasoner.processFact(TestUtils.fact(johnDoe, headOf, TestUtils.NODE));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, RDF.TYPE, department));
        reasoner.getTypes();
        Assert.assertTrue(
            "If x is a property restriction [owl:someValuesFrom c]"
            + " on property p; (y p z) then (z type c) should imply"
            + " (y type x).",
            hasFact(johnDoe, RDF.TYPE, x));
    }

    /**
     * cls-svf2: someValuesFrom owl:Thing, so any use of the property by the
     * node should make it a member of the restriction.
     */
    @Test
    public void testSomeValuesFromThing() throws Exception {
        URI x = TestUtils.uri("x");
        URI foo = TestUtils.uri("foo");
        schema.processTriple(
            TestUtils.statement(x, OWL.SOMEVALUESFROM, OWL.THING));
        schema.processTriple(TestUtils.statement(x, OWL.ONPROPERTY, foo));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, foo, TestUtils.uri("bar")));
        reasoner.getTypes();
        Assert.assertTrue(
            "If x is a property restriction [owl:someValuesFrom owl:Thing]"
            + " on property p; (y p z) should imply (y type x) for any z.",
            hasFact(TestUtils.NODE, RDF.TYPE, x));
    }

    /**
     * cls-hv1: membership in a hasValue restriction should yield the value.
     */
    @Test
    public void testHasValueInferValue() throws Exception {
        URI x = TestUtils.uri("x");
        URI y = TestUtils.uri("y");
        URI p = TestUtils.uri("p");
        schema.processTriple(TestUtils.statement(x, OWL.HASVALUE, y));
        schema.processTriple(TestUtils.statement(x, OWL.ONPROPERTY, p));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, x));
        Assert.assertTrue(
            "If x is a property restriction [owl:hasValue y]"
            + " on property p; (u type x) should imply (u p y) for any u.",
            hasFact(TestUtils.NODE, p, y));
    }

    /**
     * cls-hv2: having the value should yield membership in the hasValue
     * restriction.
     */
    @Test
    public void testHasValueInferClass() throws Exception {
        URI x = TestUtils.uri("x");
        URI y = TestUtils.uri("y");
        URI p = TestUtils.uri("p");
        schema.processTriple(TestUtils.statement(x, OWL.HASVALUE, y));
        schema.processTriple(TestUtils.statement(x, OWL.ONPROPERTY, p));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, p, y));
        reasoner.getTypes();
        Assert.assertTrue(
            "If x is a property restriction [owl:hasValue y]"
            + " on property p; (u p y) should imply (u type x) for any u.",
            hasFact(TestUtils.NODE, RDF.TYPE, x));
    }

    /**
     * cls-avf: the property edge arrives first, then the restriction type.
     */
    @Test
    public void testAllValuesFrom() throws Exception {
        URI x = TestUtils.uri("x");
        URI human = TestUtils.uri("Human");
        URI hasParent = TestUtils.uri("hasParent");
        URI v = TestUtils.uri("v");
        schema.processTriple(TestUtils.statement(x, OWL.ALLVALUESFROM, human));
        schema.processTriple(TestUtils.statement(x, OWL.ONPROPERTY, hasParent));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, hasParent, v));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, x));
        reasoner.getTypes();
        Assert.assertTrue(
            "If x is a property restriction [owl:allValuesFrom c]"
            + " on property p; (u p v) then (u type x) should imply (v type c).",
            hasFact(v, RDF.TYPE, human));
    }

    /**
     * cls-avf: the restriction type arrives first, then the property edge.
     */
    @Test
    public void testAllValuesFromReverse() throws Exception {
        URI x = TestUtils.uri("x");
        URI human = TestUtils.uri("Human");
        URI hasParent = TestUtils.uri("hasParent");
        URI v = TestUtils.uri("v");
        schema.processTriple(TestUtils.statement(x, OWL.ALLVALUESFROM, human));
        schema.processTriple(TestUtils.statement(x, OWL.ONPROPERTY, hasParent));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, RDF.TYPE, x));
        reasoner.processFact(TestUtils.fact(TestUtils.NODE, hasParent, v));
        reasoner.getTypes();
        Assert.assertTrue(
            "If x is a property restriction [owl:allValuesFrom c]"
            + " on property p; (u type x) then (u p v) should imply (v type c).",
            hasFact(v, RDF.TYPE, human));
    }

    /**
     * cls-maxc1: using a property restricted to maxCardinality 0 is an
     * inconsistency for a member of the restriction.
     */
    @Test
    public void testMaxCardinalityZero() throws Exception {
        URI restriction = TestUtils.uri("restriction");
        URI property = TestUtils.uri("impossiblePredicate1");
        schema.processTriple(TestUtils.statement(restriction,
            OWL.MAXCARDINALITY, TestUtils.intLiteral("0")));
        schema.processTriple(
            TestUtils.statement(restriction, OWL.ONPROPERTY, property));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, property, TestUtils.uri("y")));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, RDF.TYPE, restriction));
        Assert.assertTrue(
            "If p has maxCardinality of 0; then any (x p y)"
            + " should be found inconsistent",
            reasoner.hasInconsistencies());
    }

    /**
     * cls-maxqc2: same as above, with maxQualifiedCardinality 0 on owl:Thing
     * and a blank node as the restriction.
     */
    @Test
    public void testMaxQCardinalityZeroThings() throws Exception {
        Resource restriction = TestUtils.bnode("restriction");
        URI property = TestUtils.uri("impossiblePredicate2");
        schema.processTriple(TestUtils.statement(restriction,
            OWL2.MAXQUALIFIEDCARDINALITY, TestUtils.intLiteral("0")));
        schema.processTriple(
            TestUtils.statement(restriction, OWL.ONPROPERTY, property));
        schema.processTriple(
            TestUtils.statement(restriction, OWL2.ONCLASS, OWL.THING));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, property, TestUtils.uri("y")));
        reasoner.processFact(
            TestUtils.fact(TestUtils.NODE, RDF.TYPE, restriction));
        Assert.assertTrue(
            "If p has maxQualifiedCardinality of 0 on owl:Thing;"
            + " then any (x p y) should be found inconsistent",
            reasoner.hasInconsistencies());
    }
}