http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java deleted file mode 100644 index 1896025..0000000 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/external/PrecompJoinOptimizerIntegrationTest.java +++ /dev/null @@ -1,505 +0,0 @@ -package org.apache.rya.indexing.external; - -import java.net.UnknownHostException; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.List; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.TableExistsException; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.openrdf.model.URI; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.model.vocabulary.RDF; -import org.openrdf.model.vocabulary.RDFS; -import org.openrdf.query.BindingSet; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.QueryLanguage; -import org.openrdf.query.QueryResultHandlerException; -import org.openrdf.query.TupleQueryResultHandler; -import org.openrdf.query.TupleQueryResultHandlerException; -import org.openrdf.repository.RepositoryException; -import org.openrdf.repository.sail.SailRepository; -import org.openrdf.repository.sail.SailRepositoryConnection; -import org.openrdf.sail.SailException; - -import com.google.common.base.Optional; - -import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.rdftriplestore.inference.InferenceEngineException; - -public class PrecompJoinOptimizerIntegrationTest { - - private SailRepositoryConnection conn, pcjConn; - private SailRepository repo, pcjRepo; - private Connector accCon; - String tablePrefix = "table_"; - URI sub, sub2, obj, obj2, subclass, subclass2, talksTo; - - @Before - public void init() throws RepositoryException, - TupleQueryResultHandlerException, QueryEvaluationException, - MalformedQueryException, AccumuloException, - AccumuloSecurityException, TableExistsException, RyaDAOException, - TableNotFoundException, InferenceEngineException, NumberFormatException, - UnknownHostException, SailException { - - repo = PcjIntegrationTestingUtil.getNonPcjRepo(tablePrefix, "instance"); - conn = repo.getConnection(); - - pcjRepo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance"); - pcjConn = pcjRepo.getConnection(); - - sub = new URIImpl("uri:entity"); - subclass = new URIImpl("uri:class"); - obj = new URIImpl("uri:obj"); - talksTo = new URIImpl("uri:talksTo"); - - conn.add(sub, RDF.TYPE, subclass); - conn.add(sub, RDFS.LABEL, new LiteralImpl("label")); - conn.add(sub, talksTo, obj); - - sub2 = new URIImpl("uri:entity2"); - subclass2 = new URIImpl("uri:class2"); - obj2 = new URIImpl("uri:obj2"); - - conn.add(sub2, RDF.TYPE, subclass2); - conn.add(sub2, RDFS.LABEL, new LiteralImpl("label2")); - conn.add(sub2, talksTo, obj2); - - accCon = new MockInstance("instance").getConnector("root", - new PasswordToken("")); - - } - - @After - public void close() throws RepositoryException, AccumuloException, - AccumuloSecurityException, TableNotFoundException { - - PcjIntegrationTestingUtil.closeAndShutdown(conn, repo); - PcjIntegrationTestingUtil.closeAndShutdown(pcjConn, pcjRepo); - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - PcjIntegrationTestingUtil.deleteIndexTables(accCon, 2, tablePrefix); - - } - - @Test - public void testEvaluateSingeIndex() - throws TupleQueryResultHandlerException, QueryEvaluationException, - MalformedQueryException, RepositoryException, AccumuloException, - AccumuloSecurityException, TableExistsException, RyaDAOException, - SailException, TableNotFoundException, PcjException, InferenceEngineException, - NumberFormatException, UnknownHostException { - - final String indexSparqlString = ""// - + "SELECT ?e ?l ?c " // - + "{" // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_1", indexSparqlString, new String[] { "e", "l", "c" }, - Optional.<PcjVarOrderFactory> absent()); - final String queryString = ""// - + "SELECT ?e ?c ?l ?o " // - + "{" // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// - + " ?e <uri:talksTo> ?o . "// - + "}";// - - final CountingResultHandler crh = new CountingResultHandler(); - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - PcjIntegrationTestingUtil.closeAndShutdown(conn, repo); - repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance"); - conn = repo.getConnection(); - conn.add(sub, talksTo, obj); - conn.add(sub2, talksTo, obj2); - pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(crh); - - Assert.assertEquals(2, crh.getCount()); - - } - - @Test - public void testEvaluateTwoIndexTwoVarOrder1() throws AccumuloException, - AccumuloSecurityException, TableExistsException, - RepositoryException, MalformedQueryException, SailException, - QueryEvaluationException, TableNotFoundException, - TupleQueryResultHandlerException, RyaDAOException, PcjException { - - conn.add(obj, RDFS.LABEL, new LiteralImpl("label")); - conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2")); - - final String indexSparqlString = ""// - + "SELECT ?e ?l ?c " // - + "{" // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - final String indexSparqlString2 = ""// - + "SELECT ?e ?o ?l " // - + "{" // - + " ?e <uri:talksTo> ?o . "// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - final String queryString = ""// - + "SELECT ?e ?c ?l ?o " // - + "{" // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// - + " ?e <uri:talksTo> ?o . "// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_1", indexSparqlString, new String[] { "e", "l", "c" }, - Optional.<PcjVarOrderFactory> absent()); - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "o" }, - Optional.<PcjVarOrderFactory> absent()); - final CountingResultHandler crh = new CountingResultHandler(); - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate( - crh); - - Assert.assertEquals(2, crh.getCount()); - - } - - @Test - public void testEvaluateSingeFilterIndex() - throws TupleQueryResultHandlerException, QueryEvaluationException, - MalformedQueryException, RepositoryException, AccumuloException, - AccumuloSecurityException, TableExistsException, RyaDAOException, - SailException, TableNotFoundException, PcjException, InferenceEngineException, - NumberFormatException, UnknownHostException { - - final String indexSparqlString = ""// - + "SELECT ?e ?l ?c " // - + "{" // - + " Filter(?e = <uri:entity>) " // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_1", indexSparqlString, new String[] { "e", "l", "c" }, - Optional.<PcjVarOrderFactory> absent()); - final String queryString = ""// - + "SELECT ?e ?c ?l ?o " // - + "{" // - + " Filter(?e = <uri:entity>) " // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// - + " ?e <uri:talksTo> ?o . "// - + "}";// - - final CountingResultHandler crh = new CountingResultHandler(); - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - PcjIntegrationTestingUtil.closeAndShutdown(conn, repo); - repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance"); - conn = repo.getConnection(); - conn.add(sub, talksTo, obj); - conn.add(sub2, talksTo, obj2); - pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate( - crh); - - Assert.assertEquals(1, crh.getCount()); - - } - - @Test - public void testEvaluateSingeFilterWithUnion() - throws TupleQueryResultHandlerException, QueryEvaluationException, - MalformedQueryException, RepositoryException, AccumuloException, - AccumuloSecurityException, TableExistsException, RyaDAOException, - SailException, TableNotFoundException, PcjException, InferenceEngineException, - NumberFormatException, UnknownHostException { - - final String indexSparqlString2 = ""// - + "SELECT ?e ?l ?c " // - + "{" // - + " Filter(?l = \"label2\") " // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "c" }, - Optional.<PcjVarOrderFactory> absent()); - - final String queryString = ""// - + "SELECT ?e ?c ?o ?m ?l" // - + "{" // - + " Filter(?l = \"label2\") " // - + " ?e <uri:talksTo> ?o . "// - + " { ?e a ?c . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?m }"// - + " UNION { ?e a ?c . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l }"// - + "}";// - - final CountingResultHandler crh = new CountingResultHandler(); - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - PcjIntegrationTestingUtil.closeAndShutdown(conn, repo); - repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance"); - conn = repo.getConnection(); - conn.add(sub, talksTo, obj); - conn.add(sub2, talksTo, obj2); - pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate( - crh); - - Assert.assertEquals(1, crh.getCount()); - - } - - @Test - public void testEvaluateSingeFilterWithLeftJoin() - throws TupleQueryResultHandlerException, QueryEvaluationException, - MalformedQueryException, RepositoryException, AccumuloException, - AccumuloSecurityException, TableExistsException, RyaDAOException, - SailException, TableNotFoundException, PcjException, InferenceEngineException, - NumberFormatException, UnknownHostException { - - final String indexSparqlString1 = ""// - + "SELECT ?e ?l ?c " // - + "{" // - + " Filter(?l = \"label3\") " // - + " ?e a ?c . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - final URI sub3 = new URIImpl("uri:entity3"); - final URI subclass3 = new URIImpl("uri:class3"); - conn.add(sub3, RDF.TYPE, subclass3); - conn.add(sub3, RDFS.LABEL, new LiteralImpl("label3")); - - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_1", indexSparqlString1, new String[] { "e", "l", "c" }, - Optional.<PcjVarOrderFactory> absent()); - final String queryString = ""// - + "SELECT ?e ?c ?o ?m ?l" // - + "{" // - + " Filter(?l = \"label3\") " // - + " ?e a ?c . " // - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . " // - + " OPTIONAL { ?e <uri:talksTo> ?o . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?m }"// - + "}";// - - final CountingResultHandler crh = new CountingResultHandler(); - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - PcjIntegrationTestingUtil.closeAndShutdown(conn, repo); - repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance"); - conn = repo.getConnection(); - conn.add(sub, talksTo, obj); - conn.add(sub, RDFS.LABEL, new LiteralImpl("label")); - pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate( - crh); - - Assert.assertEquals(1, crh.getCount()); - - } - - @Test - public void testEvaluateTwoIndexUnionFilter() throws AccumuloException, - AccumuloSecurityException, TableExistsException, - RepositoryException, MalformedQueryException, SailException, - QueryEvaluationException, TableNotFoundException, - TupleQueryResultHandlerException, RyaDAOException, PcjException, InferenceEngineException, - NumberFormatException, UnknownHostException { - - conn.add(obj, RDFS.LABEL, new LiteralImpl("label")); - conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2")); - conn.add(sub, RDF.TYPE, obj); - conn.add(sub2, RDF.TYPE, obj2); - - final String indexSparqlString = ""// - + "SELECT ?e ?l ?o " // - + "{" // - + " Filter(?l = \"label2\") " // - + " ?e a ?o . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - final String indexSparqlString2 = ""// - + "SELECT ?e ?l ?o " // - + "{" // - + " Filter(?l = \"label2\") " // - + " ?e <uri:talksTo> ?o . "// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - final String queryString = ""// - + "SELECT ?c ?e ?l ?o " // - + "{" // - + " Filter(?l = \"label2\") " // - + " ?e a ?c . "// - + " { ?e a ?o . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l }"// - + " UNION { ?e <uri:talksTo> ?o . ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l }"// - + "}";// - - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_1", indexSparqlString, new String[] { "e", "l", "o" }, - Optional.<PcjVarOrderFactory> absent()); - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "o" }, - Optional.<PcjVarOrderFactory> absent()); - - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - PcjIntegrationTestingUtil.closeAndShutdown(conn, repo); - repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance"); - conn = repo.getConnection(); - conn.add(sub2, RDF.TYPE, subclass2); - conn.add(sub2, RDF.TYPE, obj2); - final CountingResultHandler crh = new CountingResultHandler(); - pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate( - crh); - - Assert.assertEquals(6, crh.getCount()); - - } - - @Test - public void testEvaluateTwoIndexLeftJoinUnionFilter() - throws AccumuloException, AccumuloSecurityException, - TableExistsException, RepositoryException, MalformedQueryException, - SailException, QueryEvaluationException, TableNotFoundException, - TupleQueryResultHandlerException, RyaDAOException, PcjException, InferenceEngineException, - NumberFormatException, UnknownHostException { - - conn.add(obj, RDFS.LABEL, new LiteralImpl("label")); - conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2")); - conn.add(sub, RDF.TYPE, obj); - conn.add(sub2, RDF.TYPE, obj2); - - final URI livesIn = new URIImpl("uri:livesIn"); - final URI city = new URIImpl("uri:city"); - final URI city2 = new URIImpl("uri:city2"); - final URI city3 = new URIImpl("uri:city3"); - conn.add(sub, livesIn, city); - conn.add(sub2, livesIn, city2); - conn.add(sub2, livesIn, city3); - conn.add(sub, livesIn, city3); - - final String indexSparqlString = ""// - + "SELECT ?e ?l ?o " // - + "{" // - + " ?e a ?o . "// - + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - final String indexSparqlString2 = ""// - + "SELECT ?e ?l ?o " // - + "{" // - + " ?e <uri:talksTo> ?o . "// - + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l "// - + "}";// - - final String queryString = ""// - + "SELECT ?c ?e ?l ?o " // - + "{" // - + " Filter(?c = <uri:city3>) " // - + " ?e <uri:livesIn> ?c . "// - + " OPTIONAL{{ ?e a ?o . ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l }"// - + " UNION { ?e <uri:talksTo> ?o . ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l }}"// - + "}";// - - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_1", indexSparqlString, new String[] { "e", "l", "o" }, - Optional.<PcjVarOrderFactory> absent()); - PcjIntegrationTestingUtil.createAndPopulatePcj(conn, accCon, tablePrefix - + "INDEX_2", indexSparqlString2, new String[] { "e", "l", "o" }, - Optional.<PcjVarOrderFactory> absent()); - - PcjIntegrationTestingUtil.deleteCoreRyaTables(accCon, tablePrefix); - PcjIntegrationTestingUtil.closeAndShutdown(conn, repo); - repo = PcjIntegrationTestingUtil.getPcjRepo(tablePrefix, "instance"); - conn = repo.getConnection(); - conn.add(sub2, livesIn, city3); - conn.add(sub, livesIn, city3); - - final CountingResultHandler crh = new CountingResultHandler(); - pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate( - crh); - - Assert.assertEquals(6, crh.getCount()); - - } - - public static class CountingResultHandler implements - TupleQueryResultHandler { - private int count = 0; - - public int getCount() { - return count; - } - - public void resetCount() { - count = 0; - } - - @Override - public void startQueryResult(final List<String> arg0) - throws TupleQueryResultHandlerException { - } - - @Override - public void handleSolution(final BindingSet arg0) - throws TupleQueryResultHandlerException { - System.out.println(arg0); - count++; - System.out.println("Count is " + count); - } - - @Override - public void endQueryResult() throws TupleQueryResultHandlerException { - } - - @Override - public void handleBoolean(final boolean arg0) - throws QueryResultHandlerException { - // TODO Auto-generated method stub - - } - - @Override - public void handleLinks(final List<String> arg0) - throws QueryResultHandlerException { - // TODO Auto-generated method stub - - } - } - -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java new file mode 100644 index 0000000..7e410e0 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIT.java @@ -0,0 +1,573 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.accumulo.MiniAccumuloClusterInstance; +import org.apache.rya.accumulo.MiniAccumuloSingleton; +import org.apache.rya.accumulo.RyaTestInstanceRule; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.rdftriplestore.RdfCloudTripleStore; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.zookeeper.ClientCnxn; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.NumericLiteralImpl; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.google.common.base.Optional; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +/** + * Performs integration test using {@link MiniAccumuloCluster} to ensure the + * functions of {@link PcjTables} work within a cluster setting. + */ +public class PcjTablesIT { + + private static final String USE_MOCK_INSTANCE = ".useMockInstance"; + private static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename"; + private static final String CLOUDBASE_USER = "sc.cloudbase.username"; + private static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password"; + + private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + + // The MiniAccumuloCluster is re-used between tests. + private MiniAccumuloClusterInstance cluster = MiniAccumuloSingleton.getInstance(); + + // Rya data store and connections. + protected RyaSailRepository ryaRepo = null; + protected RepositoryConnection ryaConn = null; + + @Rule + public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false); + + @BeforeClass + public static void killLoudLogs() { + Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); + } + + @Before + public void resetTestEnvironmanet() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, RepositoryException, IOException, InterruptedException { + // Setup the Rya library to use the Mini Accumulo. + ryaRepo = setupRya(); + ryaConn = ryaRepo.getConnection(); + } + + @After + public void shutdownMiniCluster() throws IOException, InterruptedException, RepositoryException { + // Stop Rya. + if (ryaRepo != null) { + ryaRepo.shutDown(); + } + } + + private String getRyaInstanceName() { + return testInstance.getRyaInstanceName(); + } + + /** + * Format a Mini Accumulo to be a Rya repository. + * + * @return The Rya repository sitting on top of the Mini Accumulo. + */ + private RyaSailRepository setupRya() throws AccumuloException, AccumuloSecurityException, RepositoryException { + // Setup the Rya Repository that will be used to create Repository Connections. + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); + crdfdao.setConnector( cluster.getConnector() ); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(getRyaInstanceName()); + conf.setDisplayQueryPlan(true); + + conf.setBoolean(USE_MOCK_INSTANCE, false); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, getRyaInstanceName()); + conf.set(CLOUDBASE_USER, cluster.getUsername()); + conf.set(CLOUDBASE_PASSWORD, cluster.getPassword()); + conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName()); + + crdfdao.setConf(conf); + ryaStore.setRyaDAO(crdfdao); + + final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); + ryaRepo.initialize(); + + return ryaRepo; + } + + /** + * Ensure that when a new PCJ table is created, it is initialized with the + * correct metadata values. + * <p> + * The method being tested is {@link PcjTables#createPcjTable(Connector, String, Set, String)} + */ + @Test + public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final Connector accumuloConn = cluster.getConnector(); + + // Create a PCJ table in the Mini Accumulo. + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Fetch the PcjMetadata and ensure it has the correct values. + final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + + // Ensure the metadata matches the expected value. + final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expected, pcjMetadata); + } + + /** + * Ensure when results have been written to the PCJ table that they are in Accumulo. + * <p> + * The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)} + */ + @Test + public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final Connector accumuloConn = cluster.getConnector(); + + // Create a PCJ table in the Mini Accumulo. + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Add a few results to the PCJ table. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie); + pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet( + new VisibilityBindingSet(alice), + new VisibilityBindingSet(bob), + new VisibilityBindingSet(charlie))); + + // Make sure the cardinality was updated. + final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Scan Accumulo for the stored results. + final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName); + + // Ensure the expected results match those that were stored. + final Multimap<String, BindingSet> expectedResults = HashMultimap.create(); + expectedResults.putAll("name;age", results); + expectedResults.putAll("age;name", results); + assertEquals(expectedResults, fetchedResults); + } + + @Test + public void listResults() throws Exception { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final Connector accumuloConn = cluster.getConnector(); + + // Create a PCJ table in the Mini Accumulo. + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Add a few results to the PCJ table. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet( + new VisibilityBindingSet(alice), + new VisibilityBindingSet(bob), + new VisibilityBindingSet(charlie))); + + // Fetch the Binding Sets that have been stored in the PCJ table. + final Set<BindingSet> results = new HashSet<>(); + + final CloseableIterator<BindingSet> resultsIt = pcjs.listResults(accumuloConn, pcjTableName, new Authorizations()); + try { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } finally { + resultsIt.close(); + } + + // Verify the fetched results match the expected ones. + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie); + assertEquals(expected, results); + } + + /** + * Ensure when results are already stored in Rya, that we are able to populate + * the PCJ table for a new SPARQL query using those results. + * <p> + * The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection, String)} + */ + @Test + public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final Connector accumuloConn = cluster.getConnector(); + + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Populate the PCJ table using a Rya connection. + pcjs.populatePcj(accumuloConn, pcjTableName, ryaConn); + + // Scan Accumulo for the stored results. + final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName); + + // Make sure the cardinality was updated. + final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Ensure the expected results match those that were stored. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie); + + final Multimap<String, BindingSet> expectedResults = HashMultimap.create(); + expectedResults.putAll("name;age", results); + expectedResults.putAll("age;name", results); + assertEquals(expectedResults, fetchedResults); + } + + /** + * Ensure the method that creates a new PCJ table, scans Rya for matches, and + * stores them in the PCJ table works. + * <p> + * The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)} + */ + @Test + public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final Connector accumuloConn = cluster.getConnector(); + + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); + + // Create and populate the PCJ table. + final PcjTables pcjs = new PcjTables(); + pcjs.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); + + // Make sure the cardinality was updated. + final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Scan Accumulo for the stored results. + final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName); + + // Ensure the expected results match those that were stored. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie); + + final Multimap<String, BindingSet> expectedResults = HashMultimap.create(); + expectedResults.putAll("name;age", results); + expectedResults.putAll("age;name", results); + + assertEquals(expectedResults, fetchedResults); + } + + @Test + public void listPcjs() throws PCJStorageException, AccumuloException, AccumuloSecurityException { + final Connector accumuloConn = cluster.getConnector(); + + // Set up the table names that will be used. + final String instance1 = "instance1_"; + final String instance2 = "instance2_"; + + final String instance1_table1 = new PcjTableNameFactory().makeTableName(instance1, "table1"); + final String instance1_table2 = new PcjTableNameFactory().makeTableName(instance1, "table2"); + final String instance1_table3 = new PcjTableNameFactory().makeTableName(instance1, "table3"); + + final String instance2_table1 = new PcjTableNameFactory().makeTableName(instance2, "table1"); + + // Create the PCJ Tables that are in instance 1 and instance 2. + final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); + final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>"; + + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, instance1_table1, varOrders, sparql); + pcjs.createPcjTable(accumuloConn, instance1_table2, varOrders, sparql); + pcjs.createPcjTable(accumuloConn, instance1_table3, varOrders, sparql); + + pcjs.createPcjTable(accumuloConn, instance2_table1, varOrders, sparql); + + // Ensure all of the names have been stored for instance 1 and 2. + final Set<String> expected1 = Sets.newHashSet(instance1_table1, instance1_table2, instance1_table3); + final Set<String> instance1Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance1) ); + assertEquals(expected1, instance1Tables); + + final Set<String> expected2 = Sets.newHashSet(instance2_table1); + final Set<String> instance2Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance2) ); + assertEquals(expected2, instance2Tables); + } + + @Test + public void purge() throws PCJStorageException, AccumuloException, AccumuloSecurityException { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final Connector accumuloConn = cluster.getConnector(); + + // Create a PCJ table in the Mini Accumulo. + final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Add a few results to the PCJ table. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet( + new VisibilityBindingSet(alice), + new VisibilityBindingSet(bob), + new VisibilityBindingSet(charlie))); + + // Make sure the cardinality was updated. + PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Purge the data. + pcjs.purgePcjTable(accumuloConn, pcjTableName); + + // Make sure the cardinality was updated to 0. + metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + assertEquals(0, metadata.getCardinality()); + } + + @Test + public void dropPcj() throws PCJStorageException, AccumuloException, AccumuloSecurityException { + final Connector accumuloConn = cluster.getConnector(); + + // Create a PCJ index. + final String tableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "thePcj"); + final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); + final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>"; + + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, tableName, varOrders, sparql); + + // Fetch its metadata to show that it has actually been created. + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); + PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, tableName); + assertEquals(expectedMetadata, metadata); + + // Drop it. + pcjs.dropPcjTable(accumuloConn, tableName); + + // Show the metadata is no longer present. + PCJStorageException tableDoesNotExistException = null; + try { + metadata = pcjs.getPcjMetadata(accumuloConn, tableName); + } catch(final PCJStorageException e) { + tableDoesNotExistException = e; + } + assertNotNull(tableDoesNotExistException); + } + + /** + * Scan accumulo for the results that are stored in a PCJ table. The + * multimap stores a set of deserialized binding sets that were in the PCJ + * table for every variable order that is found in the PCJ metadata. + */ + private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException { + final Multimap<String, BindingSet> fetchedResults = HashMultimap.create(); + + // Get the variable orders the data was written to. + final PcjTables pcjs = new PcjTables(); + final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); + + // Scan Accumulo for the stored results. + for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) { + final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); + scanner.fetchColumnFamily( new Text(varOrder.toString()) ); + + for(final Entry<Key, Value> entry : scanner) { + final byte[] serializedResult = entry.getKey().getRow().getBytes(); + final BindingSet result = converter.convert(serializedResult, varOrder); + fetchedResults.put(varOrder.toString(), result); + } + } + + return fetchedResults; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java deleted file mode 100644 index e43ab83..0000000 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java +++ /dev/null @@ -1,573 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.storage.accumulo; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Map.Entry; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.minicluster.MiniAccumuloCluster; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.AccumuloRyaDAO; -import org.apache.rya.accumulo.MiniAccumuloClusterInstance; -import org.apache.rya.accumulo.MiniAccumuloSingleton; -import org.apache.rya.accumulo.RyaTestInstanceRule; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.rdftriplestore.RdfCloudTripleStore; -import org.apache.rya.rdftriplestore.RyaSailRepository; -import org.apache.zookeeper.ClientCnxn; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.openrdf.model.Statement; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.NumericLiteralImpl; -import org.openrdf.model.impl.StatementImpl; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.BindingSet; -import org.openrdf.query.impl.MapBindingSet; -import org.openrdf.repository.RepositoryConnection; -import org.openrdf.repository.RepositoryException; - -import com.google.common.base.Optional; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - -/** - * Performs integration test using {@link MiniAccumuloCluster} to ensure the - * functions of {@link PcjTables} work within a cluster setting. - */ -public class PcjTablesIntegrationTest { - - private static final String USE_MOCK_INSTANCE = ".useMockInstance"; - private static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename"; - private static final String CLOUDBASE_USER = "sc.cloudbase.username"; - private static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password"; - - private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - - // The MiniAccumuloCluster is re-used between tests. - private MiniAccumuloClusterInstance cluster = MiniAccumuloSingleton.getInstance(); - - // Rya data store and connections. - protected RyaSailRepository ryaRepo = null; - protected RepositoryConnection ryaConn = null; - - @Rule - public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false); - - @BeforeClass - public static void killLoudLogs() { - Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); - } - - @Before - public void resetTestEnvironmanet() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, RepositoryException, IOException, InterruptedException { - // Setup the Rya library to use the Mini Accumulo. - ryaRepo = setupRya(); - ryaConn = ryaRepo.getConnection(); - } - - @After - public void shutdownMiniCluster() throws IOException, InterruptedException, RepositoryException { - // Stop Rya. - if (ryaRepo != null) { - ryaRepo.shutDown(); - } - } - - private String getRyaInstanceName() { - return testInstance.getRyaInstanceName(); - } - - /** - * Format a Mini Accumulo to be a Rya repository. - * - * @return The Rya repository sitting on top of the Mini Accumulo. - */ - private RyaSailRepository setupRya() throws AccumuloException, AccumuloSecurityException, RepositoryException { - // Setup the Rya Repository that will be used to create Repository Connections. - final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); - final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); - crdfdao.setConnector( cluster.getConnector() ); - - // Setup Rya configuration values. - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(getRyaInstanceName()); - conf.setDisplayQueryPlan(true); - - conf.setBoolean(USE_MOCK_INSTANCE, false); - conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, getRyaInstanceName()); - conf.set(CLOUDBASE_USER, cluster.getUsername()); - conf.set(CLOUDBASE_PASSWORD, cluster.getPassword()); - conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName()); - - crdfdao.setConf(conf); - ryaStore.setRyaDAO(crdfdao); - - final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); - ryaRepo.initialize(); - - return ryaRepo; - } - - /** - * Ensure that when a new PCJ table is created, it is initialized with the - * correct metadata values. - * <p> - * The method being tested is {@link PcjTables#createPcjTable(Connector, String, Set, String)} - */ - @Test - public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException { - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; - - final Connector accumuloConn = cluster.getConnector(); - - // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); - final PcjTables pcjs = new PcjTables(); - pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); - - // Fetch the PcjMetadata and ensure it has the correct values. - final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - - // Ensure the metadata matches the expected value. - final PcjMetadata expected = new PcjMetadata(sparql, 0L, varOrders); - assertEquals(expected, pcjMetadata); - } - - /** - * Ensure when results have been written to the PCJ table that they are in Accumulo. - * <p> - * The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)} - */ - @Test - public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; - - final Connector accumuloConn = cluster.getConnector(); - - // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); - final PcjTables pcjs = new PcjTables(); - pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); - - // Add a few results to the PCJ table. - final MapBindingSet alice = new MapBindingSet(); - alice.addBinding("name", new URIImpl("http://Alice")); - alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); - - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("name", new URIImpl("http://Bob")); - bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); - - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("name", new URIImpl("http://Charlie")); - charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); - - final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie); - pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet( - new VisibilityBindingSet(alice), - new VisibilityBindingSet(bob), - new VisibilityBindingSet(charlie))); - - // Make sure the cardinality was updated. - final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - assertEquals(3, metadata.getCardinality()); - - // Scan Accumulo for the stored results. - final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName); - - // Ensure the expected results match those that were stored. - final Multimap<String, BindingSet> expectedResults = HashMultimap.create(); - expectedResults.putAll("name;age", results); - expectedResults.putAll("age;name", results); - assertEquals(expectedResults, fetchedResults); - } - - @Test - public void listResults() throws Exception { - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; - - final Connector accumuloConn = cluster.getConnector(); - - // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); - final PcjTables pcjs = new PcjTables(); - pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); - - // Add a few results to the PCJ table. - final MapBindingSet alice = new MapBindingSet(); - alice.addBinding("name", new URIImpl("http://Alice")); - alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); - - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("name", new URIImpl("http://Bob")); - bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); - - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("name", new URIImpl("http://Charlie")); - charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); - - pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet( - new VisibilityBindingSet(alice), - new VisibilityBindingSet(bob), - new VisibilityBindingSet(charlie))); - - // Fetch the Binding Sets that have been stored in the PCJ table. - final Set<BindingSet> results = new HashSet<>(); - - final CloseableIterator<BindingSet> resultsIt = pcjs.listResults(accumuloConn, pcjTableName, new Authorizations()); - try { - while(resultsIt.hasNext()) { - results.add( resultsIt.next() ); - } - } finally { - resultsIt.close(); - } - - // Verify the fetched results match the expected ones. - final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie); - assertEquals(expected, results); - } - - /** - * Ensure when results are already stored in Rya, that we are able to populate - * the PCJ table for a new SPARQL query using those results. - * <p> - * The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection, String)} - */ - @Test - public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { - // Load some Triples into Rya. - final Set<Statement> triples = new HashSet<>(); - triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - - for(final Statement triple : triples) { - ryaConn.add(triple); - } - - // Create a PCJ table that will include those triples in its results. - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; - - final Connector accumuloConn = cluster.getConnector(); - - final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); - final PcjTables pcjs = new PcjTables(); - pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); - - // Populate the PCJ table using a Rya connection. - pcjs.populatePcj(accumuloConn, pcjTableName, ryaConn); - - // Scan Accumulo for the stored results. - final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName); - - // Make sure the cardinality was updated. - final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - assertEquals(3, metadata.getCardinality()); - - // Ensure the expected results match those that were stored. - final MapBindingSet alice = new MapBindingSet(); - alice.addBinding("name", new URIImpl("http://Alice")); - alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); - - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("name", new URIImpl("http://Bob")); - bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); - - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("name", new URIImpl("http://Charlie")); - charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); - - final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie); - - final Multimap<String, BindingSet> expectedResults = HashMultimap.create(); - expectedResults.putAll("name;age", results); - expectedResults.putAll("age;name", results); - assertEquals(expectedResults, fetchedResults); - } - - /** - * Ensure the method that creates a new PCJ table, scans Rya for matches, and - * stores them in the PCJ table works. - * <p> - * The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)} - */ - @Test - public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { - // Load some Triples into Rya. - final Set<Statement> triples = new HashSet<>(); - triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); - triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); - - for(final Statement triple : triples) { - ryaConn.add(triple); - } - - // Create a PCJ table that will include those triples in its results. - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; - - final Connector accumuloConn = cluster.getConnector(); - - final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); - - // Create and populate the PCJ table. - final PcjTables pcjs = new PcjTables(); - pcjs.createAndPopulatePcj(ryaConn, accumuloConn, pcjTableName, sparql, new String[]{"name", "age"}, Optional.<PcjVarOrderFactory>absent()); - - // Make sure the cardinality was updated. - final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - assertEquals(3, metadata.getCardinality()); - - // Scan Accumulo for the stored results. - final Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName); - - // Ensure the expected results match those that were stored. - final MapBindingSet alice = new MapBindingSet(); - alice.addBinding("name", new URIImpl("http://Alice")); - alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); - - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("name", new URIImpl("http://Bob")); - bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); - - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("name", new URIImpl("http://Charlie")); - charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); - - final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie); - - final Multimap<String, BindingSet> expectedResults = HashMultimap.create(); - expectedResults.putAll("name;age", results); - expectedResults.putAll("age;name", results); - - assertEquals(expectedResults, fetchedResults); - } - - @Test - public void listPcjs() throws PCJStorageException, AccumuloException, AccumuloSecurityException { - final Connector accumuloConn = cluster.getConnector(); - - // Set up the table names that will be used. - final String instance1 = "instance1_"; - final String instance2 = "instance2_"; - - final String instance1_table1 = new PcjTableNameFactory().makeTableName(instance1, "table1"); - final String instance1_table2 = new PcjTableNameFactory().makeTableName(instance1, "table2"); - final String instance1_table3 = new PcjTableNameFactory().makeTableName(instance1, "table3"); - - final String instance2_table1 = new PcjTableNameFactory().makeTableName(instance2, "table1"); - - // Create the PCJ Tables that are in instance 1 and instance 2. - final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); - final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>"; - - final PcjTables pcjs = new PcjTables(); - pcjs.createPcjTable(accumuloConn, instance1_table1, varOrders, sparql); - pcjs.createPcjTable(accumuloConn, instance1_table2, varOrders, sparql); - pcjs.createPcjTable(accumuloConn, instance1_table3, varOrders, sparql); - - pcjs.createPcjTable(accumuloConn, instance2_table1, varOrders, sparql); - - // Ensure all of the names have been stored for instance 1 and 2. - final Set<String> expected1 = Sets.newHashSet(instance1_table1, instance1_table2, instance1_table3); - final Set<String> instance1Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance1) ); - assertEquals(expected1, instance1Tables); - - final Set<String> expected2 = Sets.newHashSet(instance2_table1); - final Set<String> instance2Tables = Sets.newHashSet( pcjs.listPcjTables(accumuloConn, instance2) ); - assertEquals(expected2, instance2Tables); - } - - @Test - public void purge() throws PCJStorageException, AccumuloException, AccumuloSecurityException { - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; - - final Connector accumuloConn = cluster.getConnector(); - - // Create a PCJ table in the Mini Accumulo. - final String pcjTableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "testPcj"); - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); - final PcjTables pcjs = new PcjTables(); - pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); - - // Add a few results to the PCJ table. - final MapBindingSet alice = new MapBindingSet(); - alice.addBinding("name", new URIImpl("http://Alice")); - alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); - - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("name", new URIImpl("http://Bob")); - bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); - - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("name", new URIImpl("http://Charlie")); - charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); - - pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet( - new VisibilityBindingSet(alice), - new VisibilityBindingSet(bob), - new VisibilityBindingSet(charlie))); - - // Make sure the cardinality was updated. - PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - assertEquals(3, metadata.getCardinality()); - - // Purge the data. - pcjs.purgePcjTable(accumuloConn, pcjTableName); - - // Make sure the cardinality was updated to 0. - metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - assertEquals(0, metadata.getCardinality()); - } - - @Test - public void dropPcj() throws PCJStorageException, AccumuloException, AccumuloSecurityException { - final Connector accumuloConn = cluster.getConnector(); - - // Create a PCJ index. - final String tableName = new PcjTableNameFactory().makeTableName(getRyaInstanceName(), "thePcj"); - final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); - final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>"; - - final PcjTables pcjs = new PcjTables(); - pcjs.createPcjTable(accumuloConn, tableName, varOrders, sparql); - - // Fetch its metadata to show that it has actually been created. - final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); - PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, tableName); - assertEquals(expectedMetadata, metadata); - - // Drop it. - pcjs.dropPcjTable(accumuloConn, tableName); - - // Show the metadata is no longer present. - PCJStorageException tableDoesNotExistException = null; - try { - metadata = pcjs.getPcjMetadata(accumuloConn, tableName); - } catch(final PCJStorageException e) { - tableDoesNotExistException = e; - } - assertNotNull(tableDoesNotExistException); - } - - /** - * Scan accumulo for the results that are stored in a PCJ table. The - * multimap stores a set of deserialized binding sets that were in the PCJ - * table for every variable order that is found in the PCJ metadata. - */ - private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException { - final Multimap<String, BindingSet> fetchedResults = HashMultimap.create(); - - // Get the variable orders the data was written to. - final PcjTables pcjs = new PcjTables(); - final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - - // Scan Accumulo for the stored results. - for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) { - final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); - scanner.fetchColumnFamily( new Text(varOrder.toString()) ); - - for(final Entry<Key, Value> entry : scanner) { - final byte[] serializedResult = entry.getKey().getRow().getBytes(); - final BindingSet result = converter.convert(serializedResult, varOrder); - fetchedResults.put(varOrder.toString(), result); - } - } - - return fetchedResults; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/9c12630b/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java ---------------------------------------------------------------------- diff --git a/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java b/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java deleted file mode 100644 index 4928886..0000000 --- a/osgi/camel.rya/src/test/java/org/apache/rya/camel/cbsail/CbSailIntegrationTest.java +++ /dev/null @@ -1,117 +0,0 @@ -package org.apache.rya.camel.cbsail; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - - -import org.apache.rya.camel.cbsail.CbSailComponent; -import org.apache.camel.EndpointInject; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.ProducerTemplate; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.test.CamelTestSupport; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; - -import java.util.HashMap; - -public class CbSailIntegrationTest extends CamelTestSupport { - - @EndpointInject(uri = "cbsail:tquery?server=stratus13&port=2181&user=root&pwd=password&instanceName=stratus") - ProducerTemplate producer; - - public void testCbSail() throws Exception { - String underGradInfo = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>" + - " PREFIX ub: <urn:test:onto:univ#>" + - " SELECT * WHERE" + - " {" + - " <http://www.Department0.University0.edu/UndergraduateStudent600> ?pred ?obj ." + - " }"; - HashMap map = new HashMap(); - map.put(CbSailComponent.SPARQL_QUERY_PROP, underGradInfo); - map.put(CbSailComponent.START_TIME_QUERY_PROP, 0l); - map.put(CbSailComponent.TTL_QUERY_PROP, 86400000l); - Object o = producer.requestBodyAndHeaders(null, map); - System.out.println(o); - Thread.sleep(100000); - } - - @Override - protected RouteBuilder createRouteBuilder() { - return new RouteBuilder() { - - @Override - public void configure() throws Exception { - ValueFactory vf = new ValueFactoryImpl(); - String underGradInfo = "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>" + - " PREFIX ub: <urn:test:onto:univ#>" + - " SELECT * WHERE" + - " {" + - " <http://www.Department0.University0.edu/UndergraduateStudent60> ?pred ?obj ." + - " }"; - String rawEvents = "PREFIX nh: <http://org.apache.com/2011/02/nh#>\n" + - " SELECT * WHERE\n" + - " {\n" + - " ?uuid nh:timestamp ?timestamp.\n" + - " ?uuid nh:site ?site;\n" + - " nh:system ?system;\n" + - " nh:dataSupplier ?dataSupplier;\n" + - " nh:dataType ?dataType;\n" + - " <http://org.apache.com/2011/02/nh#count> ?data.\n" + - " } LIMIT 100"; - String latestModels = "PREFIX nh: <http://org.apache.com/rdf/2011/02/model#>" + - " PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>" + - " SELECT * WHERE" + - " {" + - " ?modelUuid nh:dayOfWeek \"5\";" + - " nh:hourOfDay \"3\";" + - " nh:timestamp ?timestamp;" + -// " FILTER (xsd:integer(?timestamp) > 1297652964633)." + - " nh:dataProperty \"count\";" + - " nh:modelType \"org.apache.learning.tpami.SimpleGaussianMMModel\";" + - " nh:site ?site;" + - " nh:dataSupplier ?dataSupplier;" + - " nh:system ?system;" + - " nh:dataType ?dataType;" + - " nh:model ?model;" + - " nh:key ?key." + - " }"; - - from("timer://foo?fixedRate=true&period=60000"). - setHeader(CbSailComponent.SPARQL_QUERY_PROP, constant(underGradInfo)). -// setBody(constant(new StatementImpl(vf.createURI("http://www.Department0.University0.edu/UndergraduateStudent610"), vf.createURI("urn:test:onto:univ#testPred"), vf.createLiteral("test")))). - to("cbsail:tquery?server=stratus13&port=2181&user=root&pwd=password&instanceName=stratus&queryOutput=XML" + -// "&ttl=259200000" -// + "&sparql=" + latestModels" + - "").process(new Processor() { - - @Override - public void process(Exchange exchange) throws Exception { - System.out.println(exchange.getIn().getBody()); -// if (body != null) -// System.out.println(body.size()); - } - }).end(); - } - }; - } - -}
