RYA-55 Implemented a series of projects that use Fluo to incrementally update the results of a Rya Precomputed Join secondary index.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/15ec5d5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/15ec5d5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/15ec5d5f Branch: refs/heads/develop Commit: 15ec5d5faa670f9a65de52caa907eebdfa5ae757 Parents: 358c13b Author: Kevin Chilton <[email protected]> Authored: Fri Mar 18 19:13:57 2016 -0400 Committer: Kevin Chilton <[email protected]> Committed: Mon Mar 21 13:32:49 2016 -0400 ---------------------------------------------------------------------- extras/pom.xml | 1 + extras/rya.pcj.fluo/README.md | 42 ++ extras/rya.pcj.fluo/pcj.fluo.api/pom.xml | 48 ++ .../indexing/pcj/fluo/api/CountStatements.java | 64 +++ .../rya/indexing/pcj/fluo/api/CreatePcj.java | 249 ++++++++++ .../indexing/pcj/fluo/api/GetPcjMetadata.java | 132 +++++ .../indexing/pcj/fluo/api/GetQueryReport.java | 256 ++++++++++ .../indexing/pcj/fluo/api/InsertTriples.java | 102 ++++ .../rya/indexing/pcj/fluo/api/ListQueryIds.java | 76 +++ extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 90 ++++ .../indexing/pcj/fluo/app/BindingSetRow.java | 87 ++++ .../rya/indexing/pcj/fluo/app/FilterFinder.java | 83 ++++ .../pcj/fluo/app/FilterResultUpdater.java | 152 ++++++ .../pcj/fluo/app/FluoStringConverter.java | 299 ++++++++++++ .../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 232 +++++++++ .../fluo/app/IncrementalUpdateConstants.java | 37 ++ .../pcj/fluo/app/JoinResultUpdater.java | 314 ++++++++++++ .../rya/indexing/pcj/fluo/app/NodeType.java | 62 +++ .../pcj/fluo/app/QueryResultUpdater.java | 81 ++++ .../indexing/pcj/fluo/app/StringTypeLayer.java | 29 ++ .../app/export/IncrementalResultExporter.java | 70 +++ .../IncrementalResultExporterFactory.java | 103 ++++ .../pcj/fluo/app/export/ParametersBase.java | 59 +++ .../app/export/rya/RyaExportParameters.java | 133 +++++ .../fluo/app/export/rya/RyaResultExporter.java | 73 +++ 
.../export/rya/RyaResultExporterFactory.java | 69 +++ .../fluo/app/observers/BindingSetUpdater.java | 155 ++++++ .../pcj/fluo/app/observers/FilterObserver.java | 65 +++ .../pcj/fluo/app/observers/JoinObserver.java | 64 +++ .../fluo/app/observers/QueryResultObserver.java | 111 +++++ .../app/observers/StatementPatternObserver.java | 65 +++ .../pcj/fluo/app/observers/TripleObserver.java | 135 ++++++ .../pcj/fluo/app/query/CommonNodeMetadata.java | 102 ++++ .../pcj/fluo/app/query/FilterMetadata.java | 257 ++++++++++ .../indexing/pcj/fluo/app/query/FluoQuery.java | 318 ++++++++++++ .../pcj/fluo/app/query/FluoQueryColumns.java | 142 ++++++ .../fluo/app/query/FluoQueryMetadataDAO.java | 358 ++++++++++++++ .../pcj/fluo/app/query/JoinMetadata.java | 224 +++++++++ .../pcj/fluo/app/query/QueryMetadata.java | 186 +++++++ .../fluo/app/query/SparqlFluoQueryBuilder.java | 479 +++++++++++++++++++ .../app/query/StatementPatternMetadata.java | 197 ++++++++ .../indexing/pcj/fluo/app/FilterFinderTest.java | 84 ++++ .../pcj/fluo/app/FluoStringConverterTest.java | 203 ++++++++ .../rya/indexing/pcj/fluo/app/NodeTypeTest.java | 64 +++ .../app/export/rya/RyaExportParametersTest.java | 65 +++ .../pcj.fluo.client/conf/log4j2.xml | 36 ++ .../pcj.fluo.client/conf/tool.properties | 45 ++ extras/rya.pcj.fluo/pcj.fluo.client/pom.xml | 136 ++++++ .../pcj/fluo/client/PcjAdminClient.java | 247 ++++++++++ .../pcj/fluo/client/PcjAdminClientCommand.java | 97 ++++ .../fluo/client/PcjAdminClientProperties.java | 101 ++++ .../CountUnprocessedStatementsCommand.java | 69 +++ .../fluo/client/command/ListQueriesCommand.java | 127 +++++ .../fluo/client/command/LoadTriplesCommand.java | 140 ++++++ .../fluo/client/command/NewQueryCommand.java | 124 +++++ .../fluo/client/command/QueryReportCommand.java | 104 ++++ .../pcj/fluo/client/util/FluoLoader.java | 90 ++++ .../fluo/client/util/ParsedQueryRequest.java | 134 ++++++ .../fluo/client/util/PcjMetadataRenderer.java | 100 ++++ 
.../fluo/client/util/QueryReportRenderer.java | 112 +++++ .../indexing/pcj/fluo/client/util/Report.java | 214 +++++++++ .../pcj/fluo/client/ParsedQueryRequestTest.java | 83 ++++ .../fluo/client/PcjMetadataRendererTest.java | 125 +++++ .../indexing/pcj/fluo/client/ReportTests.java | 82 ++++ extras/rya.pcj.fluo/pcj.fluo.demo/pom.xml | 85 ++++ .../apache/rya/indexing/pcj/fluo/demo/Demo.java | 63 +++ .../rya/indexing/pcj/fluo/demo/DemoDriver.java | 289 +++++++++++ .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 371 ++++++++++++++ .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 64 +++ .../apache/rya/indexing/pcj/fluo/ITBase.java | 374 +++++++++++++++ .../pcj/fluo/api/CountStatementsIT.java | 86 ++++ .../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 101 ++++ .../indexing/pcj/fluo/api/GetQueryReportIT.java | 120 +++++ .../indexing/pcj/fluo/api/ListQueryIdsIT.java | 66 +++ .../fluo/app/query/FluoQueryMetadataDAOIT.java | 183 +++++++ .../indexing/pcj/fluo/integration/InputIT.java | 254 ++++++++++ .../indexing/pcj/fluo/integration/QueryIT.java | 234 +++++++++ .../pcj/fluo/integration/RyaExportIT.java | 187 ++++++++ extras/rya.pcj.fluo/pom.xml | 130 +++++ 79 files changed, 10990 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/pom.xml ---------------------------------------------------------------------- diff --git a/extras/pom.xml b/extras/pom.xml index a3199bf..d3d51a1 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -40,5 +40,6 @@ under the License. 
<module>indexing</module> <module>indexingExample</module> <module>vagrantExample</module> + <module>rya.pcj.fluo</module> </modules> </project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/README.md ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/README.md b/extras/rya.pcj.fluo/README.md new file mode 100644 index 0000000..70361c1 --- /dev/null +++ b/extras/rya.pcj.fluo/README.md @@ -0,0 +1,42 @@ +<!-- Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. --> + +Rya Incrementally Updating Precomputed Joins +============================================ +This project is an implementation of the Rya Precomputed Join (PCJ) indexing +feature that runs on top of [Fluo][1] so that it may incrementally update the +results of a query as new semantic triples are added to storage. + +This project contains the following modules: + * **rya.pcj.fluo.app** - A Fluo application that incrementally updates the results + of a Precomputed Join Secondary Index. This app runs as a YARN application on a + cluster, receives streams of new RDF Statements, determines if those statements + create any new index values, and then exports those values to the appropriate Rya + PCJ Tables. 
+ * **rya.pcj.fluo.api** - Defines calls that may be made to the Rya PCJ Fluo App + while it is running. These calls are intended to be used by client applications + such as debug tools, data ingest tools, administrative tools, etc. + * **rya.pcj.fluo.client** - A command line client that lets an administrative user + interact with the running Rya PCJ Fluo App that is running on their cluster. + * **rya.pcj.fluo.demo** - A demo application that shows how the Rya PCJ Fluo App + may be used to incrementally update PCJ results within a Rya instance. The demo + uses MiniAccumuloCluster and MiniFluo so that it is entirely self contained. + * **integration** - Contains integration tests that use a MiniAccumuloCluster + and MiniFluo to ensure the Rya PCJ Fluo App works within an emulation of the + production environment. + +[1]: http://fluo.io/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml new file mode 100644 index 0000000..292121d --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. 
See the License for the +specific language governing permissions and limitations +under the License. +--> +<project + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.parent</artifactId> + <version>3.2.10-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.pcj.fluo.api</artifactId> + + <name>Apache Rya PCJ Fluo API</name> + <description> + This module contains the Rya PCJ Fluo API. It consists of classes + that allow other applications to interact with the Rya PCJ Fluo + application while it is running on a cluster. + </description> + + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.app</artifactId> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java new file mode 100644 index 0000000..326f807 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CountStatements.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.math.BigInteger; + +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.Snapshot; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.iterator.RowIterator; + +/** + * Counts the number of RDF Statements that have been loaded into the Fluo app + * that have not been processed yet. + */ +public class CountStatements { + + /** + * Get the number of RDF Statements that have been loaded into the Fluo app + * that have not been processed yet. + * + * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null) + * @return The number of RDF Statements that have been loaded into the Fluo + * app that have not been processed yet. + */ + public BigInteger countStatements(final FluoClient fluo) { + checkNotNull(fluo); + + try(Snapshot sx = fluo.newSnapshot()) { + // Limit the scan to the Triples binding set column. 
+ final ScannerConfiguration scanConfig = new ScannerConfiguration(); + scanConfig.fetchColumn(FluoQueryColumns.TRIPLES.getFamily(), FluoQueryColumns.TRIPLES.getQualifier()); + + final RowIterator rows = sx.get(scanConfig); + BigInteger count = BigInteger.valueOf(0L); + while(rows.hasNext()) { + rows.next(); + count = count.add( BigInteger.ONE ); + } + + return count; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java new file mode 100644 index 0000000..b99d293 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import java.util.HashSet; +import java.util.Set; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder; +import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; + +import info.aduna.iteration.CloseableIteration; +import io.fluo.api.client.FluoClient; +import io.fluo.api.types.TypedTransaction; +import mvm.rya.indexing.external.tupleSet.PcjTables; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory; +import mvm.rya.indexing.external.tupleSet.PcjTables.ShiftVarOrderFactory; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. 
+ * <p> + * This is a two phase process. + * <ol> + * <li>Setup metadata about each node of the query using a single Fluo transaction. </li> + * <li>Scan Rya for binding sets that match each Statement Pattern from the query + * and use a separate Fluo transaction for each batch that is inserted. This + * ensures historic triples will be included in the query's results.</li> + * </ol> + * After the first step is finished, any new Triples that are added to the Fluo + * application will be matched against statement patterns, the final results + * will percolate to the top of the query, and those results will be exported to + * Rya's query system. + */ +@ParametersAreNonnullByDefault +public class CreatePcj { + + /** + * Wraps Fluo {@link Transaction}s so that we can write String values to them. + */ + private static final StringTypeLayer STRING_TYPED_LAYER = new StringTypeLayer(); + + /** + * The default Statement Pattern batch insert size is 1000. + */ + private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000; + + /** + * A utility used to interact with Rya's PCJ tables. + */ + private static final PcjTables PCJ_TABLES = new PcjTables(); + + /** + * The maximum number of binding sets that will be inserted into each Statement + * Pattern's result set per Fluo transaction. + */ + private final int spInsertBatchSize; + + /** + * Constructs an instance of {@link CreatePcj} that uses + * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size. + */ + public CreatePcj() { + this(DEFAULT_SP_INSERT_BATCH_SIZE); + } + + /** + * Constructs an instance of {@link CreatePcj}. + * + * @param spInsertBatchSize - The maximum number of binding sets that will be + * inserted into each Statement Pattern's result set per Fluo transaction. 
+ */ + public CreatePcj(final int spInsertBatchSize) { + checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0."); + this.spInsertBatchSize = spInsertBatchSize; + } + + /** + * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. Historic + * triples will be scanned and matched using the rya connection that was + * provided. The PCJ will also automatically export to a table in Accumulo + * named using the {@code ryaTablePrefix} and the query's ID from the Fluo table. + * + * @param fluo - A connection to the Fluo table that will be updated. (not null) + * @param ryaTablePrefix - The prefix that will be prepended to the Accumulo table + * the PCJ's results will be exported to. (not null) + * @param rya - A connection to the Rya repository that will be scanned. (not null) + * @param accumuloConn - A connection to the Accumulo instance the incremental + * results will be exported to as a Rya PCJ table. (not null) + * @param varOrders - The variable orders the query's results will be exported to + * within the export table. If this set is empty, then a default will be + * used instead. (not null) + * @param sparql - The SPARQL query whose results will be incrementally updated by Fluo. (not null) + * @throws MalformedQueryException The PCJ could not be initialized because the SPARQL query was malformed. + * @throws PcjException The PCJ could not be initialized because of a problem setting up the export location. + * @throws SailException Historic results could not be added to the initialized PCJ because of + * a problem with the Rya connection. + * @throws QueryEvaluationException Historic results could not be added to the initialized PCJ because of + * a problem with the Rya connection. 
+ */ + public void withRyaIntegration( + final FluoClient fluo, + final String ryaTablePrefix, + final RyaSailRepository rya, + final Connector accumuloConn, + final Set<VariableOrder> varOrders, + final String sparql) throws MalformedQueryException, PcjException, SailException, QueryEvaluationException { + checkNotNull(fluo); + checkNotNull(ryaTablePrefix); + checkNotNull(rya); + checkNotNull(accumuloConn); + checkNotNull(varOrders); + checkNotNull(sparql); + + // Parse the SPARQL into a POJO. + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); + + // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. + // We use these IDs later when scanning Rya for historic Statement Pattern matches + // as well as setting up automatic exports. + final NodeIds nodeIds = new NodeIds(); + final String exportTableName; + final String queryId; + + // Parse the query's structure for the metadata that will be written to fluo. + final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + + try(TypedTransaction tx = STRING_TYPED_LAYER.wrap( fluo.newTransaction() )) { + // Write the query's structure to Fluo. + new FluoQueryMetadataDAO().write(tx, fluoQuery); + + // Since we are exporting the query's results to a table in Accumulo, store that location in the fluo table. + queryId = fluoQuery.getQueryMetadata().getNodeId(); + + exportTableName = new PcjTableNameFactory().makeTableName(ryaTablePrefix, queryId); + tx.mutate().row(queryId).col(FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).set(exportTableName); + + // Flush the changes to Fluo. + tx.commit(); + } + + // Initialize the export destination in Accumulo. If triples are being written to Fluo + // while this query is being created, then the export observer may throw errors for a while + // until this step is completed. 
+ final VariableOrder queryVarOrder = fluoQuery.getQueryMetadata().getVariableOrder(); + if(varOrders.isEmpty()) { + final Set<VariableOrder> shiftVarOrders = new ShiftVarOrderFactory().makeVarOrders( queryVarOrder ); + varOrders.addAll(shiftVarOrders); + } + PCJ_TABLES.createPcjTable(accumuloConn, exportTableName, varOrders, sparql); + + // Get a connection to Rya. It's used to scan for Statement Pattern results. + final SailConnection ryaConn = rya.getSail().getConnection(); + + // Reuse the same set object while performing batch inserts. + final Set<BindingSet> batch = new HashSet<>(); + + // Iterate through each of the statement patterns and insert their historic matches into Fluo. + for(final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { + // Get an iterator over all of the binding sets that match the statement pattern. + final StatementPattern pattern = FluoStringConverter.toStatementPattern( patternMetadata.getStatementPattern() ); + final CloseableIteration<? extends BindingSet, QueryEvaluationException> bindingSets = ryaConn.evaluate(pattern, null, null, false); + + // Insert batches of the binding sets into Fluo. + while(bindingSets.hasNext()) { + if(batch.size() == spInsertBatchSize) { + writeBatch(fluo, patternMetadata, batch); + batch.clear(); + } + + batch.add( bindingSets.next() ); + } + + if(!batch.isEmpty()) { + writeBatch(fluo, patternMetadata, batch); + batch.clear(); + } + } + } + + /** + * Writes a batch of {@link BindingSet}s that match a statement pattern to Fluo. + * + * @param fluo - Creates transactions to Fluo. (not null) + * @param spMetadata - The Statement Pattern the batch matches. (not null) + * @param batch - A set of binding sets that are the result of the statement pattern. 
(not null) + */ + private static void writeBatch(final FluoClient fluo, final StatementPatternMetadata spMetadata, final Set<BindingSet> batch) { + checkNotNull(fluo); + checkNotNull(spMetadata); + checkNotNull(batch); + + try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) { + // Get the node's variable order. + final String spNodeId = spMetadata.getNodeId(); + final String[] varOrder = spMetadata.getVariableOrder().toArray(); + + for(final BindingSet bindingSet : batch) { + final String bindingSetStr = FluoStringConverter.toBindingSetString(bindingSet, varOrder); + + // Write the binding set entry to Fluo for the statement pattern. + tx.mutate().row(spNodeId + NODEID_BS_DELIM + bindingSetStr) + .col(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET) + .set(bindingSetStr); + } + + tx.commit(); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java new file mode 100644 index 0000000..88c7930 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.data.Bytes; +import io.fluo.api.types.TypedSnapshot; +import mvm.rya.indexing.external.tupleSet.PcjTables; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; + +/** + * Get {@link PcjMetadata} for queries that are managed by the Fluo app. + */ +public class GetPcjMetadata { + + private final ListQueryIds listQueryIds = new ListQueryIds(); + + /** + * Get the {@link PcjMetadata} of all queries that are being maintained by + * the Fluo app. + * + * @param accumulo - The Accumulo instance that will be searched. (not null) + * @param fluo - The Fluo instance that will be searched. (not null) + * @return A map where the query ID is the key and its metadata is the value. + * @throws NotInFluoException A query Id does not have a PCJ export table + * associated with it in the Fluo table. + * @throws NotInAccumuloException A PCJ export table that was found either + * does not exist in Accumulo or it is not a PCJ table. 
+ */ + public Map<String, PcjMetadata> getMetadata(final Connector accumulo, final FluoClient fluo) throws NotInFluoException, NotInAccumuloException { + checkNotNull(accumulo); + checkNotNull(fluo); + + final Map<String, PcjMetadata> metadata = new HashMap<>(); + + final Collection<String> queryIds = listQueryIds.listQueryIds(fluo); + for(final String queryId : queryIds) { + metadata.put(queryId, getMetadata(accumulo, fluo, queryId)); + } + + return metadata; + } + + /** + * Get the {@link PcjMetadata} of a query that is being maintained by the + * Fluo app. + * + * @param accumulo - The Accumulo instance that will be searched. (not null) + * @param fluo - The Fluo instance that will be searched. (not null) + * @param queryId - The Query Id whose metadata will be fetched. (not null) + * @return The {@link PcjMetadata} of the query. + * @throws NotInFluoException The query Id does not have a PCJ export table + * associated with it in the Fluo table. + * @throws NotInAccumuloException The PCJ export table that was found either + * does not exist in Accumulo or it is not a PCJ table. + */ + public PcjMetadata getMetadata(final Connector accumulo, final FluoClient fluo, final String queryId) throws NotInFluoException, NotInAccumuloException { + checkNotNull(accumulo); + checkNotNull(fluo); + checkNotNull(queryId); + + // Lookup the Accumulo export table name in the Fluo table. + String pcjTableName = null; + try(TypedSnapshot snap = new StringTypeLayer().wrap( fluo.newSnapshot() ) ) { + final Bytes pcjTableNameBytes = snap.get(Bytes.of(queryId), FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME); + if(pcjTableNameBytes == null) { + throw new NotInFluoException("Could not get the PcjMetadata for queryId '" + queryId + + "' because a PCJ export table name was not stored in the Fluo table."); + } + pcjTableName = pcjTableNameBytes.toString(); + } + + // Fetch the metadata from the Accumulo table. 
+ try { + return new PcjTables().getPcjMetadata(accumulo, pcjTableName); + } catch (final PcjException e) { + throw new NotInAccumuloException("Could not get the PcjMetadata for queryId '" + queryId + + "' because the metadata was missing from the Accumulo table.", e); + } + } + + /** + * Indicates PCJ Metadata could not be fetched for a query ID because the + * Accumulo export table name was not stored in the Fluo table. + */ + public static final class NotInFluoException extends Exception { + private static final long serialVersionUID = 1L; + + public NotInFluoException(final String message) { + super(message); + } + } + + /** + * Indicates PCJ Metadata could not be fetched for a query ID because the + * metadata was missing in Accumulo. + */ + public static final class NotInAccumuloException extends Exception { + private static final long serialVersionUID = 1L; + + public NotInAccumuloException(final String message, final Exception cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java new file mode 100644 index 0000000..2db7f3d --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.math.BigInteger; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.annotation.Nullable; +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; + +import com.google.common.collect.ImmutableMap; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.Snapshot; +import io.fluo.api.client.SnapshotBase; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Column; +import io.fluo.api.data.Span; +import io.fluo.api.iterator.RowIterator; + +/** + * Get reports that indicate how many binding sets have been emitted for + * the queries that are being managed by the fluo application. 
+ */ +@ParametersAreNonnullByDefault +public class GetQueryReport { + + private final FluoQueryMetadataDAO metadataDao = new FluoQueryMetadataDAO(); + + /** + * Get a report that indicates how many binding sets have been emitted for + * every query that is being managed by the fluo application. + * + * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null) + * @return A map from Query ID to QueryReport that holds a report for all of + * the queries that are being managed within the fluo app. + */ + public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) { + checkNotNull(fluo); + + // Fetch the queries that are being managed by the Fluo. + final List<String> queryIds = new ListQueryIds().listQueryIds(fluo); + + final Map<String, QueryReport> reports = new HashMap<>(); + for(final String queryId : queryIds) { + final QueryReport report = getReport(fluo, queryId); + reports.put(queryId, report); + } + return reports; + } + + /** + * Get a report that indicates how many biniding sets have been emitted for + * a query that is being managed by the fluo application. + * + * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null) + * @param queryId - The ID of the query to fetch. (not null) + * @return A report that was built for the query. + */ + public QueryReport getReport(final FluoClient fluo, final String queryId) { + checkNotNull(fluo); + checkNotNull(queryId); + + final QueryReport.Builder reportBuilder = QueryReport.builder(); + + try(Snapshot sx = fluo.newSnapshot()) { + final FluoQuery fluoQuery = metadataDao.readFluoQuery(sx, queryId); + reportBuilder.setFluoQuery(fluoQuery); + + // Query results. + BigInteger count = countBindingSets(sx, queryId, FluoQueryColumns.QUERY_BINDING_SET); + reportBuilder.setCount(queryId, count); + + // Filter results. 
+ for(final FilterMetadata filter : fluoQuery.getFilterMetadata()) { + final String filterId = filter.getNodeId(); + count = countBindingSets(sx, filterId, FluoQueryColumns.FILTER_BINDING_SET); + reportBuilder.setCount(filterId, count); + } + + // Join results. + for(final JoinMetadata join : fluoQuery.getJoinMetadata()) { + final String joinId = join.getNodeId(); + count = countBindingSets(sx, joinId, FluoQueryColumns.JOIN_BINDING_SET); + reportBuilder.setCount(joinId, count); + } + + // Statement Pattern results. + for(final StatementPatternMetadata statementPattern : fluoQuery.getStatementPatternMetadata()) { + final String patternId = statementPattern.getNodeId(); + count = countBindingSets(sx, patternId, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET); + reportBuilder.setCount(patternId, count); + } + } + + return reportBuilder.build(); + } + + private BigInteger countBindingSets(final SnapshotBase sx, final String nodeId, final Column bindingSetColumn) { + checkNotNull(sx); + checkNotNull(nodeId); + checkNotNull(bindingSetColumn); + + // Limit the scan to the binding set column and node id. + final ScannerConfiguration scanConfig = new ScannerConfiguration(); + scanConfig.fetchColumn(bindingSetColumn.getFamily(), bindingSetColumn.getQualifier()); + scanConfig.setSpan( Span.prefix(nodeId) ); + + final RowIterator rows = sx.get(scanConfig); + BigInteger count = BigInteger.valueOf(0L); + while(rows.hasNext()) { + rows.next(); + count = count.add( BigInteger.ONE ); + } + + return count; + } + + /** + * Contains all metadata that represents a SPARQL query within the Fluo app + * as well as the number of Binding Sets that have been emitted for each of + * the query nodes. + */ + @Immutable + @ParametersAreNonnullByDefault + public static final class QueryReport { + + /** + * Metadata about the nodes of the query. + */ + private final FluoQuery fluoQuery; + + /** + * The number of binding sets that match each of the nodes. 
+ * <p> + * The key is the Node ID of a node in {@code fluoQuery}. <br/> + * The value is the number of Binding Sets that have been emitted for the node. + */ + private final ImmutableMap<String, BigInteger> counts; + + /** + * Constructs an instance of {@link QueryReport}. Use the {@link Builder} instead. + * + * @param fluoQuery - Metadata about the nodes of the query. (not null) + * @param counts - A map from Node ID to the number of binding sets that + * have been emitted for that Node ID in the fluo app. (not null) + */ + private QueryReport( + final FluoQuery fluoQuery, + final ImmutableMap<String, BigInteger> counts) { + this.fluoQuery = checkNotNull(fluoQuery); + this.counts = checkNotNull(counts); + } + + /** + * @return Metadata about the nodes of the query. + */ + public FluoQuery getFluoQuery() { + return fluoQuery; + } + + /** + * Get the number of Binding Sets that have been emitted for a node. + * + * @param nodeId - The Node ID of the node that emits binding sets. (not null) + * @return The number of Binding Sets that have been emitted for the node. + */ + public BigInteger getCount(final String nodeId) { + checkNotNull(nodeId); + return counts.get(nodeId); + } + + /** + * @return A map from Node ID to the number of binding sets that + * have been emitted for that Node ID in the fluo app. + */ + public ImmutableMap<String, BigInteger> getCounts() { + return counts; + } + + /** + * @return An empty instance of {@link Builder}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Builds instances of {@link QueryReport}. + */ + @ParametersAreNonnullByDefault + public static final class Builder { + + private FluoQuery fluoQuery = null; + private final ImmutableMap.Builder<String, BigInteger> counts = ImmutableMap.builder(); + + /** + * Set the metadata about the nodes of the query. + * + * @param fluoQuery - The metadata about the nodes of the query. + * @return This builder so that method invocations may be chained. 
+ */ + public Builder setFluoQuery(@Nullable final FluoQuery fluoQuery) { + this.fluoQuery = fluoQuery; + return this; + } + + /** + * Set the number of Binding Sets that have been emitted for a node. + * + * @param nodeId - The ID of the node. + * @param count - the number of binding sets that have been emitted. + * @return This builder so that method invocations may be chained. + */ + public Builder setCount(@Nullable final String nodeId, @Nullable final BigInteger count) { + counts.put(nodeId, count); + return this; + } + + /** + * @return An instance of {@link QueryReport} built using this builder's values. + */ + public QueryReport build() { + return new QueryReport(fluoQuery, counts.build()); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java new file mode 100644 index 0000000..02e871f --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.types.TypedTransaction; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver; + +/** + * Insert a batch of Triples into. This will trigger observers that will update + * the final results of any PCJs that are being managed by this application. + */ +public class InsertTriples { + private static final Logger log = Logger.getLogger(InsertTriples.class); + + /** + * Wraps Fluo {@link Transaction}s so that we can write String values to them. + */ + private static final StringTypeLayer STRING_TYPED_LAYER = new StringTypeLayer(); + + /** + * Converts triples into the byte[] used as the row ID in Accumulo. + */ + private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver(); + + /** + * Inserts a triple into Fluo. + * + * @param fluo - A connection to the Fluo table that will be updated. (not null) + * @param triple - The triple to insert. 
(not null) + */ + public void insert(final FluoClient fluo, final RyaStatement triple) { + insert(fluo, Collections.singleton(triple)); + } + + /** + * Insert a batch of triples into Fluo. + * + * @param fluo - A connection to the Fluo table that will be updated. (not null) + * @param triples - The triples to insert. (not null) + */ + public void insert(final FluoClient fluo, final Collection<RyaStatement> triples) { + checkNotNull(fluo); + checkNotNull(triples); + + try(TypedTransaction tx = STRING_TYPED_LAYER.wrap(fluo.newTransaction())) { + for(final RyaStatement triple : triples) { + try { + tx.mutate().row(spoFormat(triple)).col(FluoQueryColumns.TRIPLES).set(); + } catch (final TripleRowResolverException e) { + log.error("Could not convert a Triple into the SPO format: " + triple); + } + } + + tx.commit(); + } + } + + /** + * Converts a triple into a byte[] holding the Rya SPO representation of it. + * + * @param triple - The triple to convert. (not null) + * @return The Rya SPO representation of the triple. + * @throws TripleRowResolverException The triple could not be converted. 
+ */ + public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException { + checkNotNull(triple); + final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple); + final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO); + return spoRow.getRow(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java new file mode 100644 index 0000000..a85bf56 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.iterator.ColumnIterator; +import io.fluo.api.iterator.RowIterator; +import io.fluo.api.types.TypedSnapshot; + +/** + * Finds all queries that are being managed by this instance of Fluo that + * are also being exported to the provided instance of Accumulo. + */ +public class ListQueryIds { + + /** + * Finds all queries that are being managed by this instance of Fluo that + * are also being exported to the provided instance of Accumulo. + * + * @param fluo - The Fluo instance that will be searched. (not null) + * @return An ascending alphabetically sorted list of the Query IDs being + * managed by the Fluo app and exported to an instance of Accumulo. + */ + public List<String> listQueryIds(final FluoClient fluo) { + checkNotNull(fluo); + + final List<String> queryIds = new ArrayList<>(); + + try(TypedSnapshot snap = new StringTypeLayer().wrap( fluo.newSnapshot() )) { + // Create an iterator that iterates over the QUERY_ID column. + final ScannerConfiguration scanConfig = new ScannerConfiguration(); + scanConfig.fetchColumn(FluoQueryColumns.QUERY_ID.getFamily(), FluoQueryColumns.QUERY_ID.getQualifier()); + final RowIterator rows = snap.get(scanConfig); + + // Fetch the Query IDs that is stored in the Fluo table. 
+ while(rows.hasNext()) { + final Entry<Bytes, ColumnIterator> entry = rows.next(); + final Bytes sparql = entry.getKey(); + final String queryId = snap.get(sparql, FluoQueryColumns.QUERY_ID).toString(); + queryIds.add(queryId); + } + } + + // Sort them alphabetically. + Collections.sort(queryIds); + return queryIds; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml new file mode 100644 index 0000000..b591e07 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> +<project + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.parent</artifactId> + <version>3.2.10-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.pcj.fluo.app</artifactId> + + <name>Apache Rya PCJ Fluo App</name> + <description> + A Fluo implementation of Rya Precomputed Join Indexing. This module produces + a jar that may be executed by the 'fluo' command line tool as a YARN job. + </description> + + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + <exclusions> + <exclusion> + <artifactId>slf4j-log4j12</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + </exclusions> + </dependency> + + <!-- 3rd Party Runtime Dependencies. --> + <dependency> + <groupId>io.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Use the pre-build 'jar-with-dependencies' assembly to package the dependent class files into the final jar. + This creates a jar file that can be deployed to Fluo without having to include any dependent jars. 
--> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java new file mode 100644 index 0000000..859cd4b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/BindingSetRow.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +import io.fluo.api.data.Bytes; + +/** + * The values of an Accumulo Row ID for a row that stores a Binding set for + * a specific Node ID of a query. + */ +@Immutable +@ParametersAreNonnullByDefault +public class BindingSetRow { + private final String nodeId; + private final String[] bindingStrings; + + /** + * Constructs an instance of {@link BindingSetRow}. + * + * @param nodeId - The Node ID of a query node. (not null) + * @param bindingStrings - A Binding Set that is part of the node's results. (not null) + */ + public BindingSetRow(final String nodeId, final String[] bindingStrings) { + this.nodeId = checkNotNull(nodeId); + this.bindingStrings = checkNotNull(bindingStrings); + } + + /** + * @return The Node ID of a query node. + */ + public String getNodeId() { + return nodeId; + } + + /** + * @return A Binding Set that is part of the node's results. It is formatted + * in SPO order and each String requires further interpretation. + */ + public String[] getBindingStrings() { + return bindingStrings; + } + + /** + * Parses the {@link Bytes} of an Accumulo Row ID into a {@link BindingSetRow}. + * + * @param row - The Row ID to parse. (not null). + * @return A {@link BindingSetRow} holding the parsed values. + */ + public static BindingSetRow make(final Bytes row) { + checkNotNull(row); + + // Read the Node ID from the row's bytes. 
+ final String[] rowArray = row.toString().split(NODEID_BS_DELIM); + if(rowArray.length != 2) { + throw new IllegalArgumentException("A row must contain a single NODEID_BS_DELIM."); + } + final String nodeId = rowArray[0]; + + // Read the row's Binding Set from the bytes. + final String[] bindingStrings = rowArray[1].split(DELIM); + + return new BindingSetRow(nodeId, bindingStrings); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java new file mode 100644 index 0000000..3c9b875 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinder.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.concurrent.atomic.AtomicReference; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Optional; + +/** + * Searches a SPARQL query for {@link Filter}s. + */ +@ParametersAreNonnullByDefault +class FilterFinder { + + /** + * Search a SPARQL query for the {@link Filter} that appears at the + * {@code indexWithinQuery}'th time within the query. + * <p> + * The top most filter within the query will be at index 0, the next filter + * encountered will be at index 1, ... and the last index that is encountered + * will be at index <i>n</i>. + * + * @param sparql - The SPARQL query that to parse. (not null) + * @param indexWithinQuery - The index of the filter to fetch. (not null) + * @return The filter that was found within the query at the specified index; + * otherwise absent. + * @throws Exception Thrown when the query could not be parsed or iterated over. + */ + public Optional<Filter> findFilter(final String sparql, final int indexWithinQuery) throws Exception { + checkNotNull(sparql); + checkArgument(indexWithinQuery >= 0); + + // When a filter is encountered for the requested index, store it in atomic reference and quit searching. + final AtomicReference<Filter> filterRef = new AtomicReference<>(); + final QueryModelVisitorBase<RuntimeException> filterFinder = new QueryModelVisitorBase<RuntimeException>() { + private int i = 0; + @Override + public void meet(final Filter filter) { + // Store and stop searching. + if(i == indexWithinQuery) { + filterRef.set(filter); + return; + } + + // Continue to the next filter. 
+ i++; + super.meet(filter); + } + }; + + // Parse the query and find the filter. + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); + parsedQuery.getTupleExpr().visit(filterFinder); + return Optional.fromNullable(filterRef.get()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java new file mode 100644 index 0000000..5ff5acc --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.evaluation.TripleSource; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; +import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.base.Optional; + +import info.aduna.iteration.CloseableIteration; +import io.fluo.api.client.TransactionBase; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Column; +import io.fluo.api.types.Encoder; +import io.fluo.api.types.StringEncoder; +import mvm.rya.indexing.external.tupleSet.BindingSetStringConverter; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +/** + * Updates the results of a Filter node when its child has added a new Binding + * Set to its results. + */ +@ParametersAreNonnullByDefault +public class FilterResultUpdater { + + private final Encoder encoder = new StringEncoder(); + + /** + * A utility class used to search SPARQL queries for Filters. 
+ */ + private static final FilterFinder filterFinder = new FilterFinder(); + + /** + * Is used to evaluate the conditions of a {@link Filter}. + */ + private static final EvaluationStrategyImpl evaluator = new EvaluationStrategyImpl( + new TripleSource() { + private final ValueFactory valueFactory = new ValueFactoryImpl(); + + @Override + public ValueFactory getValueFactory() { + return valueFactory; + } + + @Override + public CloseableIteration<? extends Statement, QueryEvaluationException> getStatements( + final Resource arg0, + final URI arg1, + final Value arg2, + final Resource... arg3) throws QueryEvaluationException { + throw new UnsupportedOperationException(); + } + }); + + /** + * Updates the results of a Filter node when one of its child has added a + * new Binding Set to its results. + * + * @param tx - The transaction all Fluo queries will use. (not null) + * @param childBindingSet - A binding set that the query's child node has emmitted. (not null) + * @param filterMetadata - The metadata of the Filter whose results will be updated. (not null) + * @throws Exception Something caused the update to fail. + */ + public void updateFilterResults( + final TransactionBase tx, + final BindingSet childBindingSet, + final FilterMetadata filterMetadata) throws Exception { + checkNotNull(tx); + checkNotNull(childBindingSet); + checkNotNull(filterMetadata); + + // Parse the original query and find the Filter that represents filterId. + final String sparql = filterMetadata.getOriginalSparql(); + final int indexWithinQuery = filterMetadata.getFilterIndexWithinSparql(); + final Optional<Filter> filter = filterFinder.findFilter(sparql, indexWithinQuery); + + // Evaluate whether the child BindingSet satisfies the filter's condition. + final ValueExpr condition = filter.get().getCondition(); + if (isTrue(condition, childBindingSet)) { + // Create the Filter's binding set from the child's. 
+ final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); + + final MapBindingSet filterBindingSet = new MapBindingSet(); + for(final String bindingName : filterVarOrder) { + final Binding binding = childBindingSet.getBinding(bindingName); + filterBindingSet.addBinding(binding); + } + final String filterBindingSetString = BindingSetStringConverter.toString(filterBindingSet, filterVarOrder); + + final Bytes row = encoder.encode( filterMetadata.getNodeId() + NODEID_BS_DELIM + filterBindingSetString ); + final Column col = FluoQueryColumns.FILTER_BINDING_SET; + final Bytes value = encoder.encode(filterBindingSetString); + tx.set(row, col, value); + } + } + + /** + * Evaluate a {@link BindingSet} to see if it is accepted by a filter's condition. + * + * @param condition - The filter condition. (not null) + * @param bindings - The binding set to evaluate. (not null) + * @return {@code true} if the binding set is accepted by the filter; otherwise {@code false}. + * @throws QueryEvaluationException The condition couldn't be evaluated. + */ + private static boolean isTrue(final ValueExpr condition, final BindingSet bindings) throws QueryEvaluationException { + try { + final Value value = evaluator.evaluate(condition, bindings); + return QueryEvaluationUtil.getEffectiveBooleanValue(value); + } catch (final ValueExprEvaluationException e) { + // XXX Hack: If filtering a statement that does not have the right bindings, return true. + // When would this ever come up? Should we actually return true? 
+ return true; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/15ec5d5f/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java new file mode 100644 index 0000000..61e3d5f --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE; +import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.VAR_DELIM; + +import java.util.Collection; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.model.Literal; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.Binding; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.StatementPattern; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Joiner; + +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.resolver.RdfToRyaConversions; + +/** + * Contains method that convert between the Sesame representations of RDF + * components and the Strings that are used by the Fluo PCJ application. + */ +@ParametersAreNonnullByDefault +public class FluoStringConverter { + + private static final ValueFactory valueFactory = new ValueFactoryImpl(); + + /** + * Converts an ordered collection of variables into the Variable Order + * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS} + * column of the Fluo application. + * + * @param varOrder - An ordered collection of variables. (not null) + * @return The string representation of the variable order. 
+ */ + public static String toVarOrderString(final Collection<String> varOrder) { + checkNotNull(varOrder); + return Joiner.on(VAR_DELIM).join(varOrder); + } + + /** + * Converts an ordered array of variables into the Variable Order + * String that is stored in the {@link IncrementalUpdateConstants#NODE_VARS} + * column of the Fluo application. + * + * @param varOrder - An ordered array of variables. (not null) + * @return The string representation of the variable order. + */ + public static String toVarOrderString(final String... varOrder) { + return Joiner.on(VAR_DELIM).join(varOrder); + } + + /** + * Converts a String into an array holding the Variable Order of a Binding Set. + * + * @param varOrderString - The string representation of the variable order. (not null) + * @return An ordered array holding the variable order of a binding set. + */ + public static String[] toVarOrder(final String varOrderString) { + checkNotNull(varOrderString); + return varOrderString.split(VAR_DELIM); + } + + /** + * Converts a {@link BindingSet} to the String representation that the Fluo + * application serializes to the Binding Set columns. + * + * @param bindingSet - The binding set values. (not null) + * @param varOrder - The order the variables must appear in. (not null) + * @return A {@code String} version of {@code bindingSet} suitable for + * serialization to one of the Fluo application's binding set columns. + */ + public static String toBindingSetString(final BindingSet bindingSet, final String[] varOrder) { + checkNotNull(bindingSet); + checkNotNull(varOrder); + + final StringBuilder bindingSetString = new StringBuilder(); + + for(int i = 0; i < varOrder.length; i++) { + // Add a value to the binding set. 
+ final String varName = varOrder[i]; + final Value value = bindingSet.getBinding(varName).getValue(); + final RyaType ryaValue = RdfToRyaConversions.convertValue(value); + bindingSetString.append( ryaValue.getData() ).append(TYPE_DELIM).append( ryaValue.getDataType() ); + + // If there are more values to add, include a delimiter between them. + if(i != varOrder.length-1) { + bindingSetString.append(DELIM); + } + } + + return bindingSetString.toString(); + } + + /** + * Converts the String representation of a {@link BindingSet} as is created + * by {@link #toBindingSetString(BindingSet, String[])} back into a + * BindingSet. + * + * @param bindingSetString - The binding set values as a String. (not null) + * @param varOrder - The order the variables appear in the String version of + * the BindingSet. (not null) + * @return A {@link BindingSet} representation of the String. + */ + public static BindingSet toBindingSet(final String bindingSetString, final String[] varOrder) { + checkNotNull(bindingSetString); + checkNotNull(varOrder); + + final String[] bindingStrings = toBindingStrings(bindingSetString); + return toBindingSet(bindingStrings, varOrder); + } + + /** + * Creates a {@link BindingSet} from an ordered array of Strings that represent + * {@link Binding}s and their variable names. + * + * @param bindingStrings - An ordered array of Strings representing {@link Binding}s. (not null) + * @param varOrder - An ordered array of variable names for the binding strings. (not null) + * @return The parameters converted into a {@link BindingSet}. 
+ */ + public static BindingSet toBindingSet(final String[] bindingStrings, final String[] varOrder) { + checkNotNull(varOrder); + checkNotNull(bindingStrings); + checkArgument(varOrder.length == bindingStrings.length); + + final QueryBindingSet bindingSet = new QueryBindingSet(); + + for(int i = 0; i < bindingStrings.length; i++) { + final String name = varOrder[i]; + final Value value = FluoStringConverter.toValue(bindingStrings[i]); + bindingSet.addBinding(name, value); + } + + return bindingSet; + } + + /** + * Extract the {@link Binding} strings from a {@link BindingSet}'s string form. + * + * @param bindingSetString - A {@link BindingSet} in its Fluo String form. (not null) + * @return The set's {@link Binding}s in Fluo String form. (not null) + */ + public static String[] toBindingStrings(final String bindingSetString) { + checkNotNull(bindingSetString); + return bindingSetString.split(DELIM); + } + + /** + * Creates a {@link Value} from a String representation of it. + * + * @param valueString - The String representation of the value. (not null) + * @return The {@link Value} representation of the String. + */ + public static Value toValue(final String valueString) { + checkNotNull(valueString); + + // Split the String that was stored in Fluo into its Value and Type parts. + final String[] valueAndType = valueString.split(TYPE_DELIM); + if(valueAndType.length != 2) { + throw new IllegalArgumentException("Array must contain data and type info!"); + } + + final String dataString = valueAndType[0]; + final String typeString = valueAndType[1]; + + // Convert the String Type into a URI that describes the type. + final URI typeURI = valueFactory.createURI(typeString); + + // Convert the String Value into a Value. + final Value value = typeURI.equals(XMLSchema.ANYURI) ? 
+ valueFactory.createURI(dataString) : + valueFactory.createLiteral(dataString, new URIImpl(typeString)); + + return value; + } + + /** + * Converts the String representation of a {@link StatementPattern} back + * into the object version. + * + * @param patternString - The {@link StatementPattern} represented as a String. (not null) + * @return A {@link StatementPatter} built from the string. + */ + public static StatementPattern toStatementPattern(final String patternString) { + checkNotNull(patternString); + + final String[] parts = patternString.split(DELIM); + final String subjectPart = parts[0]; + final String predicatePart = parts[1]; + final String objectPart = parts[2]; + + final Var subject = toVar(subjectPart); + final Var predicate = toVar(predicatePart); + final Var object = toVar(objectPart); + + return new StatementPattern(subject, predicate, object); + } + + /** + * Converts the String representation of a {@link Var} back into the object version. + * + * @param varString - The {@link Var} represented as a String. (not null) + * @return A {@link Var} built from the string. + */ + public static Var toVar(final String varString) { + checkNotNull(varString); + + if(varString.startsWith("-const-")) { + // The variable is a constant value. + final String[] varParts = varString.split(TYPE_DELIM); + final String name = varParts[0]; + final String valueString = name.substring("-const-".length()); + + final String dataTypeString = varParts[1]; + if(dataTypeString.equals(URI_TYPE)) { + // Handle a URI object. + final Var var = new Var(name, new URIImpl(valueString)); + var.setAnonymous(true); + return var; + } else { + // Literal value. + final URI dataType = new URIImpl(dataTypeString); + final Literal value = new LiteralImpl(valueString, dataType); + final Var var = new Var(name, value); + var.setAnonymous(true); + return var; + } + } else { + // The variable is a named variable. 
+ return new Var(varString); + } + } + + /** + * Provides a string representation of an SP which contains info about + * whether each component (subj, pred, obj) is constant and its data and + * data type if it is constant. + * + * @param sp - The statement pattern to convert. (not null) + * @return A String representation of the statement pattern that may be + * used to do triple matching. + */ + public static String toStatementPatternString(final StatementPattern sp) { + checkNotNull(sp); + + final Var subjVar = sp.getSubjectVar(); + String subj = subjVar.getName(); + if(subjVar.isConstant()) { + subj = subj + TYPE_DELIM + URI_TYPE; + } + + final Var predVar = sp.getPredicateVar(); + String pred = predVar.getName(); + if(predVar.isConstant()) { + pred = pred + TYPE_DELIM + URI_TYPE; + } + + final Var objVar = sp.getObjectVar(); + String obj = objVar.getName(); + if (objVar.isConstant()) { + final RyaType rt = RdfToRyaConversions.convertValue(objVar.getValue()); + obj = obj + TYPE_DELIM + rt.getDataType().stringValue(); + } + + return subj + DELIM + pred + DELIM + obj; + } +} \ No newline at end of file
