Add new graph sizes statistic to Elephas demo. Upgrades the airline library to a more recent and feature-rich variant of the library.
Project: http://git-wip-us.apache.org/repos/asf/jena/repo Commit: http://git-wip-us.apache.org/repos/asf/jena/commit/5a49ce8d Tree: http://git-wip-us.apache.org/repos/asf/jena/tree/5a49ce8d Diff: http://git-wip-us.apache.org/repos/asf/jena/diff/5a49ce8d Branch: refs/heads/hadoop-rdf Commit: 5a49ce8d4a8604a645ce1dba576f612cc17233ca Parents: b24e9e7 Author: Rob Vesse <[email protected]> Authored: Wed Jan 7 15:38:50 2015 +0000 Committer: Rob Vesse <[email protected]> Committed: Wed Jan 7 15:38:50 2015 +0000 ---------------------------------------------------------------------- jena-elephas/jena-elephas-stats/pom.xml | 3 +- .../apache/jena/hadoop/rdf/stats/RdfStats.java | 794 ++++++++++--------- .../jena/hadoop/rdf/stats/jobs/JobFactory.java | 64 ++ jena-elephas/pom.xml | 8 + 4 files changed, 481 insertions(+), 388 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jena/blob/5a49ce8d/jena-elephas/jena-elephas-stats/pom.xml ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-stats/pom.xml b/jena-elephas/jena-elephas-stats/pom.xml index 526d060..4fd40d8 100644 --- a/jena-elephas/jena-elephas-stats/pom.xml +++ b/jena-elephas/jena-elephas-stats/pom.xml @@ -42,9 +42,8 @@ <!-- CLI related Dependencies --> <dependency> - <groupId>io.airlift</groupId> + <groupId>com.github.rvesse</groupId> <artifactId>airline</artifactId> - <version>0.6</version> </dependency> <!-- Hadoop Dependencies --> http://git-wip-us.apache.org/repos/asf/jena/blob/5a49ce8d/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java b/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java index 5f870ee..b9bd9e7 100644 --- 
a/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java +++ b/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/RdfStats.java @@ -16,390 +16,412 @@ * limitations under the License. */ -package org.apache.jena.hadoop.rdf.stats; - -import io.airlift.command.Arguments; -import io.airlift.command.Command; -import io.airlift.command.Help; -import io.airlift.command.HelpOption; -import io.airlift.command.Option; -import io.airlift.command.OptionType; -import io.airlift.command.ParseArgumentsMissingException; -import io.airlift.command.ParseArgumentsUnexpectedException; -import io.airlift.command.ParseException; -import io.airlift.command.ParseOptionMissingException; -import io.airlift.command.ParseOptionMissingValueException; -import io.airlift.command.SingleCommand; -import io.airlift.command.model.CommandMetadata; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import javax.inject.Inject; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; +package org.apache.jena.hadoop.rdf.stats; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.HelpOption; +import io.airlift.airline.Option; +import io.airlift.airline.ParseArgumentsMissingException; +import io.airlift.airline.ParseArgumentsUnexpectedException; +import io.airlift.airline.ParseException; +import io.airlift.airline.ParseOptionIllegalValueException; +import io.airlift.airline.ParseOptionMissingException; +import io.airlift.airline.ParseOptionMissingValueException; +import io.airlift.airline.SingleCommand; +import io.airlift.airline.help.Help; +import io.airlift.airline.model.CommandMetadata; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import 
java.util.concurrent.TimeUnit; + +import javax.inject.Inject; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import org.apache.jena.hadoop.rdf.stats.jobs.JobFactory; - - -/** - * Entry point for the Hadoop job, handles launching all the relevant Hadoop - * jobs - */ -@Command(name = "bin/hadoop jar PATH_TO_JAR com.yarcdata.urika.hadoop.rdf.stats.RdfStats", description = "A command which computes statistics on RDF data using Hadoop") -public class RdfStats implements Tool { - - static final String ANSI_RED = "\u001B[31m"; - static final String ANSI_RESET = "\u001B[0m"; - - private static final String DATA_TYPE_TRIPLES = "triples", DATA_TYPE_QUADS = "quads", DATA_TYPE_MIXED = "mixed"; - - /** - * Help option - */ - @Inject - public HelpOption helpOption; - - /** - * Gets/Sets whether all available statistics will be calculated - */ - @Option(name = { "-a", "--all" }, description = "Requests that all available statistics be calculated", type = OptionType.COMMAND) - public boolean all = false; - - /** - * Gets/Sets whether node usage counts will be calculated - */ - @Option(name = { "-n", "--node-count" }, description = "Requests that node usage counts be calculated", type = OptionType.COMMAND) - public boolean nodeCount = false; - - /** - * Gets/Sets whether characteristic sets will be calculated - */ - @Option(name = { "-c", "--characteristic-sets" }, description = "Requests that characteristic sets be calculated", type = OptionType.COMMAND) - public boolean characteristicSets = false; - - /** - * Gets/Sets whether type counts will be calculated - */ - @Option(name = { "-t", "--type-counts" }, description = "Requests that rdf:type usage counts be calculated", type = OptionType.COMMAND) - public boolean typeCount = false; - - /** - * Gets/Sets whether data type counts will be calculated - */ - @Option(name = { "-d", "--data-types" }, description = 
"Requests that literal data type usage counts be calculated", type = OptionType.COMMAND) - public boolean dataTypeCount = false; - - /** - * Gets/Sets whether namespace counts will be calculated - */ - @Option(name = { "--namespaces" }, description = "Requests that namespace usage counts be calculated", type = OptionType.COMMAND) - public boolean namespaceCount = false; - - /** - * Gets/Sets the input data type used - */ - @Option(name = { "--input-type" }, allowedValues = { DATA_TYPE_MIXED, DATA_TYPE_QUADS, DATA_TYPE_TRIPLES }, description = "Specifies whether the input data is a mixture of quads and triples, just quads or just triples. Using the most specific data type will yield the most accurrate statistics") - public String inputType = DATA_TYPE_MIXED; - - /** - * Gets/Sets the output path - */ - @Option(name = { "-o", "--output" }, title = "OutputPath", description = "Sets the output path", arity = 1, required = true) - public String outputPath = null; - - /** - * Gets/Sets the input path(s) - */ - @Arguments(description = "Sets the input path(s)", title = "InputPath", required = true) - public List<String> inputPaths = new ArrayList<String>(); - - private Configuration config; - - /** - * Entry point method - * - * @param args - * Arguments - * @throws Exception - */ - public static void main(String[] args) throws Exception { - try { - // Run and exit with result code if no errors bubble up - // Note that the exit code may still be a error code - int res = ToolRunner.run(new Configuration(true), new RdfStats(), args); - System.exit(res); - } catch (Exception e) { - System.err.println(ANSI_RED + e.getMessage()); - e.printStackTrace(System.err); - } finally { - System.err.print(ANSI_RESET); - } - // If any errors bubble up exit with non-zero code - System.exit(1); - } - - private static void showUsage() { - CommandMetadata metadata = SingleCommand.singleCommand(RdfStats.class).getCommandMetadata(); - StringBuilder builder = new StringBuilder(); - 
Help.help(metadata, builder); - System.err.print(ANSI_RESET); - System.err.println(builder.toString()); - System.exit(1); - } - - @Override - public void setConf(Configuration conf) { - this.config = conf; - } - - @Override - public Configuration getConf() { - return this.config; - } - - @Override - public int run(String[] args) throws Exception { - try { - // Parse custom arguments - RdfStats cmd = SingleCommand.singleCommand(RdfStats.class).parse(args); - - // Copy Hadoop configuration across - cmd.setConf(this.getConf()); - - // Show help if requested and exit with success - if (cmd.helpOption.showHelpIfRequested()) { - return 0; - } - - // Run the command and exit with success - cmd.run(); - return 0; - - } catch (ParseOptionMissingException e) { - System.err.println(ANSI_RED + e.getMessage()); - System.err.println(); - showUsage(); - } catch (ParseOptionMissingValueException e) { - System.err.println(ANSI_RED + e.getMessage()); - System.err.println(); - showUsage(); - } catch (ParseArgumentsMissingException e) { - System.err.println(ANSI_RED + e.getMessage()); - System.err.println(); - showUsage(); - } catch (ParseArgumentsUnexpectedException e) { - System.err.println(ANSI_RED + e.getMessage()); - System.err.println(); - showUsage(); - // TODO Re-enable as and when we upgrade Airline - // } catch (ParseOptionIllegalValueException e) { - // System.err.println(ANSI_RED + e.getMessage()); - // System.err.println(); - // showUsage(); - } catch (ParseException e) { - System.err.println(ANSI_RED + e.getMessage()); - System.err.println(); - showUsage(); - } catch (UnsupportedOperationException e) { - System.err.println(ANSI_RED + e.getMessage()); - } catch (Throwable e) { - System.err.println(ANSI_RED + e.getMessage()); - e.printStackTrace(System.err); - } finally { - System.err.print(ANSI_RESET); - } - return 1; - } - - private void run() throws Throwable { - if (!this.outputPath.endsWith("/")) { - this.outputPath += "/"; - } - - // If all statistics requested turn 
on all statistics - if (this.all) { - this.nodeCount = true; - this.characteristicSets = true; - this.typeCount = true; - this.dataTypeCount = true; - this.namespaceCount = true; - } - - // How many statistics were requested? - int statsRequested = 0; - if (this.nodeCount) - statsRequested++; - if (this.characteristicSets) - statsRequested++; - if (this.typeCount) - statsRequested++; - if (this.dataTypeCount) - statsRequested++; - if (this.namespaceCount) - statsRequested++; - - // Error if no statistics requested - if (statsRequested == 0) { - System.err - .println("You did not request any statistics to be calculated, please use one/more of the relevant options to select the statistics to be computed"); - return; - } - int statsComputed = 1; - - // Compute statistics - if (this.nodeCount) { - Job job = this.selectNodeCountJob(); - statsComputed = this.computeStatistic(job, statsComputed, statsRequested); - } - if (this.typeCount) { - Job[] jobs = this.selectTypeCountJobs(); - statsComputed = this.computeStatistic(jobs, false, false, statsComputed, statsRequested); - } - if (this.dataTypeCount) { - Job job = this.selectDataTypeCountJob(); - statsComputed = this.computeStatistic(job, statsComputed, statsRequested); - } - if (this.namespaceCount) { - Job job = this.selectNamespaceCountJob(); - statsComputed = this.computeStatistic(job, statsComputed, statsRequested); - } - if (this.characteristicSets) { - Job[] jobs = this.selectCharacteristicSetJobs(); - statsComputed = this.computeStatistic(jobs, false, false, statsComputed, statsRequested); - } - } - - private int computeStatistic(Job job, int statsComputed, int statsRequested) throws Throwable { - System.out.println(String.format("Computing Statistic %d of %d requested", statsComputed, statsRequested)); - this.runJob(job); - System.out.println(String.format("Computed Statistic %d of %d requested", statsComputed, statsRequested)); - System.out.println(); - return ++statsComputed; - } - - private int 
computeStatistic(Job[] jobs, boolean continueOnFailure, boolean continueOnError, int statsComputed, - int statsRequested) { - System.out.println(String.format("Computing Statistic %d of %d requested", statsComputed, statsRequested)); - this.runJobSequence(jobs, continueOnFailure, continueOnError); - System.out.println(String.format("Computed Statistic %d of %d requested", statsComputed, statsRequested)); - System.out.println(); - return ++statsComputed; - } - - private boolean runJob(Job job) throws Throwable { - System.out.println("Submitting Job " + job.getJobName()); - long start = System.nanoTime(); - try { - job.submit(); - if (job.monitorAndPrintJob()) { - System.out.println("Job " + job.getJobName() + " succeeded"); - return true; - } else { - System.out.println("Job " + job.getJobName() + " failed"); - return false; - } - } catch (Throwable e) { - System.out.println("Unexpected failure in Job " + job.getJobName()); - throw e; - } finally { - long end = System.nanoTime(); - System.out.println("Job " + job.getJobName() + " finished after " - + String.format("%,d milliseconds", TimeUnit.NANOSECONDS.toMillis(end - start))); - System.out.println(); - } - } - - private void runJobSequence(Job[] jobs, boolean continueOnFailure, boolean continueOnError) { - for (int i = 0; i < jobs.length; i++) { - Job job = jobs[i]; - try { - boolean success = this.runJob(job); - if (!success && !continueOnFailure) - throw new IllegalStateException("Unable to complete job sequence because Job " + job.getJobName() + " failed"); - } catch (IllegalStateException e) { - throw e; - } catch (Throwable e) { - if (!continueOnError) - throw new IllegalStateException("Unable to complete job sequence because job " + job.getJobName() - + " errorred", e); - } - } - } - - private Job selectNodeCountJob() throws IOException { - String realOutputPath = outputPath + "node-counts/"; - String[] inputs = new String[this.inputPaths.size()]; - this.inputPaths.toArray(inputs); - - if 
(DATA_TYPE_QUADS.equals(this.inputType)) { - return JobFactory.getQuadNodeCountJob(this.config, inputs, realOutputPath); - } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { - return JobFactory.getTripleNodeCountJob(this.config, inputs, realOutputPath); - } else { - return JobFactory.getNodeCountJob(this.config, inputs, realOutputPath); - } - } - - private Job selectDataTypeCountJob() throws IOException { - String realOutputPath = outputPath + "data-type-counts/"; - String[] inputs = new String[this.inputPaths.size()]; - this.inputPaths.toArray(inputs); - - if (DATA_TYPE_QUADS.equals(this.inputType)) { - return JobFactory.getQuadDataTypeCountJob(this.config, inputs, realOutputPath); - } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { - return JobFactory.getTripleDataTypeCountJob(this.config, inputs, realOutputPath); - } else { - return JobFactory.getDataTypeCountJob(this.config, inputs, realOutputPath); - } - } - - private Job selectNamespaceCountJob() throws IOException { - String realOutputPath = outputPath + "namespace-counts/"; - String[] inputs = new String[this.inputPaths.size()]; - this.inputPaths.toArray(inputs); - - if (DATA_TYPE_QUADS.equals(this.inputType)) { - return JobFactory.getQuadNamespaceCountJob(this.config, inputs, realOutputPath); - } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { - return JobFactory.getTripleNamespaceCountJob(this.config, inputs, realOutputPath); - } else { - return JobFactory.getNamespaceCountJob(this.config, inputs, realOutputPath); - } - } - - private Job[] selectCharacteristicSetJobs() throws IOException { - String intermediateOutputPath = outputPath + "characteristics/intermediate/"; - String finalOutputPath = outputPath + "characteristics/final/"; - String[] inputs = new String[this.inputPaths.size()]; - this.inputPaths.toArray(inputs); - - if (DATA_TYPE_QUADS.equals(this.inputType)) { - return JobFactory.getQuadCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); - } else 
if (DATA_TYPE_TRIPLES.equals(this.inputType)) { - return JobFactory.getTripleCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); - } else { - return JobFactory.getCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); - } - } - - private Job[] selectTypeCountJobs() throws IOException { - String intermediateOutputPath = outputPath + "type-declarations/"; - String finalOutputPath = outputPath + "type-counts/"; - String[] inputs = new String[this.inputPaths.size()]; - this.inputPaths.toArray(inputs); - - if (DATA_TYPE_QUADS.equals(this.inputType)) { - return JobFactory.getQuadTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); - } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { - return JobFactory.getTripleTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); - } else { - return JobFactory.getTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); - } - } -} + +/** + * Entry point for the Hadoop job, handles launching all the relevant Hadoop + * jobs + */ +@Command(name = "bin/hadoop jar PATH_TO_JAR com.yarcdata.urika.hadoop.rdf.stats.RdfStats", description = "A command which computes statistics on RDF data using Hadoop") +public class RdfStats implements Tool { + + static final String ANSI_RED = "\u001B[31m"; + static final String ANSI_RESET = "\u001B[0m"; + + private static final String DATA_TYPE_TRIPLES = "triples", DATA_TYPE_QUADS = "quads", DATA_TYPE_MIXED = "mixed"; + + /** + * Help option + */ + @Inject + public HelpOption helpOption; + + /** + * Gets/Sets whether all available statistics will be calculated + */ + @Option(name = { "-a", "--all" }, description = "Requests that all available statistics be calculated") + public boolean all = false; + + /** + * Gets/Sets whether node usage counts will be calculated + */ + @Option(name = { "-n", "--node-count" }, description = "Requests that node usage counts be calculated") + 
public boolean nodeCount = false; + + /** + * Gets/Sets whether characteristic sets will be calculated + */ + @Option(name = { "-c", "--characteristic-sets" }, hidden = true, description = "Requests that characteristic sets be calculated (hidden as this has scalability issues)") + public boolean characteristicSets = false; + + /** + * Gets/Sets whether type counts will be calculated + */ + @Option(name = { "-t", "--type-counts" }, description = "Requests that rdf:type usage counts be calculated") + public boolean typeCount = false; + + /** + * Gets/Sets whether data type counts will be calculated + */ + @Option(name = { "-d", "--data-types" }, description = "Requests that literal data type usage counts be calculated") + public boolean dataTypeCount = false; + + /** + * Gets/Sets whether namespace counts will be calculated + */ + @Option(name = { "--namespaces" }, description = "Requests that namespace usage counts be calculated") + public boolean namespaceCount = false; + + @Option(name = { "-g", "--graph-sizes" }, description = "Requests that the size of each named graph be counted") + public boolean graphSize = false; + + /** + * Gets/Sets the input data type used + */ + @Option(name = { "--input-type" }, allowedValues = { DATA_TYPE_MIXED, DATA_TYPE_QUADS, DATA_TYPE_TRIPLES }, description = "Specifies whether the input data is a mixture of quads and triples, just quads or just triples. 
Using the most specific data type will yield the most accurrate statistics") + public String inputType = DATA_TYPE_MIXED; + + /** + * Gets/Sets the output path + */ + @Option(name = { "-o", "--output" }, title = "OutputPath", description = "Sets the output path", arity = 1, required = true) + public String outputPath = null; + + /** + * Gets/Sets the input path(s) + */ + @Arguments(description = "Sets the input path(s)", title = "InputPath", required = true) + public List<String> inputPaths = new ArrayList<String>(); + + private Configuration config; + + /** + * Entry point method + * + * @param args + * Arguments + * @throws Exception + */ + public static void main(String[] args) throws Exception { + try { + // Run and exit with result code if no errors bubble up + // Note that the exit code may still be a error code + int res = ToolRunner.run(new Configuration(true), new RdfStats(), args); + System.exit(res); + } catch (Exception e) { + System.err.println(ANSI_RED + e.getMessage()); + e.printStackTrace(System.err); + } finally { + System.err.print(ANSI_RESET); + } + // If any errors bubble up exit with non-zero code + System.exit(1); + } + + private static void showUsage() throws IOException { + CommandMetadata metadata = SingleCommand.singleCommand(RdfStats.class).getCommandMetadata(); + System.err.print(ANSI_RESET); + Help.help(metadata, System.err); + System.exit(1); + } + + @Override + public void setConf(Configuration conf) { + this.config = conf; + } + + @Override + public Configuration getConf() { + return this.config; + } + + @Override + public int run(String[] args) throws Exception { + try { + // Parse custom arguments + RdfStats cmd = SingleCommand.singleCommand(RdfStats.class).parse(args); + + // Copy Hadoop configuration across + cmd.setConf(this.getConf()); + + // Show help if requested and exit with success + if (cmd.helpOption.showHelpIfRequested()) { + return 0; + } + + // Run the command and exit with success + cmd.run(); + return 0; + + } catch 
(ParseOptionMissingException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseOptionMissingValueException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseArgumentsMissingException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseArgumentsUnexpectedException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseOptionIllegalValueException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (ParseException e) { + System.err.println(ANSI_RED + e.getMessage()); + System.err.println(); + showUsage(); + } catch (UnsupportedOperationException e) { + System.err.println(ANSI_RED + e.getMessage()); + } catch (Throwable e) { + System.err.println(ANSI_RED + e.getMessage()); + e.printStackTrace(System.err); + } finally { + System.err.print(ANSI_RESET); + } + return 1; + } + + private void run() throws Throwable { + if (!this.outputPath.endsWith("/")) { + this.outputPath += "/"; + } + + // If all statistics requested turn on all statistics + if (this.all) { + this.nodeCount = true; + this.characteristicSets = true; + this.typeCount = true; + this.dataTypeCount = true; + this.namespaceCount = true; + } + + // How many statistics were requested? 
+ int statsRequested = 0; + if (this.nodeCount) + statsRequested++; + if (this.characteristicSets) + statsRequested++; + if (this.typeCount) + statsRequested++; + if (this.dataTypeCount) + statsRequested++; + if (this.namespaceCount) + statsRequested++; + if (this.graphSize) + statsRequested++; + + // Error if no statistics requested + if (statsRequested == 0) { + System.err + .println("You did not request any statistics to be calculated, please use one/more of the relevant options to select the statistics to be computed"); + return; + } + int statsComputed = 1; + + // Compute statistics + if (this.nodeCount) { + Job job = this.selectNodeCountJob(); + statsComputed = this.computeStatistic(job, statsComputed, statsRequested); + } + if (this.graphSize) { + Job job = this.selectGraphSizeJob(); + statsComputed = this.computeStatistic(job, statsComputed, statsRequested); + } + if (this.typeCount) { + Job[] jobs = this.selectTypeCountJobs(); + statsComputed = this.computeStatistic(jobs, false, false, statsComputed, statsRequested); + } + if (this.dataTypeCount) { + Job job = this.selectDataTypeCountJob(); + statsComputed = this.computeStatistic(job, statsComputed, statsRequested); + } + if (this.namespaceCount) { + Job job = this.selectNamespaceCountJob(); + statsComputed = this.computeStatistic(job, statsComputed, statsRequested); + } + if (this.characteristicSets) { + Job[] jobs = this.selectCharacteristicSetJobs(); + statsComputed = this.computeStatistic(jobs, false, false, statsComputed, statsRequested); + } + } + + private int computeStatistic(Job job, int statsComputed, int statsRequested) throws Throwable { + System.out.println(String.format("Computing Statistic %d of %d requested", statsComputed, statsRequested)); + this.runJob(job); + System.out.println(String.format("Computed Statistic %d of %d requested", statsComputed, statsRequested)); + System.out.println(); + return ++statsComputed; + } + + private int computeStatistic(Job[] jobs, boolean 
continueOnFailure, boolean continueOnError, int statsComputed, + int statsRequested) { + System.out.println(String.format("Computing Statistic %d of %d requested", statsComputed, statsRequested)); + this.runJobSequence(jobs, continueOnFailure, continueOnError); + System.out.println(String.format("Computed Statistic %d of %d requested", statsComputed, statsRequested)); + System.out.println(); + return ++statsComputed; + } + + private boolean runJob(Job job) throws Throwable { + System.out.println("Submitting Job " + job.getJobName()); + long start = System.nanoTime(); + try { + job.submit(); + if (job.monitorAndPrintJob()) { + System.out.println("Job " + job.getJobName() + " succeeded"); + return true; + } else { + System.out.println("Job " + job.getJobName() + " failed"); + return false; + } + } catch (Throwable e) { + System.out.println("Unexpected failure in Job " + job.getJobName()); + throw e; + } finally { + long end = System.nanoTime(); + System.out.println("Job " + job.getJobName() + " finished after " + + String.format("%,d milliseconds", TimeUnit.NANOSECONDS.toMillis(end - start))); + System.out.println(); + } + } + + private void runJobSequence(Job[] jobs, boolean continueOnFailure, boolean continueOnError) { + for (int i = 0; i < jobs.length; i++) { + Job job = jobs[i]; + try { + boolean success = this.runJob(job); + if (!success && !continueOnFailure) + throw new IllegalStateException("Unable to complete job sequence because Job " + job.getJobName() + + " failed"); + } catch (IllegalStateException e) { + throw e; + } catch (Throwable e) { + if (!continueOnError) + throw new IllegalStateException("Unable to complete job sequence because job " + job.getJobName() + + " errorred", e); + } + } + } + + private Job selectNodeCountJob() throws IOException { + String realOutputPath = outputPath + "node-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return 
JobFactory.getQuadNodeCountJob(this.config, inputs, realOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleNodeCountJob(this.config, inputs, realOutputPath); + } else { + return JobFactory.getNodeCountJob(this.config, inputs, realOutputPath); + } + } + + private Job selectGraphSizeJob() throws IOException { + String realOutputPath = outputPath + "graph-sizes/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadGraphSizesJob(this.config, inputs, realOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleGraphSizesJob(this.config, inputs, realOutputPath); + } else { + return JobFactory.getGraphSizesJob(this.config, inputs, realOutputPath); + } + } + + private Job selectDataTypeCountJob() throws IOException { + String realOutputPath = outputPath + "data-type-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadDataTypeCountJob(this.config, inputs, realOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleDataTypeCountJob(this.config, inputs, realOutputPath); + } else { + return JobFactory.getDataTypeCountJob(this.config, inputs, realOutputPath); + } + } + + private Job selectNamespaceCountJob() throws IOException { + String realOutputPath = outputPath + "namespace-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadNamespaceCountJob(this.config, inputs, realOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleNamespaceCountJob(this.config, inputs, realOutputPath); + } else { + return 
JobFactory.getNamespaceCountJob(this.config, inputs, realOutputPath); + } + } + + private Job[] selectCharacteristicSetJobs() throws IOException { + String intermediateOutputPath = outputPath + "characteristics/intermediate/"; + String finalOutputPath = outputPath + "characteristics/final/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory + .getQuadCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, + finalOutputPath); + } else { + return JobFactory.getCharacteristicSetJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } + } + + private Job[] selectTypeCountJobs() throws IOException { + String intermediateOutputPath = outputPath + "type-declarations/"; + String finalOutputPath = outputPath + "type-counts/"; + String[] inputs = new String[this.inputPaths.size()]; + this.inputPaths.toArray(inputs); + + if (DATA_TYPE_QUADS.equals(this.inputType)) { + return JobFactory.getQuadTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } else if (DATA_TYPE_TRIPLES.equals(this.inputType)) { + return JobFactory.getTripleTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } else { + return JobFactory.getTypeCountJobs(this.config, inputs, intermediateOutputPath, finalOutputPath); + } + } +} http://git-wip-us.apache.org/repos/asf/jena/blob/5a49ce8d/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java ---------------------------------------------------------------------- diff --git a/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java 
b/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java index 55bb8af..7935335 100644 --- a/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java +++ b/jena-elephas/jena-elephas-stats/src/main/java/org/apache/jena/hadoop/rdf/stats/jobs/JobFactory.java @@ -28,6 +28,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.chain.ChainMapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; @@ -56,12 +57,14 @@ import org.apache.jena.hadoop.rdf.mapreduce.count.datatypes.QuadDataTypeCountMap import org.apache.jena.hadoop.rdf.mapreduce.count.datatypes.TripleDataTypeCountMapper; import org.apache.jena.hadoop.rdf.mapreduce.count.namespaces.QuadNamespaceCountMapper; import org.apache.jena.hadoop.rdf.mapreduce.count.namespaces.TripleNamespaceCountMapper; +import org.apache.jena.hadoop.rdf.mapreduce.count.positional.QuadGraphCountMapper; import org.apache.jena.hadoop.rdf.mapreduce.count.positional.QuadObjectCountMapper; import org.apache.jena.hadoop.rdf.mapreduce.count.positional.TripleObjectCountMapper; import org.apache.jena.hadoop.rdf.mapreduce.filter.positional.QuadFilterByPredicateMapper; import org.apache.jena.hadoop.rdf.mapreduce.filter.positional.TripleFilterByPredicateUriMapper; import org.apache.jena.hadoop.rdf.mapreduce.group.QuadGroupBySubjectMapper; import org.apache.jena.hadoop.rdf.mapreduce.group.TripleGroupBySubjectMapper; +import org.apache.jena.hadoop.rdf.mapreduce.transform.TriplesToQuadsConstantGraphMapper; import org.apache.jena.hadoop.rdf.types.CharacteristicSetWritable; import org.apache.jena.hadoop.rdf.types.NodeWritable; import 
org.apache.jena.hadoop.rdf.types.QuadWritable; @@ -178,6 +181,67 @@ public class JobFactory { FileOutputFormat.setOutputPath(job, new Path(outputPath)); return job; + } + + public static Job getTripleGraphSizesJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Triples Graph Sizes"); + + // Map/Reduce classes + ChainMapper.addMapper(job, TriplesToQuadsConstantGraphMapper.class, LongWritable.class, TripleWritable.class, LongWritable.class, QuadWritable.class, config); + ChainMapper.addMapper(job, QuadGraphCountMapper.class, LongWritable.class, QuadWritable.class, NodeWritable.class, LongWritable.class, config); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + public static Job getQuadGraphSizesJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Quads Graph Sizes"); + + // Map/Reduce classes + job.setMapperClass(QuadGraphCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(QuadsInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; + } + + public static Job 
getGraphSizesJob(Configuration config, String[] inputPaths, String outputPath) throws IOException { + Job job = Job.getInstance(config); + job.setJarByClass(JobFactory.class); + job.setJobName("RDF Graph Sizes"); + + // Map/Reduce classes + job.setMapperClass(QuadGraphCountMapper.class); + job.setMapOutputKeyClass(NodeWritable.class); + job.setMapOutputValueClass(LongWritable.class); + job.setReducerClass(NodeCountReducer.class); + + // Input and Output + job.setInputFormatClass(TriplesOrQuadsInputFormat.class); + job.setOutputFormatClass(NTriplesNodeOutputFormat.class); + FileInputFormat.setInputPaths(job, StringUtils.arrayToString(inputPaths)); + FileOutputFormat.setOutputPath(job, new Path(outputPath)); + + return job; } /** http://git-wip-us.apache.org/repos/asf/jena/blob/5a49ce8d/jena-elephas/pom.xml ---------------------------------------------------------------------- diff --git a/jena-elephas/pom.xml b/jena-elephas/pom.xml index 5245065..040fc9c 100644 --- a/jena-elephas/pom.xml +++ b/jena-elephas/pom.xml @@ -38,6 +38,7 @@ <arq.version>2.12.2-SNAPSHOT</arq.version> <junit.version>4.11</junit.version> <mrunit.version>1.0.0</mrunit.version> + <airline.version>0.8</airline.version> </properties> <!-- Profiles to allow building for different Hadoop versions --> @@ -79,6 +80,13 @@ <artifactId>jena-arq</artifactId> <version>${arq.version}</version> </dependency> + + <!-- CLI Related Dependencies --> + <dependency> + <groupId>com.github.rvesse</groupId> + <artifactId>airline</artifactId> + <version>${airline.version}</version> + </dependency> <!-- Test Dependencies --> <dependency>
