RYA-4 Adding Vagrant Example and Documentation
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/19e2e438 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/19e2e438 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/19e2e438 Branch: refs/heads/develop Commit: 19e2e438c9e6aef2585d72a5696b336b7bd7ab8b Parents: 6cd8aeb 22f82c7 Author: Aaron Mihalik <[email protected]> Authored: Sun Feb 7 14:15:27 2016 -0500 Committer: Aaron Mihalik <[email protected]> Committed: Sun Feb 7 14:15:27 2016 -0500 ---------------------------------------------------------------------- .gitignore | 2 + README.md | 4 + .../java/mvm/rya/indexing/RyaSailFactory.java | 84 ------ .../rya/sail/config/RyaAccumuloSailConfig.java | 161 +++++++++++ .../rya/sail/config/RyaAccumuloSailFactory.java | 92 +++++++ .../mvm/rya/sail/config/RyaSailFactory.java | 84 ++++++ .../org.openrdf.sail.config.SailFactory | 1 + .../repository/config/RyaAccumuloSail.ttl | 20 ++ .../external/PcjIntegrationTestingUtil.java | 2 +- .../external/tupleSet/AccumuloIndexSetTest.java | 2 +- .../sail/config/RyaAccumuloSailFactoryTest.java | 196 ++++++++++++++ extras/indexingExample/pom.xml | 10 - .../src/main/java/EntityDirectExample.java | 10 +- .../src/main/java/MongoRyaDirectExample.java | 2 +- .../src/main/java/RyaDirectExample.java | 2 +- extras/pom.xml | 1 + .../blueprints/impls/sail/log4j.properties | 33 +++ .../src/test/resources/log4j.properties | 29 -- extras/vagrantExample/pom.xml | 46 ++++ .../main/resources/create-RyaAccumuloSail.xsl | 138 ++++++++++ .../src/main/resources/create.xsl | 128 +++++++++ .../vagrantExample/src/main/vagrant/Vagrantfile | 268 +++++++++++++++++++ .../vagrantExample/src/main/vagrant/readme.md | 183 +++++++++++++ .../RdfCloudTripleStoreFactory.java | 56 ---- .../RdfCloudTripleStoreSailConfig.java | 133 --------- .../META-INF/org.openrdf.store.schemas | 1 - .../META-INF/schemas/cloudbasestore-schema.ttl | 20 -- 
.../org.openrdf.sail.config.SailFactory | 1 - 28 files changed, 1366 insertions(+), 343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/19e2e438/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java ---------------------------------------------------------------------- diff --cc extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java index 111de19,0000000..d065afb mode 100644,000000..100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/PcjIntegrationTestingUtil.java @@@ -1,404 -1,0 +1,404 @@@ +package mvm.rya.indexing.external; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RyaTypeResolverException; - import mvm.rya.indexing.RyaSailFactory; ++import mvm.rya.sail.config.RyaSailFactory; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.tupleSet.AccumuloPcjSerializer; +import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; +import mvm.rya.indexing.external.tupleSet.PcjTables; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjMetadata; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjVarOrderFactory; +import mvm.rya.indexing.external.tupleSet.PcjTables.ShiftVarOrderFactory; +import mvm.rya.indexing.external.tupleSet.PcjTables.VariableOrder; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Mutation; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.query.algebra.BindingSetAssignment; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import 
org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; + +import com.google.common.base.Optional; +import com.google.common.collect.Sets; + +public class PcjIntegrationTestingUtil { + + public static Set<QueryModelNode> getTupleSets(TupleExpr te) { + final ExternalTupleVisitor etv = new ExternalTupleVisitor(); + te.visit(etv); + return etv.getExtTup(); + } + + public static void deleteCoreRyaTables(Connector accCon, String prefix) + throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + final TableOperations ops = accCon.tableOperations(); + if (ops.exists(prefix + "spo")) { + ops.delete(prefix + "spo"); + } + if (ops.exists(prefix + "po")) { + ops.delete(prefix + "po"); + } + if (ops.exists(prefix + "osp")) { + ops.delete(prefix + "osp"); + } + } + + public static SailRepository getPcjRepo(String tablePrefix, String instance) + throws AccumuloException, AccumuloSecurityException, + RyaDAOException, RepositoryException { + + final AccumuloRdfConfiguration pcjConf = new AccumuloRdfConfiguration(); + pcjConf.set(ConfigUtils.USE_PCJ, "true"); + pcjConf.set(ConfigUtils.USE_MOCK_INSTANCE, "true"); + pcjConf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); + pcjConf.setTablePrefix(tablePrefix); + + final Sail pcjSail = RyaSailFactory.getInstance(pcjConf); + final SailRepository pcjRepo = new SailRepository(pcjSail); + pcjRepo.initialize(); + return pcjRepo; + } + + public static SailRepository getNonPcjRepo(String tablePrefix, + String instance) throws AccumuloException, + AccumuloSecurityException, RyaDAOException, RepositoryException { + + final AccumuloRdfConfiguration nonPcjConf = new AccumuloRdfConfiguration(); + nonPcjConf.set(ConfigUtils.USE_MOCK_INSTANCE, "true"); + nonPcjConf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); + 
nonPcjConf.setTablePrefix(tablePrefix); + + final Sail nonPcjSail = RyaSailFactory.getInstance(nonPcjConf); + final SailRepository nonPcjRepo = new SailRepository(nonPcjSail); + nonPcjRepo.initialize(); + return nonPcjRepo; + } + + public static void closeAndShutdown(SailRepositoryConnection connection, + SailRepository repo) throws RepositoryException { + connection.close(); + repo.shutDown(); + } + + public static void deleteIndexTables(Connector accCon, int tableNum, + String prefix) throws AccumuloException, AccumuloSecurityException, + TableNotFoundException { + final TableOperations ops = accCon.tableOperations(); + final String tablename = prefix + "INDEX_"; + for (int i = 1; i < tableNum + 1; i++) { + if (ops.exists(tablename + i)) { + ops.delete(tablename + i); + } + } + } + + public static class BindingSetAssignmentCollector extends + QueryModelVisitorBase<RuntimeException> { + + private final Set<QueryModelNode> bindingSetList = Sets.newHashSet(); + + public Set<QueryModelNode> getBindingSetAssignments() { + return bindingSetList; + } + + public boolean containsBSAs() { + return bindingSetList.size() > 0; + } + + @Override + public void meet(BindingSetAssignment node) { + bindingSetList.add(node); + super.meet(node); + } + + } + + public static class ExternalTupleVisitor extends + QueryModelVisitorBase<RuntimeException> { + + private final Set<QueryModelNode> eSet = new HashSet<>(); + + @Override + public void meetNode(QueryModelNode node) throws RuntimeException { + if (node instanceof ExternalTupleSet) { + eSet.add(node); + } + super.meetNode(node); + } + + public Set<QueryModelNode> getExtTup() { + return eSet; + } + + } + + + + + + +//****************************Creation and Population of PcjTables*********************************** + + + + + + /** + * Creates a new PCJ Table in Accumulo and populates it by scanning an + * instance of Rya for historic matches. 
+ * <p> + * If any portion of this operation fails along the way, the partially + * created PCJ table will be left in Accumulo. + * + * @param ryaConn - Connects to the Rya that will be scanned. (not null) + * @param accumuloConn - Connects to the Accumulo that hosts the PCJ results. (not null) + * @param pcjTableName - The name of the PCJ table that will be created. (not null) + * @param sparql - The SPARQL query whose results will be loaded into the table. (not null) + * @param resultVariables - The variables that are included in the query's resulting binding sets. (not null) + * @param pcjVarOrderFactory - An optional factory that indicates the various variable orders + * the results will be stored in. If one is not provided, then {@link ShiftVarOrderFactory} + * is used by default. (not null) + * @throws PcjException The PCJ table could not be created or the values from + * Rya were not able to be loaded into it. + */ + public static void createAndPopulatePcj( + final RepositoryConnection ryaConn, + final Connector accumuloConn, + final String pcjTableName, + final String sparql, + final String[] resultVariables, + final Optional<PcjVarOrderFactory> pcjVarOrderFactory) throws PcjException { + checkNotNull(ryaConn); + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(sparql); + checkNotNull(resultVariables); + checkNotNull(pcjVarOrderFactory); + + PcjTables pcj = new PcjTables(); + // Create the PCJ's variable orders. + PcjVarOrderFactory varOrderFactory = pcjVarOrderFactory.or(new ShiftVarOrderFactory()); + Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders( new VariableOrder(resultVariables) ); + + // Create the PCJ table in Accumulo. + pcj.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Load historic matches from Rya into the PCJ table. + populatePcj(accumuloConn, pcjTableName, ryaConn); + } + + + /** + * Scan Rya for results that solve the PCJ's query and store them in the PCJ + * table. 
+ * <p> + * This method assumes the PCJ table has already been created. + * + * @param accumuloConn + * - A connection to the Accumulo that hosts the PCJ table. (not + * null) + * @param pcjTableName + * - The name of the PCJ table that will receive the results. + * (not null) + * @param ryaConn + * - A connection to the Rya store that will be queried to find + * results. (not null) + * @throws PcjException + * If results could not be written to the PCJ table, the PCJ + * table does not exist, or the query that is being executed was + * malformed. + */ + public static void populatePcj(final Connector accumuloConn, + final String pcjTableName, final RepositoryConnection ryaConn) + throws PcjException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(ryaConn); + + try { + // Fetch the query that needs to be executed from the PCJ table. + PcjMetadata pcjMetadata = new PcjTables().getPcjMetadata( + accumuloConn, pcjTableName); + String sparql = pcjMetadata.getSparql(); + + // Query Rya for results to the SPARQL query. 
+ TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, + sparql); + TupleQueryResult results = query.evaluate(); + + // Load batches of 1000 of them at a time into the PCJ table + Set<BindingSet> batch = new HashSet<>(1000); + while (results.hasNext()) { + batch.add(results.next()); + + if (batch.size() == 1000) { + addResults(accumuloConn, pcjTableName, batch); + batch.clear(); + } + } + + if (!batch.isEmpty()) { + addResults(accumuloConn, pcjTableName, batch); + } + + } catch (RepositoryException | MalformedQueryException + | QueryEvaluationException e) { + throw new PcjException( + "Could not populate a PCJ table with Rya results for the table named: " + + pcjTableName, e); + } + } + + public static void addResults(final Connector accumuloConn, + final String pcjTableName, final Collection<BindingSet> results) + throws PcjException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(results); + + // Write a result to each of the variable orders that are in the table. + writeResults(accumuloConn, pcjTableName, results); + } + + /** + * Add a collection of results to a specific PCJ table. + * + * @param accumuloConn + * - A connection to the Accumulo that hosts the PCJ table. (not + * null) + * @param pcjTableName + * - The name of the PCJ table that will receive the results. + * (not null) + * @param results + * - Binding sets that will be written to the PCJ table. (not + * null) + * @throws PcjException + * The provided PCJ table doesn't exist, is missing the PCJ + * metadata, or the result could not be written to it. + */ + private static void writeResults(final Connector accumuloConn, + final String pcjTableName, final Collection<BindingSet> results) + throws PcjException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + checkNotNull(results); + + // Fetch the variable orders from the PCJ table. 
+ PcjMetadata metadata = new PcjTables().getPcjMetadata(accumuloConn, + pcjTableName); + + // Write each result formatted using each of the variable orders. + BatchWriter writer = null; + try { + writer = accumuloConn.createBatchWriter(pcjTableName, + new BatchWriterConfig()); + for (BindingSet result : results) { + Set<Mutation> addResultMutations = makeWriteResultMutations( + metadata.getVarOrders(), result); + writer.addMutations(addResultMutations); + } + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new PcjException( + "Could not add results to the PCJ table named: " + + pcjTableName, e); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (MutationsRejectedException e) { + throw new PcjException( + "Could not add results to a PCJ table because some of the mutations were rejected.", + e); + } + } + } + } + + /** + * Create the {@link Mutation}s required to write a new {@link BindingSet} + * to a PCJ table for each {@link VariableOrder} that is provided. + * + * @param varOrders + * - The variable orders the result will be written to. (not + * null) + * @param result + * - A new PCJ result. (not null) + * @return Mutation that will write the result to a PCJ table. + * @throws PcjException + * The binding set could not be encoded. + */ + private static Set<Mutation> makeWriteResultMutations( + final Set<VariableOrder> varOrders, final BindingSet result) + throws PcjException { + checkNotNull(varOrders); + checkNotNull(result); + + Set<Mutation> mutations = new HashSet<>(); + + for (final VariableOrder varOrder : varOrders) { + try { + // Serialize the result to the variable order. + byte[] serializedResult = AccumuloPcjSerializer.serialize( + result, varOrder.toArray()); + + // Row ID = binding set values, Column Family = variable order + // of the binding set. 
+ Mutation addResult = new Mutation(serializedResult); + addResult.put(varOrder.toString(), "", ""); + mutations.add(addResult); + } catch (RyaTypeResolverException e) { + throw new PcjException("Could not serialize a result.", e); + } + } + + return mutations; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/19e2e438/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java ---------------------------------------------------------------------- diff --cc extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java index cd52e9a,0000000..37cca41 mode 100644,000000..100644 --- a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java +++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/AccumuloIndexSetTest.java @@@ -1,697 -1,0 +1,697 @@@ +package mvm.rya.indexing.external.tupleSet; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import info.aduna.iteration.CloseableIteration; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RyaTypeResolverException; - import mvm.rya.indexing.RyaSailFactory; ++import mvm.rya.sail.config.RyaSailFactory; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PcjIntegrationTestingUtil; +import mvm.rya.indexing.external.QueryVariableNormalizer; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjTableNameFactory; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjVarOrderFactory; +import mvm.rya.rdftriplestore.RyaSailRepository; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.hadoop.conf.Configuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.NumericLiteralImpl; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.parser.ParsedQuery; +import 
org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class AccumuloIndexSetTest { + + protected static Connector accumuloConn = null; + protected RyaSailRepository ryaRepo = null; + protected RepositoryConnection ryaConn = null; + protected Configuration conf = getConf(); + protected String prefix = "rya_"; + + @Before + public void init() throws AccumuloException, AccumuloSecurityException, RyaDAOException, RepositoryException, TableNotFoundException { + accumuloConn = ConfigUtils.getConnector(conf); + final TableOperations ops = accumuloConn.tableOperations(); + if(ops.exists(prefix+"INDEX_"+ "testPcj")) { + ops.delete(prefix+"INDEX_"+ "testPcj"); + } + ryaRepo = new RyaSailRepository(RyaSailFactory.getInstance(conf)); + ryaRepo.initialize(); + ryaConn = ryaRepo.getConnection(); + } + + + /** + * TODO doc + * @throws MutationsRejectedException + * @throws QueryEvaluationException + * @throws SailException + * @throws MalformedQueryException + */ + @Test + public void accumuloIndexSetTestWithEmptyBindingSet() throws RepositoryException, PcjException, TableNotFoundException, + RyaTypeResolverException, MalformedQueryException, SailException, QueryEvaluationException, MutationsRejectedException { + // Load some Triples into Rya. 
+ final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + // Create and populate the PCJ table. 
+ PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(new QueryBindingSet()); + final Set<BindingSet> fetchedResults = new HashSet<BindingSet>(); + while(results.hasNext()) { + fetchedResults.add(results.next()); + } + // Ensure the expected results match those that were stored. + final QueryBindingSet alice = new QueryBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final QueryBindingSet bob = new QueryBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final QueryBindingSet charlie = new QueryBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + final Set<BindingSet> expectedResults = Sets.<BindingSet>newHashSet(alice, bob, charlie); + Assert.assertEquals(expectedResults, fetchedResults); + } + + + /** + * TODO doc + * @throws MutationsRejectedException + * @throws QueryEvaluationException + * @throws SailException + * @throws MalformedQueryException + */ + @Test + public void accumuloIndexSetTestWithBindingSet() throws RepositoryException, PcjException, TableNotFoundException, + RyaTypeResolverException, MalformedQueryException, SailException, QueryEvaluationException, MutationsRejectedException { + // Load some Triples into Rya. 
+ final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. 
+ PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("name",new URIImpl("http://Alice")); + bs.addBinding("location",new URIImpl("http://Virginia")); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bs); + + bs.addBinding("age",new NumericLiteralImpl(14, XMLSchema.INTEGER)); + Assert.assertEquals(bs, results.next()); + + } + + + @Test + public void accumuloIndexSetTestWithTwoBindingSets() throws RepositoryException, PcjException, TableNotFoundException, + RyaTypeResolverException, MalformedQueryException, SailException, QueryEvaluationException, MutationsRejectedException { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new 
URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. + PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("birthDate",new LiteralImpl("1983-03-17",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs.addBinding("name",new URIImpl("http://Alice")); + + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("birthDate",new LiteralImpl("1983-04-18",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs2.addBinding("name",new URIImpl("http://Bob")); + + final Set<BindingSet> bSets = Sets.<BindingSet>newHashSet(bs,bs2); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bSets); + + final QueryBindingSet alice = new QueryBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + alice.addBinding("birthDate", new LiteralImpl("1983-03-17",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + + final QueryBindingSet bob = new QueryBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + bob.addBinding("birthDate", new LiteralImpl("1983-04-18",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + + + final Set<BindingSet> 
fetchedResults = new HashSet<>(); + while(results.hasNext()) { + final BindingSet next = results.next(); + System.out.println(next); + fetchedResults.add(next); + } + + Assert.assertEquals(Sets.<BindingSet>newHashSet(alice,bob), fetchedResults); + } + + + + @Test + public void accumuloIndexSetTestWithNoBindingSet() throws RepositoryException, PcjException, TableNotFoundException, + RyaTypeResolverException, MalformedQueryException, SailException, QueryEvaluationException, MutationsRejectedException { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." 
+ + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. + PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(new HashSet<BindingSet>()); + + Assert.assertEquals(false, results.hasNext()); + + } + + + @Test + public void accumuloIndexSetTestWithDirectProductBindingSet() throws RepositoryException, PcjException, TableNotFoundException, + RyaTypeResolverException, MalformedQueryException, SailException, QueryEvaluationException, MutationsRejectedException { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new 
LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. + PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("birthDate",new LiteralImpl("1983-03-17",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs.addBinding("location",new URIImpl("http://Virginia")); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bs); + + final QueryBindingSet alice = new QueryBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + alice.addAll(bs); + + final QueryBindingSet bob = new QueryBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + bob.addAll(bs); + + final QueryBindingSet charlie = new QueryBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + charlie.addAll(bs); + + final Set<BindingSet> fetchedResults = new HashSet<>(); + while(results.hasNext()) { + fetchedResults.add(results.next()); + } + Assert.assertEquals(3,fetchedResults.size()); + Assert.assertEquals(Sets.<BindingSet>newHashSet(alice,bob,charlie), fetchedResults); + } + + @Test + public void 
accumuloIndexSetTestWithTwoDirectProductBindingSet() throws RepositoryException, PcjException, TableNotFoundException, + RyaTypeResolverException, MalformedQueryException, SailException, QueryEvaluationException, MutationsRejectedException { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. 
+ PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("birthDate",new LiteralImpl("1983-03-17",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs.addBinding("location",new URIImpl("http://Virginia")); + + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("birthDate",new LiteralImpl("1983-04-18",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs2.addBinding("location",new URIImpl("http://Georgia")); + + final Set<BindingSet> bSets = Sets.<BindingSet>newHashSet(bs,bs2); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bSets); + + final QueryBindingSet alice1 = new QueryBindingSet(); + alice1.addBinding("name", new URIImpl("http://Alice")); + alice1.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + alice1.addAll(bs); + + final QueryBindingSet bob1 = new QueryBindingSet(); + bob1.addBinding("name", new URIImpl("http://Bob")); + bob1.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + bob1.addAll(bs); + + final QueryBindingSet charlie1 = new QueryBindingSet(); + charlie1.addBinding("name", new URIImpl("http://Charlie")); + charlie1.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + charlie1.addAll(bs); + + final QueryBindingSet alice2 = new QueryBindingSet(); + alice2.addBinding("name", new URIImpl("http://Alice")); + alice2.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + alice2.addAll(bs2); + + final QueryBindingSet bob2 = new QueryBindingSet(); + bob2.addBinding("name", new URIImpl("http://Bob")); + bob2.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + bob2.addAll(bs2); + + final QueryBindingSet charlie2 = new QueryBindingSet(); + 
charlie2.addBinding("name", new URIImpl("http://Charlie")); + charlie2.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + charlie2.addAll(bs2); + + final Set<BindingSet> fetchedResults = new HashSet<>(); + while(results.hasNext()) { + final BindingSet next = results.next(); + System.out.println(next); + fetchedResults.add(next); + } + + Assert.assertEquals(Sets.<BindingSet>newHashSet(alice1,bob1,charlie1,alice2,bob2,charlie2), fetchedResults); + } + + + + @Test + public void accumuloIndexSetTestWithTwoDirectProductBindingSetsWithMapping() throws RepositoryException, PcjException, TableNotFoundException, + RyaTypeResolverException, MalformedQueryException, SailException, QueryEvaluationException, MutationsRejectedException { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table 
will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. + PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final String sparql2 = + "SELECT ?x ?y " + + "{" + + "FILTER(?y < 30) ." + + "?x <http://hasAge> ?y." + + "?x <http://playsSport> \"Soccer\" " + + "}"; + + final SPARQLParser p = new SPARQLParser(); + final ParsedQuery pq = p.parseQuery(sparql2, null); + + final Map<String,String> map = new HashMap<>(); + map.put("x", "name"); + map.put("y", "age"); + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + ais.setProjectionExpr((Projection) pq.getTupleExpr()); + ais.setTableVarMap(map); + ais.setSupportedVariableOrderMap(Lists.<String>newArrayList("x;y","y;x")); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("birthDate",new LiteralImpl("1983-03-17",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs.addBinding("x",new URIImpl("http://Alice")); + + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("birthDate",new LiteralImpl("1983-04-18",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs2.addBinding("x",new URIImpl("http://Bob")); + + final Set<BindingSet> bSets = Sets.<BindingSet>newHashSet(bs,bs2); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bSets); + + final QueryBindingSet alice = new QueryBindingSet(); + alice.addBinding("x", new URIImpl("http://Alice")); + alice.addBinding("y", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + alice.addBinding("birthDate", new LiteralImpl("1983-03-17",new 
URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + + final QueryBindingSet bob = new QueryBindingSet(); + bob.addBinding("x", new URIImpl("http://Bob")); + bob.addBinding("y", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + bob.addBinding("birthDate", new LiteralImpl("1983-04-18",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + + + final Set<BindingSet> fetchedResults = new HashSet<>(); + while(results.hasNext()) { + final BindingSet next = results.next(); + System.out.println(next); + fetchedResults.add(next); + } + + Assert.assertEquals(Sets.<BindingSet>newHashSet(alice,bob), fetchedResults); + } + + + + @Test + public void accumuloIndexSetTestWithTwoDirectProductBindingSetsWithConstantMapping() throws Exception { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table will 
include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. + PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + final String sparql2 = + "SELECT ?x " + + "{" + + "?x <http://hasAge> 16 ." + + "?x <http://playsSport> \"Soccer\" " + + "}"; + + final SPARQLParser p = new SPARQLParser(); + final ParsedQuery pq1 = p.parseQuery(sparql, null); + final ParsedQuery pq2 = p.parseQuery(sparql2, null); + + final AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn, pcjTableName); + ais.setProjectionExpr((Projection) QueryVariableNormalizer.getNormalizedIndex(pq2.getTupleExpr(), pq1.getTupleExpr()).get(0)); + + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("birthDate",new LiteralImpl("1983-03-17",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs.addBinding("x",new URIImpl("http://Alice")); + + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("birthDate",new LiteralImpl("1983-04-18",new URIImpl("http://www.w3.org/2001/XMLSchema#date"))); + bs2.addBinding("x",new URIImpl("http://Bob")); + + final Set<BindingSet> bSets = Sets.<BindingSet>newHashSet(bs,bs2); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bSets); + + final Set<BindingSet> fetchedResults = new HashSet<>(); + while(results.hasNext()) { + final BindingSet next = results.next(); + fetchedResults.add(next); + } + + Assert.assertEquals(Sets.<BindingSet>newHashSet(bs2), fetchedResults); + } + + + + @Test + public void accumuloIndexSetTestAttemptJoinAccrossTypes() throws Exception { + // Load some Triples into Rya. 
+ final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(prefix, "testPcj"); + + // Create and populate the PCJ table. + PcjIntegrationTestingUtil.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + AccumuloIndexSet ais = new AccumuloIndexSet(accumuloConn,pcjTableName); + + final QueryBindingSet bs1 = new QueryBindingSet(); + bs1.addBinding("age",new LiteralImpl("16")); + final QueryBindingSet bs2 = new QueryBindingSet(); + bs2.addBinding("age",new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final Set<BindingSet> bSets = Sets.<BindingSet>newHashSet(bs1,bs2); + + final CloseableIteration<BindingSet, QueryEvaluationException> results = ais.evaluate(bSets); + + final Set<BindingSet> fetchedResults = new HashSet<>(); + while(results.hasNext()) { + final BindingSet next = results.next(); + fetchedResults.add(next); + } + + bs2.addBinding("name", new URIImpl("http://Alice")); + Assert.assertEquals(Sets.<BindingSet>newHashSet(bs2), fetchedResults); + } + + + + + + + + @After + public void close() throws RepositoryException 
{ + ryaConn.close(); + ryaRepo.shutDown(); + } + + + private static Configuration getConf() { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_"); + conf.set(ConfigUtils.CLOUDBASE_USER, "root"); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ""); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "instance"); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); + return conf; + } + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/19e2e438/extras/indexingExample/src/main/java/RyaDirectExample.java ---------------------------------------------------------------------- diff --cc extras/indexingExample/src/main/java/RyaDirectExample.java index db1ffe3,122a904..2c4e954 --- a/extras/indexingExample/src/main/java/RyaDirectExample.java +++ b/extras/indexingExample/src/main/java/RyaDirectExample.java @@@ -21,13 -22,10 +21,13 @@@ import java.util.List import mvm.rya.accumulo.AccumuloRdfConfiguration; import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.persist.RyaDAOException; - import mvm.rya.indexing.RyaSailFactory; ++import mvm.rya.sail.config.RyaSailFactory; import mvm.rya.indexing.accumulo.ConfigUtils; import mvm.rya.indexing.accumulo.geo.GeoConstants; -import mvm.rya.indexing.external.tupleSet.AccumuloIndexSet; -import mvm.rya.sail.config.RyaSailFactory; +import mvm.rya.indexing.external.tupleSet.PcjTables; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjException; +import mvm.rya.indexing.external.tupleSet.PcjTables.PcjVarOrderFactory; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException;
