Repository: jena Updated Branches: refs/heads/master 9213a355f -> f026f0e7d
http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java b/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java new file mode 100644 index 0000000..5f870ee --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.jena.hadoop.rdf.stats; + +import io.airlift.command.Arguments; +import io.airlift.command.Command; +import io.airlift.command.Help; +import io.airlift.command.HelpOption; +import io.airlift.command.Option; +import io.airlift.command.OptionType; +import io.airlift.command.ParseArgumentsMissingException; +import io.airlift.command.ParseArgumentsUnexpectedException; +import io.airlift.command.ParseException; +import io.airlift.command.ParseOptionMissingException; +import io.airlift.command.ParseOptionMissingValueException; +import io.airlift.command.SingleCommand; +import io.airlift.command.model.CommandMetadata; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.inject.Inject; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.jena.hadoop.rdf.stats.jobs.JobFactory; + + +/** + * Entry point for the Hadoop job, handles launching all the relevant Hadoop + * jobs + */ +@Command(name = "bin/hadoop jar PATH_TO_JAR com.yarcdata.urika.hadoop.rdf.stats.RdfStats", description = "A command which computes statistics on RDF data using Hadoop") +public class RdfStats implements Tool { + + static final String ANSI_RED = "\u001B[31m"; + static final String ANSI_RESET = "\u001B[0m"; + + private static final String DATA_TYPE_TRIPLES = "triples", DATA_TYPE_QUADS = "quads", DATA_TYPE_MIXED = "mixed"; + + /** + * Help option + */ + @Inject + public HelpOption helpOption; + + /** + * Gets/Sets whether all available statistics will be calculated + */ + @Option(name = { "-a", "--all" }, description = "Requests that all available statistics be calculated", type = OptionType.COMMAND) + public boolean all = false; + + /** + * Gets/Sets whether node usage counts will be calculated + */ + @Option(name = { "-n", "--node-count" }, description = 
"Requests that node usage counts be calculated", type = OptionType.COMMAND) + public boolean nodeCount = false; + + /** + * Gets/Sets whether characteristic sets will be calculated + */ + @Option(name = { "-c", "--characteristic-sets" }, description = "Requests that characteristic sets be calculated", type = OptionType.COMMAND) + public boolean characteristicSets = false; + + /** + * Gets/Sets whether type counts will be calculated + */ + @Option(name = { "-t", "--type-counts" }, description = "Requests that rdf:type usage counts be calculated", type = OptionType.COMMAND) + public boolean typeCount = false; + + /** + * Gets/Sets whether data type counts will be calculated + */ + @Option(name = { "-d", "--data-types" }, description = "Requests that literal data type usage counts be calculated", type = OptionType.COMMAND) + public boolean dataTypeCount = false; + + /** + * Gets/Sets whether namespace counts will be calculated + */ + @Option(name = { "--namespaces" }, description = "Requests that namespace usage counts be calculated", type = OptionType.COMMAND) + public boolean namespaceCount = false; + + /** + * Gets/Sets the input data type used + */ + @Option(name = { "--input-type" }, allowedValues = { DATA_TYPE_MIXED, DATA_TYPE_QUADS, DATA_TYPE_TRIPLES }, description = "Specifies whether the input data is a mixture of quads and triples, just quads or just triples. 
Using the most specific data type will yield the most accurrate statistics") + public String inputType = DATA_TYPE_MIXED; + + /** + * Gets/Sets the output path + */ + @Option(name = { "-o", "--output" }, title = "OutputPath", description = "Sets the output path", arity = 1, required = true) + public String outputPath = null; + + /** + * Gets/Sets the input path(s) + */ + @Arguments(description = "Sets the input path(s)", title = "InputPath", required = true) + public List<String> inputPaths = new ArrayList<String>(); + + private Configuration config; + + /** + * Entry point method + * + * @param args + * Arguments + * @throws Exception + */ + public static void main(String[] args) throws Exception { + try { + // Run and exit with result code if no errors bubble up + // Note that the exit code may still be a error code + int res = ToolRunner.run(new Configuration(true), new RdfStats(), args); + System.exit(res); + } catch (Exception e) { + System.err.println(ANSI_RED + e.getMessage()); + e.printStackTrace(System.err); + } finally { + System.err.print(ANSI_RESET); + } + // If any errors bubble up exit with non-zero code + System.exit(1); + } + + private static void showUsage() { + CommandMetadata metadata = SingleCommand.singleCommand(RdfStats.class).getCommandMetadata(); + StringBuilder builder = new StringBuilder(); + Help.help(metadata, builder); + System.err.print(ANSI_RESET); + System.err.println(builder.toString()); + System.exit(1); + } + + @Override + public void setConf(Configuration conf) { + this.config = conf; + } + + @Override + public Configuration getConf() { + return this.config; + } + + @Override + public int run(String[] args) throws Exception { + try { + // Parse custom arguments + RdfStats cmd = SingleCommand.singleCommand(RdfStats.class).parse(args); + + // Copy Hadoop configuration across + cmd.setConf(this.getConf()); + + // Show help if requested and exit with success + if (cmd.helpOption.showHelpIfRequested()) { + return 0; + } + + // Run the 
command and exit with success + cmd.run(); + return 0; + + } catch (ParseOptionMissingException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseOptionMissingValueException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseArgumentsMissingException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseArgumentsUnexpectedException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + // TODO Re-enable as and when we upgrade Airline + // } catch (ParseOptionIllegalValueException e) { + // System.err.println(ANSI_RED + e.getMessage()); + // System.err.println(); + // showUsage(); + } catch (ParseException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (UnsupportedOperationException e) { + System.err.println(ANSI_RED + e.getMessage()); + } catch (Throwable e) { + System.err.println(ANSI_RED + e.getMessage()); + e.printStackTrace(System.err); + } finally { + System.err.print(ANSI_RESET); + } + return 1; + } + + private void run() throws Throwable { + if (!this.outputPath.endsWith("/")) { + this.outputPath += "/"; + } + + // If all statistics requested turn on all statistics + if (this.all) { + this.nodeCount = true; + this.characteristicSets = true; + this.typeCount = true; + this.dataTypeCount = true; + this.namespaceCount = true; + } + + // How many statistics were requested? 
+ int statsRequested = 0; + if (this.nodeCount) + statsRequested++; + if (this.characteristicSets) + statsRequested++; + if (this.typeCount) + statsRequested++; + if (this.dataTypeCount) + statsRequested++; + if (this.namespaceCount) + statsRequested++; + + // Error if no statistics requested + if (statsRequested == 0) { + System.err + .println("You did not request any statistics to be calculated, please use one/more of the relevant options to select the statistics to be computed"); + return; + } + int statsComputed = 1; + + // Compute statistics + if (this.nodeCount) { + Job job = this.selectNodeCountJob(); + statsComputed = this.computeStatistic(job, statsComputed, statsRequested); + } + if (this.typeCount) { + Job[] jobs = this.selectTypeCountJobs(); + statsComputed = this.computeStatistic(jobs, false, false, statsComputed, statsRequested); + } + if (this.dataTypeCount) { + Job job = this.selectDataTypeCountJob(); + statsComputed = this.computeStatistic(job, statsComputed, statsRequested); + } + if (this.namespaceCount) { + Job job = this.selectNamespaceCountJob(); + statsComputed = this.computeStatistic(job, statsComputed, statsRequested); + } + if (this.characteristicSets) { + Job[] jobs = this.selectCharacteristicSetJobs(); + statsComputed = this.computeStatistic(jobs, false, false, statsComputed, statsRequested); + } + } + + private int computeStatistic(Job job, int statsComputed, int statsRequested) throws Throwable { + System.out.println(String.format("Computing Statistic %d of %d requested", statsComputed, statsRequested)); + this.runJob(job); + System.out.println(String.format("Computed Statistic %d of %d requested", statsComputed, statsRequested)); + System.out.println(); + return ++statsComputed; + } + + private int computeStatistic(Job[] jobs, boolean continueOnFailure, boolean continueOnError, int statsComputed, + int statsRequested) { + System.out.println(String.format("Computing Statistic %d of %d requested", statsComputed, statsRequested)); + 
this.runJobSequence(jobs, continueOnFailure, continueOnError); + System.out.println(String.format("Computed Statistic %d of %d requested", statsComputed, statsRequested)); + System.out.println(); + return ++statsComputed; + } + + private boolean runJob(Job job) throws Throwable { + System.out.println("Submitting Job " + job.getJobName()); + long start = System.nanoTime(); + try { + job.submit(); + if (job.monitorAndPrintJob()) { + System.out.println("Job " + job.getJobName() + " succeeded"); + return true; + } else { + System.out.println("Job " + job.getJobName() + " failed"); + return false; + } + } catch (Throwable e) { + System.out.println("Unexpected failure in Job " + job.getJobName()); + throw e; + } finally { + long end = System.nanoTime(); + System.out.println("Job " + job.getJobName() + " finished after " + + String.format("%,d milliseconds", TimeUnit.NANOSECONDS.toMillis(end - start))); + System.out.println(); + } + } + + private void runJobSequence(Job[] jobs, boolean continueOnFailure, boolean continueOnError) { + for (int i = 0; i < jobs.length; i++) { + Job job = jobs[i]; + try { + boolean success = this.runJob(job); + if (!success && !continueOnFailure) + throw new IllegalStateException("Unable to complete job sequence because Job " + job.getJobName() + " failed"); + } catch (IllegalStateException e) { + throw e; + } catch (Throwable e) { + if (!continueOnError) + throw new IllegalStateException("Unable to complete job sequence because job " + job.getJobName() + + " errorred", e); + } + } + } + + private Job selectNodeCountJob() throws IOException { + String realOutputPath = outputPath + "node-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadNodeCountJob(this.config, inputs, realOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleNodeCountJob(this.config, inputs, 
realOutputPath); + } else { + return JobFactory.getNodeCountJob(this.config, inputs, realOutputPath); + } + } + + private Job selectDataTypeCountJob() throws IOException { + String realOutputPath = outputPath + "data-type-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadDataTypeCountJob(this.config, inputs, realOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleDataTypeCountJob(this.config, inputs, realOutputPath); + } else { + return JobFactory.getDataTypeCountJob(this.config, inputs, realOutputPath); + } + } + + private Job selectNamespaceCountJob() throws IOException { + String realOutputPath = outputPath + "namespace-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadNamespaceCountJob(this.config, inputs, realOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleNamespaceCountJob(this.config, inputs, realOutputPath); + } else { + return JobFactory.getNamespaceCountJob(this.config, inputs, realOutputPath); + } + } + + private Job[] selectCharacteristicSetJobs() throws IOException { + String intermediateOutputPath = outputPath + "characteristics/intermediate/"; + String finalOutputPath = outputPath + "characteristics/final/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } else { + return JobFactory.getCharacteristicSetJobs(this.config, inputs, 
intermediateOutputPath, finalOutputPath); + } + } + + private Job[] selectTypeCountJobs() throws IOException { + String intermediateOutputPath = outputPath + "type-declarations/"; + String finalOutputPath = outputPath + "type-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } else { + return JobFactory.getTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java b/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java new file mode 100644 index 0000000..55bb8af --- /dev/null +++ b/jena-hadoop-rdf/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java @@ -0,0 +1,757 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.jena.hadoop.rdf.stats.jobs; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.BZip2Codec; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.StringUtils; +import org.apache.jena.hadoop.rdf.io.input.QuadsInputFormat; +import org.apache.jena.hadoop.rdf.io.input.TriplesInputFormat; +import org.apache.jena.hadoop.rdf.io.input.TriplesOrQuadsInputFormat; +import org.apache.jena.hadoop.rdf.io.input.nquads.NQuadsInputFormat; +import org.apache.jena.hadoop.rdf.io.input.ntriples.NTriplesInputFormat; +import org.apache.jena.hadoop.rdf.io.output.nquads.NQuadsOutputFormat; +import org.apache.jena.hadoop.rdf.io.output.ntriples.NTriplesNodeOutputFormat; +import org.apache.jena.hadoop.rdf.io.output.ntriples.NTriplesOutputFormat; +import org.apache.jena.hadoop.rdf.mapreduce.KeyMapper; +import org.apache.jena.hadoop.rdf.mapreduce.RdfMapReduceConstants; 
+import org.apache.jena.hadoop.rdf.mapreduce.TextCountReducer; +import org.apache.jena.hadoop.rdf.mapreduce.characteristics.CharacteristicSetReducer; +import org.apache.jena.hadoop.rdf.mapreduce.characteristics.QuadCharacteristicSetGeneratingReducer; +import org.apache.jena.hadoop.rdf.mapreduce.characteristics.TripleCharacteristicSetGeneratingReducer; +import org.apache.jena.hadoop.rdf.mapreduce.count.NodeCountReducer; +import org.apache.jena.hadoop.rdf.mapreduce.count.QuadNodeCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.TripleNodeCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.datatypes.QuadDataTypeCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.datatypes.TripleDataTypeCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.namespaces.QuadNamespaceCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.namespaces.TripleNamespaceCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.positional.QuadObjectCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.positional.TripleObjectCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.filter.positional.QuadFilterByPredicateMapper; +import org.apache.jena.hadoop.rdf.mapreduce.filter.positional.TripleFilterByPredicateUriMapper; +import org.apache.jena.hadoop.rdf.mapreduce.group.QuadGroupBySubjectMapper; +import org.apache.jena.hadoop.rdf.mapreduce.group.TripleGroupBySubjectMapper; +import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable; +import org.apache.jena.hadoop.rdf.types.NodeWritable; +import org.apache.jena.hadoop.rdf.types.QuadWritable; +import org.apache.jena.hadoop.rdf.types.TripleWritable; + +import com.hp.hpl.jena.vocabulary.RDF; + +/** + * Factory that can produce {@link Job} instances for computing various RDF + * statistics + * + * + * + */ +public class JobFactory { + + /** + * Private constructor prevents instantiation + */ + private JobFactory() { + } + + /** + * Gets a job for computing node counts on 
RDF triple inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getTripleNodeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Triples Node Usage Count"); + + // Map/Reduce classes + job.setMapperClass(TripleNodeCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a job for computing node counts on RDF quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getQuadNodeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Quads Node Usage Count"); + + // Map/Reduce classes + job.setMapperClass(QuadNodeCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(QuadsInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a job for computing node counts on RDF triple 
and/or quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getNodeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Node Usage Count"); + + // Map/Reduce classes + job.setMapperClass(QuadNodeCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesOrQuadsInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a sequence of jobs that can be used to compute characteristic sets + * for RDF triples + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param intermediateOutputPath + * Intermediate output path + * @param outputPath + * Final output path + * @return Sequence of jobs + * @throws IOException + */ + public static Job[] getTripleCharacteristicSetJobs(Configuration config, String[] inputPaths, String intermediateOutputPath, + String outputPath) throws IOException { + Job[] jobs = new Job[2]; + + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Triples Characteristic Set (Generation)"); + + // Map/Reduce classes + job.setMapperClass(TripleGroupBySubjectMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(TripleWritable.class); + job.setReducerClass(TripleCharacteristicSetGeneratingReducer.class); + job.setOutputKeyClass(CharacteristicSetWritable.class); + job.setOutputValueClass(NullWritable.class); + + // Input and 
Output + job.setInputFormatClass(TriplesInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath)); + SequenceFileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); + SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); + + jobs[0] = job; + + job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Triples Characteristic Set (Reduction)"); + + // Map/Reduce classes + job.setMapperClass(KeyMapper.class); + job.setMapOutputKeyClass(CharacteristicSetWritable.class); + job.setMapOutputValueClass(CharacteristicSetWritable.class); + job.setReducerClass(CharacteristicSetReducer.class); + job.setOutputKeyClass(CharacteristicSetWritable.class); + job.setOutputValueClass(CharacteristicSetWritable.class); + + // Input and Output + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + FileInputFormat.setInputPaths(job, intermediateOutputPath); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + jobs[1] = job; + return jobs; + } + + /** + * Gets a sequence of jobs that can be used to compute characteristic sets + * for RDF quads + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param intermediateOutputPath + * Intermediate output path + * @param outputPath + * Final output path + * @return Sequence of jobs + * @throws IOException + */ + public static Job[] getQuadCharacteristicSetJobs(Configuration config, String[] inputPaths, String intermediateOutputPath, + String outputPath) throws IOException { + Job[] jobs = new Job[2]; + + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Quads Characteristic Set (Generation)"); + + // Map/Reduce classes + 
job.setMapperClass(QuadGroupBySubjectMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(QuadWritable.class); + job.setReducerClass(QuadCharacteristicSetGeneratingReducer.class); + job.setOutputKeyClass(CharacteristicSetWritable.class); + job.setOutputValueClass(NullWritable.class); + + // Input and Output + job.setInputFormatClass(QuadsInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath)); + SequenceFileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); + SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); + + jobs[0] = job; + + job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Quads Characteristic Set (Reduction)"); + + // Map/Reduce classes + job.setMapperClass(KeyMapper.class); + job.setMapOutputKeyClass(CharacteristicSetWritable.class); + job.setMapOutputValueClass(CharacteristicSetWritable.class); + job.setReducerClass(CharacteristicSetReducer.class); + job.setOutputKeyClass(CharacteristicSetWritable.class); + job.setOutputValueClass(CharacteristicSetWritable.class); + + // Input and Output + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + FileInputFormat.setInputPaths(job, intermediateOutputPath); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + jobs[1] = job; + return jobs; + } + + /** + * Gets a sequence of jobs that can be used to compute characteristic sets + * for RDF triple and/or quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param intermediateOutputPath + * Intermediate output path + * @param outputPath + * Final output path + * @return Sequence of jobs + * @throws IOException + */ + 
public static Job[] getCharacteristicSetJobs(Configuration config, String[] inputPaths, String intermediateOutputPath, + String outputPath) throws IOException { + Job[] jobs = new Job[2]; + + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Characteristic Set (Generation)"); + + // Map/Reduce classes + job.setMapperClass(QuadGroupBySubjectMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(QuadWritable.class); + job.setReducerClass(QuadCharacteristicSetGeneratingReducer.class); + job.setOutputKeyClass(CharacteristicSetWritable.class); + job.setOutputValueClass(NullWritable.class); + + // Input and Output + job.setInputFormatClass(TriplesOrQuadsInputFormat.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath)); + SequenceFileOutputFormat.setCompressOutput(job, true); + FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); + SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK); + + jobs[0] = job; + + job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Characteristic Set (Reduction)"); + + // Map/Reduce classes + job.setMapperClass(KeyMapper.class); + job.setMapOutputKeyClass(CharacteristicSetWritable.class); + job.setMapOutputValueClass(CharacteristicSetWritable.class); + job.setReducerClass(CharacteristicSetReducer.class); + job.setOutputKeyClass(CharacteristicSetWritable.class); + job.setOutputValueClass(CharacteristicSetWritable.class); + + // Input and Output + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + FileInputFormat.setInputPaths(job, intermediateOutputPath); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + jobs[1] = job; + return jobs; + } + + /** + * Gets a 
sequence of jobs for computing type counts on RDF triple inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param intermediateOutputPath + * Path for intermediate output which will be all the type + * declaration triples present in the inputs + * @param outputPath + * Output path + * @return Sequence of jobs + * @throws IOException + */ + public static Job[] getTripleTypeCountJobs(Configuration config, String[] inputPaths, String intermediateOutputPath, + String outputPath) throws IOException { + Job[] jobs = new Job[2]; + + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Type Triples Extraction"); + + // Map/Reduce classes + job.getConfiguration().setStrings(RdfMapReduceConstants.FILTER_PREDICATE_URIS, RDF.type.getURI()); + job.setMapperClass(TripleFilterByPredicateUriMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(TripleWritable.class); + + // Input and Output Format + job.setInputFormatClass(TriplesInputFormat.class); + job.setOutputFormatClass(NTriplesOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath)); + + jobs[0] = job; + + // Object Node Usage count job + job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Triples Type Usage Count"); + + // Map/Reduce classes + job.setMapperClass(TripleObjectCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(NTriplesInputFormat.class); + NLineInputFormat.setNumLinesPerSplit(job, 10000); // TODO Would be + // better if this was + // intelligently + // configured + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, intermediateOutputPath); + 
FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + jobs[1] = job; + + return jobs; + } + + /** + * Gets a sequence of jobs for computing type counts on RDF quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param intermediateOutputPath + * Path for intermediate output which will be all the type + * declaration quads present in the inputs + * @param outputPath + * Output path + * @return Sequence of jobs + * @throws IOException + */ + public static Job[] getQuadTypeCountJobs(Configuration config, String[] inputPaths, String intermediateOutputPath, + String outputPath) throws IOException { + Job[] jobs = new Job[2]; + + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Type Quads Extraction"); + + // Map/Reduce classes + job.getConfiguration().setStrings(RdfMapReduceConstants.FILTER_PREDICATE_URIS, RDF.type.getURI()); + job.setMapperClass(QuadFilterByPredicateMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(QuadWritable.class); + + // Input and Output Format + job.setInputFormatClass(QuadsInputFormat.class); + job.setOutputFormatClass(NQuadsOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath)); + + jobs[0] = job; + + // Object Node Usage count job + job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Quads Type Usage Count"); + + // Map/Reduce classes + job.setMapperClass(QuadObjectCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(NQuadsInputFormat.class); + NLineInputFormat.setNumLinesPerSplit(job, 10000); // TODO Would be + // better if this was + // intelligently + // configured + 
job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, intermediateOutputPath); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + jobs[1] = job; + + return jobs; + } + + /** + * Gets a sequence of jobs for computing type counts on RDF triple and/or quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param intermediateOutputPath + * Path for intermediate output which will be all the type + * declaration quads present in the inputs + * @param outputPath + * Output path + * @return Sequence of jobs + * @throws IOException + */ + public static Job[] getTypeCountJobs(Configuration config, String[] inputPaths, String intermediateOutputPath, + String outputPath) throws IOException { + Job[] jobs = new Job[2]; + + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Type Extraction"); + + // Map/Reduce classes + job.getConfiguration().setStrings(RdfMapReduceConstants.FILTER_PREDICATE_URIS, RDF.type.getURI()); + job.setMapperClass(QuadFilterByPredicateMapper.class); + job.setMapOutputKeyClass(LongWritable.class); + job.setMapOutputValueClass(QuadWritable.class); + + // Input and Output Format + job.setInputFormatClass(TriplesOrQuadsInputFormat.class); + job.setOutputFormatClass(NQuadsOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(intermediateOutputPath)); + + jobs[0] = job; + + // Object Node Usage count job + job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Type Usage Count"); + + // Map/Reduce classes + job.setMapperClass(QuadObjectCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(NQuadsInputFormat.class); + NLineInputFormat.setNumLinesPerSplit(job, 10000); 
// TODO Would be + // better if this was + // intelligently + // configured + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, intermediateOutputPath); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + jobs[1] = job; + + return jobs; + } + + /** + * Gets a job for computing literal data type counts on RDF triple inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getTripleDataTypeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Triples Literal Data Type Usage Count"); + + // Map/Reduce classes + job.setMapperClass(TripleDataTypeCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a job for computing literal data type counts on RDF quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getQuadDataTypeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Quads Literal Data Type Usage Count"); + + // Map/Reduce classes + job.setMapperClass(QuadDataTypeCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + 
job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(QuadsInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a job for computing literal data type counts on RDF triple and/or + * quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getDataTypeCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Literal Data Type Usage Count"); + + // Map/Reduce classes + job.setMapperClass(QuadDataTypeCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesOrQuadsInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a job for computing namespace usage counts on RDF triple inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getTripleNamespaceCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Triples Namespace Usage Count"); + + // Map/Reduce classes + 
job.setMapperClass(TripleNamespaceCountMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(TextCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a job for computing namespace usage counts on RDF quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getQuadNamespaceCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Quads Namespace Usage Count"); + + // Map/Reduce classes + job.setMapperClass(QuadNamespaceCountMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(TextCountReducer.class); + + // Input and Output + job.setInputFormatClass(QuadsInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + /** + * Gets a job for computing namespace usage counts on RDF triple and/or + * quad inputs + * + * @param config + * Configuration + * @param inputPaths + * Input paths + * @param outputPath + * Output path + * @return Job + * @throws IOException + */ + public static Job getNamespaceCountJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Namespace Usage Count"); + + // 
Map/Reduce classes + job.setMapperClass(QuadNamespaceCountMapper.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(TextCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesOrQuadsInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/a6c0fefc/jena-hadoop-rdf/pom.xml ---------------------------------------------------------------------- diff --git a/jena-hadoop-rdf/pom.xml b/jena-hadoop-rdf/pom.xml index dc613de..83f2819 100644 --- a/jena-hadoop-rdf/pom.xml +++ b/jena-hadoop-rdf/pom.xml @@ -11,7 +11,7 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <artifactId>jena-hadoop-rdf</artifactId> + <artifactId>jena-elephas</artifactId> <version>0.9.0-SNAPSHOT</version> <packaging>pom</packaging> @@ -22,14 +22,14 @@ <relativePath>../jena-parent</relativePath> </parent> - <name>Apache Jena - RDF Tools for Hadoop</name> + <name>Apache Jena - Elephas</name> <description>A collection of tools for working with RDF on the Hadoop platform</description> <modules> - <module>hadoop-rdf-io</module> - <module>hadoop-rdf-common</module> - <module>hadoop-rdf-mapreduce</module> - <module>hadoop-rdf-stats</module> + <module>jena-elephas-io</module> + <module>jena-elephas-common</module> + <module>jena-elephas-mapreduce</module> + <module>jena-elephas-stats</module> </modules> <!-- Properties common across all profiles --> @@ -42,22 +42,14 @@ <!-- Profiles to allow building for different Hadoop versions --> <profiles> - <!-- Hadoop 2.x Stable --> + <!-- Hadoop 2.x Latest --> 
<profile> <id>hadoop_2x</id> <activation> <activeByDefault>true</activeByDefault> </activation> <properties> - <hadoop.version>2.5.1</hadoop.version> - </properties> - </profile> - - <!-- Hadoop 0.23 --> - <profile> - <id>hadoop_023x</id> - <properties> - <hadoop.version>0.23.11</hadoop.version> + <hadoop.version>2.5.0</hadoop.version> </properties> </profile> </profiles> @@ -75,7 +67,7 @@ <artifactId>hadoop-mapreduce-client-common</artifactId> <version>${hadoop.version}</version> </dependency> - + <!-- Jena Dependencies --> <dependency> <groupId>org.apache.jena</groupId>
