http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/LoadTriplesCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/LoadTriplesCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/LoadTriplesCommand.java new file mode 100644 index 0000000..c24eb66 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/LoadTriplesCommand.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.client.command; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.commons.io.FilenameUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; +import org.apache.rya.indexing.pcj.fluo.client.util.FluoLoader; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFParseException; +import org.openrdf.rio.RDFParser; +import org.openrdf.rio.ntriples.NTriplesParser; +import org.openrdf.rio.trig.TriGParser; +import org.openrdf.rio.turtle.TurtleParser; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +import io.fluo.api.client.FluoClient; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * A command that loads the contents of an NTriple file into the Fluo application. + */ +@ParametersAreNonnullByDefault +public class LoadTriplesCommand implements PcjAdminClientCommand { + private static final Logger log = LogManager.getLogger(LoadTriplesCommand.class); + + /** + * Command line parameters that are used by this command to configure itself. 
+ */ + private static final class Parameters { + @Parameter(names = "--triplesFile", required = true, description = "The RDF file of statemetns to load into the Fluo app.") + private String nTriplesFile; + } + + @Override + public String getCommand() { + return "load-triples"; + } + + @Override + public String getDescription() { + return "Load RDF Triples into the Fluo app"; + } + + @Override + public String getUsage() { + final JCommander parser = new JCommander(new Parameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + @Override + public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException { + checkNotNull(accumulo); + checkNotNull(fluo); + checkNotNull(args); + + log.trace("Executing the Load Triples Command..."); + + // Parse the command line arguments. + final Parameters params = new Parameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not load the Triples file because of invalid command line parameters.", e); + } + + // Iterate over the Statements that are in the input file and write them to Fluo. 
+ log.trace("Loading RDF Statements from the Triples file '" + params.nTriplesFile + "'."); + final Path triplesPath = Paths.get( params.nTriplesFile ); + + try { + final RDFParser parser = makeParser(triplesPath); + final FluoLoader loader = new FluoLoader(fluo, new InsertTriples()); + parser.setRDFHandler(loader); + parser.parse(Files.newInputStream(triplesPath), triplesPath.toUri().toString()); + } catch (UnsupportedFormatException | RDFParseException | RDFHandlerException | IOException e) { + throw new ExecutionException("Could not load the RDF file into the Fluo app.", e); + } + + log.trace("Finished executing the Load Triples Command."); + } + + private static RDFParser makeParser(final Path tripleFile) throws UnsupportedFormatException { + checkNotNull(tripleFile); + + final String extension = FilenameUtils.getExtension( tripleFile.getFileName().toString() ); + switch(extension) { + case "nt": + return new NTriplesParser(); + case "ttl": + return new TurtleParser(); + case "trig": + return new TriGParser(); + default: + throw new UnsupportedFormatException("RDF File with extension '" + extension + "' not supported."); + } + } + + private static final class UnsupportedFormatException extends Exception { + private static final long serialVersionUID = 1L; + + public UnsupportedFormatException(final String message) { + super(message); + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java new file mode 100644 index 0000000..1e15fc8 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.client.command; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.commons.io.IOUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; +import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.sail.SailException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +import io.fluo.api.client.FluoClient; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * A command that creates a creates a new PCJ in the Fluo app and loads historic + * statement pattern matches for it. + */ +@ParametersAreNonnullByDefault +public class NewQueryCommand implements PcjAdminClientCommand { + private static final Logger log = LogManager.getLogger(NewQueryCommand.class); + + /** + * Command line parameters that are used by this command to configure itself. 
+ */ + private static final class Parameters { + @Parameter(names = "--queryRequestFile", required = true, description = "The path to a file containing the SPARQL query that will be loaded into the Fluo app.") + private String queryRequestFile; + } + + @Override + public String getCommand() { + return "new-query"; + } + + @Override + public String getDescription() { + return "Add a SPARQL query to the Fluo app"; + } + + @Override + public String getUsage() { + final JCommander parser = new JCommander(new Parameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + @Override + public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException { + checkNotNull(accumulo); + checkNotNull(fluo); + checkNotNull(args); + + log.trace("Executing the New Query Command..."); + + // Parse the command line arguments. + final Parameters params = new Parameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not create a new query because of invalid command line parameters.", e); + } + + // Load the request from the file into memory. + log.trace("Loading the query found in file '" + params.queryRequestFile + "' into the client app."); + ParsedQueryRequest request = null; + try { + final Path requestFile = Paths.get(params.queryRequestFile); + final String requestText = IOUtils.toString( Files.newInputStream(requestFile) ); + request = ParsedQueryRequest.parse(requestText); + } catch (final IOException e) { + throw new ExecutionException("Could not load the query request into memory.", e); + } + + // Load the query into the Fluo app. 
+ log.trace("SPARQL Query: " + request.getQuery()); + log.trace("Var Orders: " + request.getVarOrders()); + log.trace("Loading these values into the Fluo app."); + final CreatePcj createPcj = new CreatePcj(); + try { + createPcj.withRyaIntegration(fluo, ryaTablePrefix, rya, accumulo, request.getVarOrders(), request.getQuery()); + } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + throw new ExecutionException("Could not create and load historic matches into the the Fluo app for the query.", e); + } + + log.trace("Finished executing the New Query Command."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java new file mode 100644 index 0000000..9a8c7f7 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/QueryReportCommand.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.client.command; + +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.accumulo.core.client.Connector; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; +import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport; +import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; +import org.apache.rya.indexing.pcj.fluo.client.util.QueryReportRenderer; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +import io.fluo.api.client.FluoClient; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * TODO implement this. + */ +public class QueryReportCommand implements PcjAdminClientCommand { + private static final Logger log = LogManager.getLogger(NewQueryCommand.class); + + /** + * Command line parameters that are used by this command to configure itself. 
+ */ + private static final class Parameters { + @Parameter(names = "--queryId", required = true, description = "The Query ID used to build the report.") + private String queryId; + } + + @Override + public String getCommand() { + return "query-report"; + } + + @Override + public String getDescription() { + return "Build a report that indicates a query's structure within the Fluo app as well as how many Binding Sets have been emitted for each of its nodes."; + } + + @Override + public String getUsage() { + final JCommander parser = new JCommander(new Parameters()); + + final StringBuilder usage = new StringBuilder(); + parser.usage(usage); + return usage.toString(); + } + + @Override + public void execute(final Connector accumulo, final String ryaTablePrefix, final RyaSailRepository rya, final FluoClient fluo, final String[] args) throws ArgumentsException, ExecutionException { + checkNotNull(accumulo); + checkNotNull(ryaTablePrefix); + checkNotNull(rya); + checkNotNull(fluo); + checkNotNull(args); + + log.trace("Executing the Get Query Report Command..."); + + // Parse the command line arguments. + final Parameters params = new Parameters(); + try { + new JCommander(params, args); + } catch(final ParameterException e) { + throw new ArgumentsException("Could not create a new query because of invalid command line parameters.", e); + } + + // Build the report using what is stored in Fluo. + log.trace("Building the report for Query ID: " + params.queryId); + final QueryReport queryReport = new GetQueryReport().getReport(fluo, params.queryId); + log.trace("Report built."); + + // Format and print the report. 
+ try { + final String reportString = new QueryReportRenderer().render(queryReport); + System.out.println(reportString); + } catch(final Exception e) { + throw new ExecutionException("Unable to render the query metadata report for output.", e); + } + + log.trace("Finished executing the Get Query Report Command."); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java new file mode 100644 index 0000000..3168a71 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/FluoLoader.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.client.util; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.ArrayList; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.openrdf.model.Statement; +import org.openrdf.rio.RDFHandlerException; +import org.openrdf.rio.RDFParser; +import org.openrdf.rio.helpers.RDFHandlerBase; + +import io.fluo.api.client.FluoClient; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RdfToRyaConversions; + +/** + * When used as the handler of an {@link RDFParser}, instances of this class + * will batch load {@link Statement}s into the Fluo app 1000 statements at a time. + */ +public class FluoLoader extends RDFHandlerBase { + private static final Logger log = LogManager.getLogger(FluoLoader.class); + + private static final int FLUSH_SIZE = 1000; + private final ArrayList<RyaStatement> buff = new ArrayList<>(1000); + + private final FluoClient fluoClient; + private final InsertTriples insertTriples; + + /** + * Constructs an instance of {@link FluoLoader}. + * + * @param fluoClient - The client that will be used to connect to Fluo. (not null) + * @param insertTriples - The interactor that loads triples into a Fluo table. (not null) + */ + public FluoLoader(final FluoClient fluoClient, final InsertTriples insertTriples) { + this.fluoClient = checkNotNull(fluoClient); + this.insertTriples = checkNotNull(insertTriples); + } + + @Override + public void startRDF() throws RDFHandlerException { + log.trace("Start of RDF file encountered."); + } + + @Override + public void handleStatement(final Statement st) throws RDFHandlerException { + // If the buffer is full, flush it to the Fluo table. 
+ if(buff.size() == FLUSH_SIZE) { + log.trace("Flushing " + buff.size() + " Statements from the buffer to Fluo."); + insertTriples.insert(fluoClient, buff); + buff.clear(); + } + + // Enqueue the statement for the next job. + final RyaStatement ryaSt = RdfToRyaConversions.convertStatement(st); + buff.add( ryaSt ); + } + + @Override + public void endRDF() throws RDFHandlerException { + log.trace("End of RDF file encountered."); + + if(!buff.isEmpty()) { + log.trace("Flushing the last " + buff.size() + " Statements from the buffer to Fluo."); + insertTriples.insert(fluoClient, buff); + buff.clear(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/ParsedQueryRequest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/ParsedQueryRequest.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/ParsedQueryRequest.java new file mode 100644 index 0000000..252c875 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/ParsedQueryRequest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.client.util; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import org.apache.commons.lang3.builder.EqualsBuilder; + +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Represents a request to create a new PCJ in the Fluo app. + */ +@Immutable +@ParametersAreNonnullByDefault +public class ParsedQueryRequest { + + private final String sparql; + private final Set<VariableOrder> varOrders; + + /** + * Constructs an instance of {@link CratePcjRequest}. + * + * @param sparql - The SPARQL query to load into the Fluo app. (not null) + * @param varOrders - The variable orders to export the query's results to. (not null) + */ + public ParsedQueryRequest(final String sparql, final Set<VariableOrder> varOrders) { + this.sparql = checkNotNull(sparql); + this.varOrders = checkNotNull(varOrders); + } + + /** + * @return The variable orders to export the query's results to. (not null) + */ + public Set<VariableOrder> getVarOrders() { + return varOrders; + } + + /** + * @return The SPARQL query to load into the Fluo app. 
+ */ + public String getQuery() { + return sparql; + } + + @Override + public int hashCode() { + return Objects.hash(sparql, varOrders); + } + + @Override + public boolean equals(final Object o) { + if(this == o) { + return true; + } + if(o instanceof ParsedQueryRequest) { + final ParsedQueryRequest request = (ParsedQueryRequest)o; + return new EqualsBuilder() + .append(sparql, request.sparql) + .append(varOrders, request.varOrders) + .isEquals(); + } + return false; + } + + private static final Pattern varOrdersPattern = Pattern.compile("^#prefix (.*?)$", Pattern.MULTILINE); + + /** + * Create a {@link ParsedQueryRequest} from formatted text. + * <p> + * The file may contain a list of variable orders at the head as comments. + * The file must contain a SPARQL query to loAd into the application. + * <p> + * Example file: + * <pre> + * #prefix a, b, c + * #prefix b, c, a + * SELECT * + * WHERE { + * ?a <http://talksTo> ?b. + * ?b <http://talksTo> ?c. + * } + * </pre> + * @throws IOException + */ + public static ParsedQueryRequest parse(final String requestText) throws IOException { + checkNotNull(requestText); + + // Shift this pointer to the end of each match to find where the SPARQL starts. + int startOfSparql = 0; + + // Scan for variable orders. + final Set<VariableOrder> varOrders = new HashSet<>(); + + final Matcher matcher = varOrdersPattern.matcher(requestText); + while(matcher.find()) { + final String varOrder = matcher.group(1); + varOrders.add( new VariableOrder( varOrder.split(",\\s*") ) ); + + startOfSparql = matcher.end(); + } + + // Pull the SPARQL out. 
+ final String sparql = requestText.substring(startOfSparql).trim(); + + return new ParsedQueryRequest(sparql, varOrders); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/PcjMetadataRenderer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/PcjMetadataRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/PcjMetadataRenderer.java new file mode 100644 index 0000000..50929bc --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/PcjMetadataRenderer.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.client.util; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.text.NumberFormat; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rya.indexing.pcj.fluo.client.util.Report.ReportItem; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; + +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Pretty renders the state of a query's {@link PcjMetadata}. + */ +public class PcjMetadataRenderer { + + /** + * Pretty render the state of a PCJ. + * + * @param queryId - The ID of the query the metadata is from. (not null) + * @param metadata - Metadata about one of the PCJs. (not null) + * @return A pretty render of a PCJ's state. + * @throws Exception The SPARQL within the metadata could not be parsed or + * it could not be rendered. + */ + public String render(final String queryId, final PcjMetadata metadata) throws Exception { + checkNotNull(metadata); + + // Pretty format the cardinality. + final String cardinality = NumberFormat.getInstance().format( metadata.getCardinality() ); + + // Pretty format and split the SPARQL query into lines. + final SPARQLParser parser = new SPARQLParser(); + final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer(); + final ParsedQuery pq = parser.parseQuery(metadata.getSparql(), null); + final String prettySparql = renderer.render(pq); + final String[] sparqlLines = StringUtils.split(prettySparql, '\n'); + + // Split the variable orders into lines. + final String[] varOrderLines = new String[ metadata.getVarOrders().size() ]; + int i = 0; + for(final VariableOrder varOrder : metadata.getVarOrders()) { + varOrderLines[i++] = varOrder.toString(); + } + + // Create the report. 
+ final Report.Builder builder = Report.builder(); + builder.appendItem( new ReportItem("Query ID", queryId) ); + builder.appendItem( new ReportItem("Cardinality", cardinality) ); + builder.appendItem( new ReportItem("Export Variable Orders", varOrderLines)); + builder.appendItem( new ReportItem("SPARQL", sparqlLines) ); + return builder.build().toString(); + } + + /** + * Pretty render the state of many PCJs. + * + * @param metadata - The PCJ states to render. The key within the map is + * the statem's Query Id. (not null) + * @return A pretty render of a PCJs' state. + * @throws Exception A SPARQL within the metadata could not be parsed or + * it could not be rendered. + */ + public String render(final Map<String, PcjMetadata> metadata) throws Exception { + checkNotNull(metadata); + + final StringBuilder formatted = new StringBuilder(); + + for(final Entry<String, PcjMetadata> entry : metadata.entrySet()) { + final String formattedQuery = render(entry.getKey(), entry.getValue()); + formatted.append(formattedQuery).append("\n"); + } + + return formatted.toString(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java new file mode 100644 index 0000000..2b5bd8a --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.client.util; + +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.commons.lang3.StringUtils; +import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.apache.rya.indexing.pcj.fluo.client.util.Report.ReportItem; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; + +/** + * Pretty renders a {@link QueryReport}. + */ +@ParametersAreNonnullByDefault +public class QueryReportRenderer { + + /** + * Pretty render a {@link QueryReport}. + * + * @param queryReport - The report that will be rendered. (not null) + * @return A pretty render of the report. + * @throws Exception Indicates the SPARQL could not be rendered for some reason. 
+ */ + public String render(final QueryReport queryReport) throws Exception { + checkNotNull(queryReport); + + final Report.Builder builder = Report.builder(); + + final FluoQuery metadata = queryReport.getFluoQuery(); + + final QueryMetadata queryMetadata = metadata.getQueryMetadata(); + builder.appendItem( new ReportItem("QUERY NODE") ); + builder.appendItem( new ReportItem("Node ID", queryMetadata.getNodeId()) ); + builder.appendItem( new ReportItem("Variable Order", queryMetadata.getVariableOrder().toString()) ); + builder.appendItem( new ReportItem("SPARQL", prettyFormatSparql( queryMetadata.getSparql()) ) ); + builder.appendItem( new ReportItem("Child Node ID", queryMetadata.getChildNodeId()) ); + builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(queryMetadata.getNodeId())) ); + + for(final FilterMetadata filterMetadata : metadata.getFilterMetadata()) { + builder.appendItem( new ReportItem("") ); + + builder.appendItem( new ReportItem("FILTER NODE") ); + builder.appendItem( new ReportItem("Node ID", filterMetadata.getNodeId()) ); + builder.appendItem( new ReportItem("Variable Order", filterMetadata.getVariableOrder().toString()) ); + builder.appendItem( new ReportItem("Original SPARQL", prettyFormatSparql( filterMetadata.getOriginalSparql()) ) ); + builder.appendItem( new ReportItem("Filter Index", "" + filterMetadata.getFilterIndexWithinSparql()) ); + builder.appendItem( new ReportItem("Parent Node ID", filterMetadata.getParentNodeId()) ); + builder.appendItem( new ReportItem("Child Node ID", filterMetadata.getChildNodeId()) ); + builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(filterMetadata.getNodeId())) ); + } + + for(final JoinMetadata joinMetadata : metadata.getJoinMetadata()) { + builder.appendItem( new ReportItem("") ); + + builder.appendItem( new ReportItem("JOIN NODE") ); + builder.appendItem( new ReportItem("Node ID", joinMetadata.getNodeId()) ); + builder.appendItem( new ReportItem("Variable Order", 
joinMetadata.getVariableOrder().toString()) ); + builder.appendItem( new ReportItem("Parent Node ID", joinMetadata.getParentNodeId()) ); + builder.appendItem( new ReportItem("Left Child Node ID", joinMetadata.getLeftChildNodeId()) ); + builder.appendItem( new ReportItem("Right Child Node ID", joinMetadata.getRightChildNodeId()) ); + builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(joinMetadata.getNodeId())) ); + } + + for(final StatementPatternMetadata spMetadata : metadata.getStatementPatternMetadata()) { + builder.appendItem( new ReportItem("") ); + + builder.appendItem( new ReportItem("STATEMENT PATTERN NODE") ); + builder.appendItem( new ReportItem("Node ID", spMetadata.getNodeId()) ); + builder.appendItem( new ReportItem("Variable Order", spMetadata.getVariableOrder().toString()) ); + builder.appendItem( new ReportItem("Statement Pattern", spMetadata.getStatementPattern()) ); + builder.appendItem( new ReportItem("Parent Node ID", spMetadata.getParentNodeId()) ); + builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(spMetadata.getNodeId())) ); + } + + return builder.build().toString(); + } + + private String[] prettyFormatSparql(final String sparql) throws Exception { + final SPARQLParser parser = new SPARQLParser(); + final SPARQLQueryRenderer renderer = new SPARQLQueryRenderer(); + final ParsedQuery pq = parser.parseQuery(sparql, null); + final String prettySparql = renderer.render(pq); + final String[] sparqlLines = StringUtils.split(prettySparql, '\n'); + return sparqlLines; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/Report.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/Report.java 
b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/Report.java new file mode 100644 index 0000000..cdd51f8 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/Report.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.client.util; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.collect.ImmutableList; + +/** + * A human readable report that displays the title of a {@link ReportItem} on the + * left hand side of the table and the value of the item on the right hand side. + * If an item does not have any values, then it prints an empty line. 
+ */ +@Immutable +@ParametersAreNonnullByDefault +public class Report { + + private final ImmutableList<ReportItem> items; + private final int maxTitleLength; + private final int maxValueLineLength; + + /** + * Use an instance of {@link Report.Builder} to construct instances of this class. + * + * @param items - An ordered list of items that appear in the report. (not null) + * @param maxTitleLength - The length of the longest title in the report. (> 0) + * @param maxValueLineLength - The length of the longest value line in the report. (> 0) + */ + private Report( + final ImmutableList<ReportItem> items, + final int maxTitleLength, + final int maxValueLineLength) { + this.items = checkNotNull(items); + checkArgument(maxTitleLength > 0); + this.maxTitleLength = maxTitleLength; + checkArgument(maxValueLineLength > 0); + this.maxValueLineLength = maxValueLineLength; + } + + @Override + public String toString() { + // Figure out how long each line will be. + final int lineLength = "| ".length() + maxTitleLength + " | ".length() + maxValueLineLength + " |".length(); + + // Format that may be used to write each line. + final String lineFormat = "| %-" + maxTitleLength + "s | %-" + maxValueLineLength + "s |\n"; + + // The line that is used as the first and last line of the report. + final String dashLine = StringUtils.repeat("-", lineLength); + + // Build the String version of the report. + final StringBuilder builder = new StringBuilder(); + builder.append(dashLine).append("\n"); + + for(final ReportItem item : items) { + final String[] valueLines = item.getValueLines(); + switch(valueLines.length) { + case 0: + // Write an empty value cell. + builder.append( String.format(lineFormat, item.getTitle(), "") ); + break; + + case 1: + // Write the value cell. 
+ builder.append( String.format(lineFormat, item.getTitle(), valueLines[0]) ); + break; + + default: + builder.append( String.format(lineFormat, item.getTitle(), valueLines[0]) ); + for(int i = 1; i < valueLines.length; i++) { + builder.append( String.format(lineFormat, "", valueLines[i]) ); + } + break; + } + } + + builder.append(dashLine).append("\n"); + return builder.toString(); + } + + /** + * @return Creates a new {@link Report.Builder}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * An item that may appear within a {@link Report}. Each item has a title + * that briefly describes the value it holds. + */ + @Immutable + @ParametersAreNonnullByDefault + public static final class ReportItem { + private final String title; + private final String[] valueLines; + + /** + * Constructs an instance of {@link ReportItem} when there is no value + * associated with the title. + * + * @param title - Describes the item's value. (not null) + */ + public ReportItem(final String title) { + this.title = checkNotNull(title); + this.valueLines = new String[0]; + } + + /** + * Constructs an instance of {@link ReportItem} when the value section + * is contained within a single line. + * + * @param title - Describes the item's value. (not null) + * @param valueLine - The line that will appear within the value section. (not null) + */ + public ReportItem(final String title, final String valueLine) { + this.title = checkNotNull(title); + checkNotNull(valueLine); + this.valueLines = new String[]{ valueLine }; + } + + /** + * Constructs an instance of {@link ReportItem} when the value section + * spans many lines. + * + * @param title - Describes the item's value. (not null) + * @param valueLines - The value section broken into lines as they + * will appear within the report. 
(not null) + */ + public ReportItem(final String title, final String[] valueLines) { + this.title = checkNotNull(title); + this.valueLines = checkNotNull(valueLines); + } + + /** + * @return Describes the item's value. + */ + public String getTitle() { + return title; + } + + /** + * @return The value section broken into lines as they will appear within the report. + */ + public String[] getValueLines() { + return valueLines; + } + } + + /** + * Builds instances of {@link Report}. + */ + @ParametersAreNonnullByDefault + public static final class Builder { + + private final ImmutableList.Builder<ReportItem> lines = ImmutableList.builder(); + private int maxTitleLength = 0; + private int maxValueLineLength = 0; + + /** + * Append a {@link ReportItem} to the end of the {@link Report}. + * + * @param item - The next item that will appear in the report. (not null) + * @return This builder so that method invocations may be chained. + */ + public Builder appendItem(final ReportItem item) { + checkNotNull(item); + lines.add(item); + + final int titleLength = item.getTitle().length(); + if(maxTitleLength < titleLength) { + this.maxTitleLength = titleLength; + } + + for(final String valueLine : item.getValueLines()) { + final int valueLineLength = valueLine.length(); + if(maxValueLineLength < valueLineLength) { + this.maxValueLineLength = valueLineLength; + } + } + + return this; + } + + /** + * @return An instance of {@link Report} using the state of the builder. 
+ */ + public Report build() { + return new Report( lines.build(), maxTitleLength, maxValueLineLength); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ParsedQueryRequestTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ParsedQueryRequestTest.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ParsedQueryRequestTest.java new file mode 100644 index 0000000..432cf8a --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ParsedQueryRequestTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.client; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.HashSet; + +import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest; +import org.junit.Test; + +import com.google.common.collect.Sets; + +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Tests the methods of {@link ParsedQueryRequest}. + */ +public class ParsedQueryRequestTest { + + @Test + public void parseNoVarOrders() throws IOException { + final String requestText = + "SELECT * \n"+ + "WHERE { \n" + + " ?a <http://talksTo> ?b. \n" + + " ?b <http://talksTo> ?c. \n" + + "}"; + + final ParsedQueryRequest expected = new ParsedQueryRequest( + "SELECT * \n"+ + "WHERE { \n" + + " ?a <http://talksTo> ?b. \n" + + " ?b <http://talksTo> ?c. \n" + + "}", + new HashSet<VariableOrder>()); + + final ParsedQueryRequest request = ParsedQueryRequest.parse(requestText); + assertEquals(expected, request); + } + + @Test + public void parseHasVarOrders() throws IOException { + final String requestText = + "#prefix a, b,c\n" + + "#prefix b, c, a\n" + + "SELECT * \n"+ + "WHERE { \n" + + " ?a <http://talksTo> ?b. \n" + + " ?b <http://talksTo> ?c. \n" + + "}"; + + final ParsedQueryRequest expected = new ParsedQueryRequest( + "SELECT * \n"+ + "WHERE { \n" + + " ?a <http://talksTo> ?b. \n" + + " ?b <http://talksTo> ?c. 
\n" + + "}", + Sets.newHashSet( + new VariableOrder("a", "b", "c"), + new VariableOrder("b", "c", "a"))); + + final ParsedQueryRequest request = ParsedQueryRequest.parse(requestText); + assertEquals(expected, request); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/PcjMetadataRendererTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/PcjMetadataRendererTest.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/PcjMetadataRendererTest.java new file mode 100644 index 0000000..a200039 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/PcjMetadataRendererTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.client; + +import static org.junit.Assert.assertEquals; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.rya.indexing.pcj.fluo.client.util.PcjMetadataRenderer; +import org.junit.Test; + +import com.google.common.collect.Sets; + +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Tests the methods of {@link PcjMetadataRenderer}. + */ +public class PcjMetadataRendererTest { + + @Test + public void formatSingleMetadata() throws Exception { + // Create the PcjMetadata that will be formatted as a report. + final PcjMetadata metadata = new PcjMetadata( + "SELECT ?x ?y " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?y <http://worksAt> <http://Chipotle>." + + "}", + 12233423L, + Sets.<VariableOrder>newHashSet( + new VariableOrder("x", "y"), + new VariableOrder("y", "x"))); + + // Run the test. + final String expected = + "---------------------------------------------------------------------\n" + + "| Query ID | query1 |\n" + + "| Cardinality | 12,233,423 |\n" + + "| Export Variable Orders | y;x |\n" + + "| | x;y |\n" + + "| SPARQL | select ?x ?y |\n" + + "| | where { |\n" + + "| | ?x <http://talksTo> <http://Eve>. |\n" + + "| | ?y <http://worksAt> <http://Chipotle>. |\n" + + "| | } |\n" + + "---------------------------------------------------------------------\n"; + + final PcjMetadataRenderer formatter = new PcjMetadataRenderer(); + assertEquals(expected, formatter.render("query1", metadata)); + } + + @Test + public void formatManyMetdata() throws Exception { + // Create the PcjMetadata that will be formatted as a report. + final PcjMetadata metadata1 = new PcjMetadata( + "SELECT ?x ?y " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?y <http://worksAt> <http://Chipotle>." 
+ + "}", + 12233423L, + Sets.<VariableOrder>newHashSet( + new VariableOrder("x", "y"), + new VariableOrder("y", "x"))); + + final PcjMetadata metadata2 = new PcjMetadata( + "SELECT ?x " + + "WHERE { " + + "?x <http://likes> <http://cookies>" + + "}", + 2342L, + Sets.<VariableOrder>newHashSet(new VariableOrder("x"))); + + final Map<String, PcjMetadata> metadata = new LinkedHashMap<>(); + metadata.put("query1", metadata1); + metadata.put("query2", metadata2); + + // Run the test. + final String expected = + "---------------------------------------------------------------------\n" + + "| Query ID | query1 |\n" + + "| Cardinality | 12,233,423 |\n" + + "| Export Variable Orders | y;x |\n" + + "| | x;y |\n" + + "| SPARQL | select ?x ?y |\n" + + "| | where { |\n" + + "| | ?x <http://talksTo> <http://Eve>. |\n" + + "| | ?y <http://worksAt> <http://Chipotle>. |\n" + + "| | } |\n" + + "---------------------------------------------------------------------\n" + + "\n" + + "------------------------------------------------------------------\n" + + "| Query ID | query2 |\n" + + "| Cardinality | 2,342 |\n" + + "| Export Variable Orders | x |\n" + + "| SPARQL | select ?x |\n" + + "| | where { |\n" + + "| | ?x <http://likes> <http://cookies>. 
|\n" + + "| | } |\n" + + "------------------------------------------------------------------\n" + + "\n"; + + final PcjMetadataRenderer formatter = new PcjMetadataRenderer(); + assertEquals(expected, formatter.render(metadata)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ReportTests.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ReportTests.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ReportTests.java new file mode 100644 index 0000000..25e21e0 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/test/java/org/apache/rya/indexing/pcj/fluo/client/ReportTests.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.client; + +import static org.junit.Assert.assertEquals; + +import org.apache.rya.indexing.pcj.fluo.client.util.Report; +import org.apache.rya.indexing.pcj.fluo.client.util.Report.ReportItem; +import org.junit.Test; + +/** + * Tests the methods of {@link Report}. + */ +public class ReportTests { + + @Test + public void singleLineValues() { + final Report.Builder builder = Report.builder(); + builder.appendItem(new ReportItem("Title 1", new String[]{"Short value."})); + builder.appendItem(new ReportItem("Title 2", new String[]{"This is the longest values that appears in the report."})); + builder.appendItem(new ReportItem("This is a long title", new String[]{"Short value."})); + final Report report = builder.build(); + + final String expected = + "---------------------------------------------------------------------------------\n" + + "| Title 1 | Short value. |\n" + + "| Title 2 | This is the longest values that appears in the report. |\n" + + "| This is a long title | Short value. |\n" + + "---------------------------------------------------------------------------------\n"; + + assertEquals(expected, report.toString()); + } + + @Test + public void emptyValues() { + final Report.Builder builder = Report.builder(); + builder.appendItem(new ReportItem("No Value Here", new String[]{})); + builder.appendItem(new ReportItem("Value Here", new String[]{"This one has a value."})); + final Report report = builder.build(); + + final String expected = + "-----------------------------------------\n" + + "| No Value Here | |\n" + + "| Value Here | This one has a value. 
|\n" + + "-----------------------------------------\n"; + assertEquals(expected, report.toString()); + } + + @Test + public void multiLineValues() { + final Report.Builder builder = Report.builder(); + builder.appendItem(new ReportItem("Title 1", new String[]{"Value 1"})); + builder.appendItem(new ReportItem("Multiple Lines", new String[]{"This is the first line.", "This is the second line."})); + builder.appendItem(new ReportItem("Title 2", new String[]{"Value 2"})); + final Report report = builder.build(); + + final String expected = + "---------------------------------------------\n" + + "| Title 1 | Value 1 |\n" + + "| Multiple Lines | This is the first line. |\n" + + "| | This is the second line. |\n" + + "| Title 2 | Value 2 |\n" + + "---------------------------------------------\n"; + assertEquals(expected, report.toString()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml new file mode 100644 index 0000000..97a967b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml @@ -0,0 +1,85 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.parent</artifactId> + <version>3.2.10-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.pcj.fluo.demo</artifactId> + + <name>Apache Rya PCJ Fluo Demo</name> + <description> + A demo application that shows how the Fluo application is able to maintain + Rya Precomputed Joins using historic Statements while streaming new Statements. + </description> + + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.api</artifactId> + </dependency> + + <!-- 3rd Party Runtime Dependencies. --> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-queryrender</artifactId> + </dependency> + <dependency> + <groupId>io.fluo</groupId> + <artifactId>fluo-mini</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Use the pre-built 'jar-with-dependencies' assembly to package the dependent class files into the final jar. + This creates a jar file that can be deployed to Fluo without having to include any dependent jars. 
--> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>org.apache.rya.indexing.pcj.fluo.demo.DemoDriver</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/Demo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/Demo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/Demo.java new file mode 100644 index 0000000..f1ec3dd --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/Demo.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.demo; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.openrdf.repository.RepositoryConnection; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.mini.MiniFluo; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * Represents a demonstration that uses Rya and Fluo on top of Accumulo. + */ +public interface Demo { + + /** + * Run the demo. + */ + public void execute( + MiniAccumuloCluster accumulo, + Connector accumuloConn, + String ryaTablePrefix, + RyaSailRepository ryaRepo, + RepositoryConnection ryaConn, + MiniFluo fluo, + FluoClient fluoClient) throws DemoExecutionException; + + /** + * A {@link Demo}'s execution could not be completed because of a non-recoverable + * problem while running. + */ + public static class DemoExecutionException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link DemoExecutionException}. + * + * @param message - Describes why this is being thrown. + * @param cause - The exception that caused this one to be thrown. 
+ */ + public DemoExecutionException(final String message, final Exception cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java new file mode 100644 index 0000000..d985b06 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/DemoDriver.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.demo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.log4j.PatternLayout; +import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; +import org.apache.rya.indexing.pcj.fluo.demo.Demo.DemoExecutionException; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.google.common.io.Files; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.FluoFactory; +import io.fluo.api.config.FluoConfiguration; +import io.fluo.api.config.ObserverConfiguration; +import io.fluo.api.mini.MiniFluo; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; +import 
mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * Runs {@link Demo}s that require Rya and Fluo. + */ +public class DemoDriver { + private static final Logger log = Logger.getLogger(DemoDriver.class); + + private static final String RYA_TABLE_PREFIX = "demo_"; + + // Rya data store and connections. + private static MiniAccumuloCluster accumulo = null; + private static Connector accumuloConn = null; + private static RyaSailRepository ryaRepo = null; + private static RepositoryConnection ryaConn = null; + + // Fluo data store and connections. + private static MiniFluo fluo = null; + private static FluoClient fluoClient = null; + + public static void main(final String[] args) { + setupLogging(); + + // Setup the resources required to run the demo. + try { + log.info("Initializing resources used by the demo..."); + setupResources(); + } catch (final DemoInitializationException e) { + log.error("Could not initialize the demo's resources. Exiting.", e); + System.exit(-1); + } + log.info(""); + + // Run the demo. + try { + new FluoAndHistoricPcjsDemo().execute(accumulo, accumuloConn, RYA_TABLE_PREFIX, ryaRepo, ryaConn, fluo, fluoClient); + } catch (final DemoExecutionException e) { + log.error("An exception was thrown durring demo execution. The demo can not continue.", e); + } + + // Tear down the demo environment. + log.info("Shutting down the demo..."); + shutdownResources(); + log.info("Demo exiting."); + } + + private static void setupLogging() { + // Turn off all the loggers and customize how they write to the console. + final Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.OFF); + final ConsoleAppender ca = (ConsoleAppender) rootLogger.getAppender("stdout"); + ca.setLayout(new PatternLayout("%-5p - %m%n")); + + + // Turn the loggers used by the demo back on. + log.setLevel(Level.INFO); + } + + /** + * Indicates a problem while initializing the demo's resources prevented it from starting. 
+ */ + private static final class DemoInitializationException extends Exception { + private static final long serialVersionUID = 1L; + + public DemoInitializationException(final String message, final Exception cause) { + super(message, cause); + } + } + + private static void setupResources() throws DemoInitializationException { + try{ + // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it. + log.debug("Starting up the Mini Accumulo Cluster used by Rya."); + accumulo = startMiniAccumulo(); + + // Setup the Rya library to use the Mini Accumulo. + log.debug("Starting up the Rya Repository."); + ryaRepo = setupRya(accumulo); + ryaConn = ryaRepo.getConnection(); + + // Initialize the Mini Fluo that will be used to store created queries. + log.debug("Starting up the Mini Fluo instance."); + fluo = startMiniFluo(); + fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() ); + } catch(final Exception e) { + throw new DemoInitializationException("Could not run the demo because of a problem while initializing the mini resources.", e); + } + } + + private static void shutdownResources() { + if(ryaConn != null) { + try { + log.debug("Shutting down Rya Connection."); + ryaConn.close(); + } catch(final Exception e) { + log.error("Could not shut down the Rya Connection.", e); + } + } + + if(ryaRepo != null) { + try { + log.debug("Shutting down Rya Repo."); + ryaRepo.shutDown(); + } catch(final Exception e) { + log.error("Could not shut down the Rya Repo.", e); + } + } + + if(accumulo != null) { + try { + log.debug("Shutting down the Mini Accumulo being used as a Rya store."); + accumulo.stop(); + } catch(final Exception e) { + log.error("Could not shut down the Mini Accumulo.", e); + } + } + + if(fluoClient != null) { + try { + log.debug("Shutting down Fluo Client."); + fluoClient.close(); + } catch(final Exception e) { + log.error("Could not shut down the Fluo Client.", e); + } + } + + if(fluo != null) { + try { + 
log.debug("Shutting down Mini Fluo."); + fluo.close(); + } catch (final Exception e) { + log.error("Could not shut down the Mini Fluo.", e); + } + } + } + + /** + * Setup a Mini Accumulo cluster that uses a temporary directory to store its data. + * + * @return A Mini Accumulo cluster. + */ + private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { + final File miniDataDir = Files.createTempDir(); + + // Setup and start the Mini Accumulo. + final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password"); + accumulo.start(); + + // Store a connector to the Mini Accumulo. + final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); + accumuloConn = instance.getConnector("root", new PasswordToken("password")); + + return accumulo; + } + + /** + * Format a Mini Accumulo to be a Rya repository. + * + * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null) + * @return The Rya repository sitting on top of the Mini Accumulo. + */ + private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException { + checkNotNull(accumulo); + + // Setup the Rya Repository that will be used to create Repository Connections. + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); + crdfdao.setConnector(accumuloConn); + + // Setup Rya configuration values. 
+ final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("demo_"); + conf.setDisplayQueryPlan(true); + + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX); + conf.set(ConfigUtils.CLOUDBASE_USER, "root"); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, "password"); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, accumulo.getInstanceName()); + + crdfdao.setConf(conf); + ryaStore.setRyaDAO(crdfdao); + + final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); + ryaRepo.initialize(); + + return ryaRepo; + } + + /** + * Setup a Mini Fluo cluster that uses a temporary directory to store its data.ll + * + * @return A Mini Fluo cluster. + */ + private static MiniFluo startMiniFluo() { + final File miniDataDir = Files.createTempDir(); + + // Setup the observers that will be used by the Fluo PCJ Application. + final List<ObserverConfiguration> observers = new ArrayList<>(); + observers.add(new ObserverConfiguration(TripleObserver.class.getName())); + observers.add(new ObserverConfiguration(StatementPatternObserver.class.getName())); + observers.add(new ObserverConfiguration(JoinObserver.class.getName())); + observers.add(new ObserverConfiguration(FilterObserver.class.getName())); + + // Provide export parameters child test classes may provide to the export observer. 
+ final HashMap<String, String> exportParams = new HashMap<>(); + final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); + ryaParams.setExportToRya(true); + ryaParams.setAccumuloInstanceName(accumulo.getInstanceName()); + ryaParams.setZookeeperServers(accumulo.getZooKeepers()); + ryaParams.setExporterUsername("root"); + ryaParams.setExporterPassword("password"); + + final ObserverConfiguration exportObserverConfig = new ObserverConfiguration(QueryResultObserver.class.getName()); + exportObserverConfig.setParameters( exportParams ); + observers.add(exportObserverConfig); + + // Configure how the mini fluo will run. + final FluoConfiguration config = new FluoConfiguration(); + config.setApplicationName("IntegrationTests"); + config.setMiniDataDir(miniDataDir.getAbsolutePath()); + config.addObservers(observers); + + final MiniFluo miniFluo = FluoFactory.newMiniFluo(config); + return miniFluo; + } +} \ No newline at end of file
