http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java new file mode 100644 index 0000000..0a1852b --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.BindingImpl; + +import com.google.common.collect.Sets; + +import io.fluo.api.client.FluoClient; +import io.fluo.api.client.Snapshot; +import io.fluo.api.config.ScannerConfiguration; +import io.fluo.api.data.Bytes; +import io.fluo.api.data.Span; +import io.fluo.api.iterator.ColumnIterator; +import io.fluo.api.iterator.RowIterator; + +public class CreateDeleteIT extends ITBase { + + /** + * Ensure historic matches are included in the result. + */ + @Test + public void historicResults() throws Exception { + // A query that finds people who talk to Eve and work at Chipotle. + final String sparql = "SELECT ?x " + "WHERE { " + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + "}"; + + // Triples that are loaded into Rya before the PCJ is created. + final Set<Statement> historicTriples = Sets.newHashSet( + makeStatement("http://Alice", "http://talksTo", "http://Eve"), + makeStatement("http://Bob", "http://talksTo", "http://Eve"), + makeStatement("http://Charlie", "http://talksTo", "http://Eve"), + + makeStatement("http://Eve", "http://helps", "http://Kevin"), + + makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), + makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), + makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), + makeStatement("http://David", "http://worksAt", "http://Chipotle")); + + // The expected results of the SPARQL query once the PCJ has been + // computed. + final Set<BindingSet> expected = new HashSet<>(); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob")))); + expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie")))); + + // Load the historic data into Rya. + for (final Statement triple : historicTriples) { + ryaConn.add(triple); + } + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + + // Verify the end results of the query match the expected results. + fluo.waitForObservers(); + + final Set<BindingSet> results = getQueryBindingSetValues(fluoClient, sparql); + assertEquals(expected, results); + + List<Bytes> rows = getFluoTableEntries(fluoClient); + assertEquals(17, rows.size()); + + // Delete the PCJ from the Fluo application. + new DeletePcj(1).deletePcj(fluoClient, pcjId); + + // Ensure all data related to the query has been removed. + List<Bytes> empty_rows = getFluoTableEntries(fluoClient); + assertEquals(0, empty_rows.size()); + } + + private List<Bytes> getFluoTableEntries(FluoClient fluoClient) { + try (Snapshot snapshot = fluoClient.newSnapshot()) { + List<Bytes> rows = new ArrayList<>(); + + ScannerConfiguration sc1 = new ScannerConfiguration(); + sc1.setSpan(Span.prefix("")); + RowIterator iterator = snapshot.get(sc1); + + while (iterator.hasNext()) { + Entry<Bytes, ColumnIterator> row = iterator.next(); + rows.add(row.getKey()); + } + return rows; + } + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index 2381346..ebd2759 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -27,7 +27,8 @@ import java.util.Set; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.impl.URIImpl; @@ -84,8 +85,12 @@ public class InputIT extends ITBase { ryaConn.add(triple); } - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -127,8 +132,12 @@ public class InputIT extends ITBase { expected.add(makeBindingSet( new BindingImpl("x", new URIImpl("http://Charlie")))); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Ensure the query has no results yet. fluo.waitForObservers(); @@ -175,8 +184,12 @@ public class InputIT extends ITBase { ryaConn.add(triple); } - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Ensure Alice is a match. fluo.waitForObservers(); @@ -236,8 +249,12 @@ public class InputIT extends ITBase { ryaConn.add(triple); } - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Ensure Alice is a match. fluo.waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index e83e977..4dba84b 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -26,7 +26,8 @@ import java.util.Set; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import org.openrdf.model.impl.NumericLiteralImpl; import org.openrdf.model.impl.URIImpl; @@ -78,8 +79,12 @@ public class QueryIT extends ITBase { expected.add( makeBindingSet( new BindingImpl("person", new URIImpl("http://Charlie")))); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -160,8 +165,12 @@ public class QueryIT extends ITBase { new BindingImpl("candidate", new URIImpl("http://Ivan")), new BindingImpl("leader", new URIImpl("http://Bob")))); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -221,8 +230,12 @@ public class QueryIT extends ITBase { new BindingImpl("worker", new URIImpl("http://David")), new BindingImpl("city", new URIImpl("http://London")))); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -265,8 +278,12 @@ public class QueryIT extends ITBase { new BindingImpl("name", new URIImpl("http://Charlie")), new BindingImpl("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)))); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index 873b78e..2ea2202 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -19,45 +19,28 @@ package org.apache.rya.indexing.pcj.fluo.integration; import static org.junit.Assert.assertEquals; -import io.fluo.api.client.Snapshot; -import io.fluo.api.data.Bytes; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import mvm.rya.api.domain.RyaStatement; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.hadoop.io.Text; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.junit.Test; import org.openrdf.model.impl.URIImpl; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.BindingImpl; import com.google.common.base.Optional; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import mvm.rya.api.domain.RyaStatement; + /** * Performs integration tests over the Fluo application geared towards Rya PCJ exporting. * <p> @@ -65,8 +48,6 @@ import com.google.common.collect.Sets; */ public class RyaExportIT extends ITBase { - private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); - /** * Configure the export observer to use the Mini Accumulo instance as the * export destination for new PCJ results. @@ -81,7 +62,7 @@ public class RyaExportIT extends ITBase { ryaParams.setZookeeperServers(zookeepers); ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); - + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); return params; } @@ -120,22 +101,26 @@ public class RyaExportIT extends ITBase { makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle")); // The expected results of the SPARQL query once the PCJ has been computed. - final Set<BindingSet> expectedResults = new HashSet<>(); - expectedResults.add( makeBindingSet( + final Set<BindingSet> expected = new HashSet<>(); + expected.add(makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), new BindingImpl("city", new URIImpl("http://London")))); - expectedResults.add( makeBindingSet( + expected.add(makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Charlie")), new BindingImpl("city", new URIImpl("http://London")))); - expectedResults.add( makeBindingSet( + expected.add(makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://David")), new BindingImpl("city", new URIImpl("http://London")))); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -143,48 +128,10 @@ public class RyaExportIT extends ITBase { // Fetch the exported results from Accumulo once the observers finish working. fluo.waitForObservers(); - // Fetch expcted results from the PCJ table that is in Accumulo. - final String exportTableName; - try(Snapshot snapshot = fluoClient.newSnapshot()) { - final Bytes queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID); - exportTableName = snapshot.get(queryId, FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString(); - } - - final Multimap<String, BindingSet> results = loadPcjResults(accumuloConn, exportTableName); + // Fetch expected results from the PCJ table that is in Accumulo. + final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); // Verify the end results of the query match the expected results. - final Multimap<String, BindingSet> expected = HashMultimap.create(); - expected.putAll("customer;worker;city", expectedResults); - expected.putAll("worker;city;customer", expectedResults); - expected.putAll("city;customer;worker", expectedResults); - - assertEquals(expected, results); - } - - /** - * Scan accumulo for the results that are stored in a PCJ table. The - * multimap stores a set of deserialized binding sets that were in the PCJ - * table for every variable order that is found in the PCJ metadata. - */ - private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException { - final Multimap<String, BindingSet> fetchedResults = HashMultimap.create(); - - // Get the variable orders the data was written to. - final PcjTables pcjs = new PcjTables(); - final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - - // Scan Accumulo for the stored results. - for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) { - final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); - scanner.fetchColumnFamily( new Text(varOrder.toString()) ); - - for(final Entry<Key, Value> entry : scanner) { - final byte[] serializedResult = entry.getKey().getRow().getBytes(); - final BindingSet result = converter.convert(serializedResult, varOrder); - fetchedResults.put(varOrder.toString(), result); - } - } - - return fetchedResults; + assertEquals(expected, results); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java index 68fd842..8cc7404 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -19,17 +19,14 @@ package org.apache.rya.indexing.pcj.fluo.integration; import static org.junit.Assert.assertEquals; -import io.fluo.api.client.FluoClient; import java.util.HashSet; import java.util.Set; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.indexing.external.PrecomputedJoinIndexer; - import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import org.junit.Test; import org.openrdf.model.Statement; @@ -40,6 +37,10 @@ import org.openrdf.repository.RepositoryConnection; import com.google.common.collect.Sets; +import io.fluo.api.client.FluoClient; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.indexing.external.PrecomputedJoinIndexer; + /** * This test ensures that the correct updates are pushed by Fluo @@ -84,9 +85,12 @@ public class RyaInputIncrementalUpdateIT extends ITBase { expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Bob")))); expected.add(makeBindingSet(new BindingImpl("x", new URIImpl("http://Charlie")))); - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, - new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -132,9 +136,13 @@ public class RyaInputIncrementalUpdateIT extends ITBase { ryaConn.add(triple); } - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, - new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + fluo.waitForObservers(); // Load the streaming data into Rya. @@ -177,9 +185,13 @@ public class RyaInputIncrementalUpdateIT extends ITBase { ryaConn.add(triple); } - // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, - new HashSet<VariableOrder>(), sparql); + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + fluo.waitForObservers(); // Load the streaming data into Rya. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index 95228b8..b23b4a4 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -20,62 +20,38 @@ package org.apache.rya.indexing.pcj.fluo.visibility; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import io.fluo.api.client.Snapshot; -import io.fluo.api.data.Bytes; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTypeResolverException; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; -import org.apache.hadoop.io.Text; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.junit.Test; -import org.openrdf.model.impl.NumericLiteralImpl; import org.openrdf.model.impl.URIImpl; -import org.openrdf.model.vocabulary.XMLSchema; import org.openrdf.query.BindingSet; import org.openrdf.query.impl.BindingImpl; -import org.openrdf.query.impl.MapBindingSet; -import org.openrdf.repository.RepositoryException; +import com.beust.jcommander.internal.Sets; import com.google.common.base.Optional; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; + +import mvm.rya.api.domain.RyaStatement; public class PcjVisibilityIT extends ITBase { - private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); /** * Configure the export observer to use the Mini Accumulo instance as the * export destination for new PCJ results. @@ -90,80 +66,11 @@ public class PcjVisibilityIT extends ITBase { ryaParams.setZookeeperServers(zookeepers); ryaParams.setExporterUsername(ITBase.ACCUMULO_USER); ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD); - + ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME); return params; } @Test - public void createWithVisibilityDirect() throws RepositoryException, PcjException, TableNotFoundException, RyaTypeResolverException, AccumuloException, AccumuloSecurityException, BindingSetConversionException { - // Create a PCJ table that will include those triples in its results. - final String sparql = - "SELECT ?name ?age " + - "{" + - "FILTER(?age < 30) ." + - "?name <http://hasAge> ?age." + - "?name <http://playsSport> \"Soccer\" " + - "}"; - - final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); - - // Create and populate the PCJ table. - final PcjTables pcjs = new PcjTables(); - final PcjVarOrderFactory varOrderFactory = Optional.<PcjVarOrderFactory>absent().or(new ShiftVarOrderFactory()); - final Set<VariableOrder> varOrders = varOrderFactory.makeVarOrders( new VariableOrder(new String[]{"name", "age"}) ); - - // Create the PCJ table in Accumulo. - pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); - setupTestUsers(pcjTableName); - - // Add a few results to the PCJ table. - final MapBindingSet alice = new MapBindingSet(); - alice.addBinding("name", new URIImpl("http://Alice")); - alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); - - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("name", new URIImpl("http://Bob")); - bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); - - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("name", new URIImpl("http://Charlie")); - charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); - - final VisibilityBindingSet aliceVisibility = new VisibilityBindingSet(alice, "A&B&C"); - final VisibilityBindingSet bobVisibility = new VisibilityBindingSet(bob, "B&C"); - final VisibilityBindingSet charlieVisibility = new VisibilityBindingSet(charlie, "C"); - - final Set<BindingSet> results = Sets.<BindingSet>newHashSet(alice, bob, charlie); - final Set<VisibilityBindingSet> visibilityResults = Sets.<VisibilityBindingSet>newHashSet(aliceVisibility, bobVisibility, charlieVisibility); - // Load historic matches from Rya into the PCJ table. - pcjs.addResults(accumuloConn, pcjTableName, visibilityResults); - - // Make sure the cardinality was updated. - final PcjMetadata metadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - assertEquals(3, metadata.getCardinality()); - - // Scan Accumulo for the stored results. - Multimap<String, BindingSet> fetchedResults = loadPcjResults(accumuloConn, pcjTableName, "A", "B", "C"); - assertEquals(getExpectedResults(results), fetchedResults); - fetchedResults = loadPcjResults(accumuloConn, pcjTableName, "C", "B"); - assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(bob, charlie)), fetchedResults); - fetchedResults = loadPcjResults(accumuloConn, pcjTableName, "C"); - assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), fetchedResults); - - final Connector cConn = accumuloConn.getInstance().getConnector("cUser", new PasswordToken("password")); - // Scan Accumulo for the stored results. - fetchedResults = loadPcjResults(cConn, pcjTableName, "C"); - assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), fetchedResults); - fetchedResults = loadPcjResults(cConn, pcjTableName); - assertEquals(getExpectedResults(Sets.<BindingSet>newHashSet(charlie)), fetchedResults); - - final Connector noAuthConn = accumuloConn.getInstance().getConnector("noAuth", new PasswordToken("password")); - // Scan Accumulo for the stored results. - fetchedResults = loadPcjResults(noAuthConn, pcjTableName); - assertTrue(fetchedResults.isEmpty()); - } - - @Test public void createWithVisibilityFluo() throws Exception { final String sparql = "SELECT ?customer ?worker ?city " + @@ -195,8 +102,12 @@ public class PcjVisibilityIT extends ITBase { addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Frank", "http://livesIn", "http://London"), ""); addStatementVisibilityEntry(streamedTriples, makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle"), ""); + // Create the PCJ Table in Accumulo. + final PrecomputedJoinStorage rootStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = rootStorage.createPcj(sparql); + // Create the PCJ in Fluo. - new CreatePcj().withRyaIntegration(fluoClient, RYA_TABLE_PREFIX, ryaRepo, accumuloConn, new HashSet<VariableOrder>(), sparql); + new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, ryaRepo); // Stream the data into Fluo. for(final RyaStatement statement : streamedTriples.keySet()) { @@ -206,157 +117,121 @@ public class PcjVisibilityIT extends ITBase { // Fetch the exported results from Accumulo once the observers finish working. fluo.waitForObservers(); - // Fetch expected results from the PCJ table that is in Accumulo. - final String exportTableName; - try(Snapshot snapshot = fluoClient.newSnapshot()) { - final Bytes queryId = snapshot.get(Bytes.of(sparql), FluoQueryColumns.QUERY_ID); - exportTableName = snapshot.get(queryId, FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).toString(); - } - setupTestUsers(exportTableName); - Multimap<String, BindingSet> results = loadPcjResults(accumuloConn, exportTableName); + setupTestUsers(accumuloConn, RYA_INSTANCE_NAME, pcjId); + + // Verify ABCDE using root. + final Set<BindingSet> rootResults = toSet( rootStorage.listResults(pcjId)); - // Verify the end results of the query match the expected results. - Multimap<String, BindingSet> expected = makeExpected( - makeBindingSet( + final Set<BindingSet> rootExpected = Sets.newHashSet(); + rootExpected.add( makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), - new BindingImpl("city", new URIImpl("http://London"))), - makeBindingSet( + new BindingImpl("city", new URIImpl("http://London")))); + rootExpected.add( makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Charlie")), - new BindingImpl("city", new URIImpl("http://London"))), - makeBindingSet( + new BindingImpl("city", new URIImpl("http://London")))); + rootExpected.add( makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Eve")), - new BindingImpl("city", new URIImpl("http://Leeds"))), - makeBindingSet( + new BindingImpl("city", new URIImpl("http://Leeds")))); + rootExpected.add( makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://David")), new BindingImpl("city", new URIImpl("http://London")))); - assertEquals(expected, results); + assertEquals(rootExpected, rootResults); - final PasswordToken pass = new PasswordToken("password"); - Connector userConn = accumuloConn.getInstance().getConnector("abUser", pass); - results = loadPcjResults(userConn, exportTableName); - // Verify the end results of the query match the expected results. - expected = makeExpected( - makeBindingSet( + // Verify AB + final Connector abConn = cluster.getConnector("abUser", "password"); + final PrecomputedJoinStorage abStorage = new AccumuloPcjStorage(abConn, RYA_INSTANCE_NAME); + final Set<BindingSet> abResults = toSet( abStorage.listResults(pcjId) ); + + final Set<BindingSet> abExpected = Sets.newHashSet(); + abExpected.add( makeBindingSet( new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), new BindingImpl("city", new URIImpl("http://London")))); - assertEquals(expected, results); - - userConn = accumuloConn.getInstance().getConnector("abcUser", pass); - results = loadPcjResults(userConn, exportTableName); - expected = makeExpected( - makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Bob")), - new BindingImpl("city", new URIImpl("http://London"))), - makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Charlie")), - new BindingImpl("city", new URIImpl("http://London")))); - assertEquals(expected, results); - - userConn = accumuloConn.getInstance().getConnector("adeUser", pass); - results = loadPcjResults(userConn, exportTableName); - expected = makeExpected( - makeBindingSet( - new BindingImpl("customer", new URIImpl("http://Alice")), - new BindingImpl("worker", new URIImpl("http://Eve")), - new BindingImpl("city", new URIImpl("http://Leeds")))); - assertEquals(expected, results); - - userConn = accumuloConn.getInstance().getConnector("noAuth", pass); - results = loadPcjResults(userConn, exportTableName); - assertTrue(results.isEmpty()); - } - private Multimap<String, BindingSet> makeExpected(final BindingSet... bindingSets) { - final Set<BindingSet> expectedResults = new HashSet<>(); - for(final BindingSet bs : bindingSets) { - expectedResults.add(bs); - } + assertEquals(abExpected, abResults); + + // Verify ABC + final Connector abcConn = cluster.getConnector("abcUser", "password"); + final PrecomputedJoinStorage abcStorage = new AccumuloPcjStorage(abcConn, RYA_INSTANCE_NAME); + final Set<BindingSet> abcResults = toSet( abcStorage.listResults(pcjId) ); + + final Set<BindingSet> abcExpected = Sets.newHashSet(); + abcExpected.add(makeBindingSet( + new BindingImpl("customer", new URIImpl("http://Alice")), + new BindingImpl("worker", new URIImpl("http://Bob")), + new BindingImpl("city", new URIImpl("http://London")))); + abcExpected.add(makeBindingSet( + new BindingImpl("customer", new URIImpl("http://Alice")), + new BindingImpl("worker", new URIImpl("http://Charlie")), + new BindingImpl("city", new URIImpl("http://London")))); + + assertEquals(abcExpected, abcResults); + + // Verify ADE + final Connector adeConn = cluster.getConnector("adeUser", "password"); + final PrecomputedJoinStorage adeStorage = new AccumuloPcjStorage(adeConn, RYA_INSTANCE_NAME); + final Set<BindingSet> adeResults = toSet( adeStorage.listResults(pcjId) ); - final Multimap<String, BindingSet> expected = HashMultimap.create(); - expected.putAll("customer;worker;city", expectedResults); - expected.putAll("worker;city;customer", expectedResults); - expected.putAll("city;customer;worker", expectedResults); - return expected; + final Set<BindingSet> adeExpected = Sets.newHashSet(); + adeExpected.add(makeBindingSet( + new BindingImpl("customer", new URIImpl("http://Alice")), + new BindingImpl("worker", new URIImpl("http://Eve")), + new BindingImpl("city", new URIImpl("http://Leeds")))); + + assertEquals(adeExpected, adeResults); + + // Verify no auths. + final Connector noAuthConn = cluster.getConnector("noAuth", "password"); + final PrecomputedJoinStorage noAuthStorage = new AccumuloPcjStorage(noAuthConn, RYA_INSTANCE_NAME); + final Set<BindingSet> noAuthResults = toSet( noAuthStorage.listResults(pcjId) ); + assertTrue( noAuthResults.isEmpty() ); } - private void setupTestUsers(final String exportTableName) throws AccumuloException, AccumuloSecurityException { + private void setupTestUsers(final Connector accumuloConn, final String ryaInstanceName, final String pcjId) throws AccumuloException, AccumuloSecurityException { final PasswordToken pass = new PasswordToken("password"); final SecurityOperations secOps = accumuloConn.securityOperations(); + + // XXX We need the table name so that we can update security for the users. + final String pcjTableName = new PcjTableNameFactory().makeTableName(ryaInstanceName, pcjId); + + // Give the 'roor' user authorizations to see everything. secOps.changeUserAuthorizations("root", new Authorizations("A", "B", "C", "D", "E")); + + // Create a user that can see things with A and B. secOps.createLocalUser("abUser", pass); secOps.changeUserAuthorizations("abUser", new Authorizations("A", "B")); - secOps.grantTablePermission("abUser", exportTableName, TablePermission.READ); + secOps.grantTablePermission("abUser", pcjTableName, TablePermission.READ); + // Create a user that can see things with A, B, and C. secOps.createLocalUser("abcUser", pass); secOps.changeUserAuthorizations("abcUser", new Authorizations("A", "B", "C")); - secOps.grantTablePermission("abcUser", exportTableName, TablePermission.READ); + secOps.grantTablePermission("abcUser", pcjTableName, TablePermission.READ); + // Create a user that can see things with A, D, and E. secOps.createLocalUser("adeUser", pass); secOps.changeUserAuthorizations("adeUser", new Authorizations("A", "D", "E")); - secOps.grantTablePermission("adeUser", exportTableName, TablePermission.READ); - - secOps.createLocalUser("cUser", pass); - secOps.changeUserAuthorizations("cUser", new Authorizations("C")); - secOps.grantTablePermission("cUser", exportTableName, TablePermission.READ); + secOps.grantTablePermission("adeUser", pcjTableName, TablePermission.READ); + // Create a user that can't see anything. secOps.createLocalUser("noAuth", pass); secOps.changeUserAuthorizations("noAuth", new Authorizations()); - secOps.grantTablePermission("noAuth", exportTableName, TablePermission.READ); - } - - - private Multimap<String, BindingSet> getExpectedResults(final Set<BindingSet> results) { - final Multimap<String, BindingSet> expectedResults = HashMultimap.create(); - expectedResults.putAll("name;age", results); - expectedResults.putAll("age;name", results); - return expectedResults; + secOps.grantTablePermission("noAuth", pcjTableName, TablePermission.READ); } protected static void addStatementVisibilityEntry(final Map<RyaStatement, String> triplesMap, final RyaStatement statement, final String visibility) { triplesMap.put(statement, visibility); } - /** - * Scan accumulo for the results that are stored in a PCJ table. The - * multimap stores a set of deserialized binding sets that were in the PCJ - * table for every variable order that is found in the PCJ metadata. - * @throws AccumuloSecurityException - * @throws AccumuloException - */ - private static Multimap<String, BindingSet> loadPcjResults(final Connector accumuloConn, final String pcjTableName, final String... visibility) throws PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { - final Multimap<String, BindingSet> fetchedResults = HashMultimap.create(); - - final Authorizations userAuths; - if(visibility.length == 0) { - userAuths = accumuloConn.securityOperations().getUserAuthorizations(accumuloConn.whoami()); - } else { - userAuths = new Authorizations(visibility); - } - - // Get the variable orders the data was written to. - final PcjTables pcjs = new PcjTables(); - final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(accumuloConn, pcjTableName); - - // Scan Accumulo for the stored results. - for(final VariableOrder varOrder : pcjMetadata.getVarOrders()) { - final Scanner scanner = accumuloConn.createScanner(pcjTableName, userAuths); - scanner.fetchColumnFamily( new Text(varOrder.toString()) ); - - for(final Entry<Key, Value> entry : scanner) { - final byte[] serializedResult = entry.getKey().getRow().getBytes(); - final BindingSet result = converter.convert(serializedResult, varOrder); - fetchedResults.put(varOrder.toString(), result); - } + private Set<BindingSet> toSet(final Iterable<BindingSet> bindingSets) { + final Set<BindingSet> set = new HashSet<>(); + for(final BindingSet bindingSet : bindingSets) { + set.add( bindingSet ); } - - return fetchedResults; + return set; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java ---------------------------------------------------------------------- diff --git a/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java b/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java index 6b47c0c..0af9f32 100644 --- a/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java +++ b/sail/src/main/java/mvm/rya/rdftriplestore/RdfCloudTripleStore.java @@ -1,5 +1,13 @@ package mvm.rya.rdftriplestore; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; +import org.openrdf.sail.helpers.SailBase; + /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -8,9 +16,9 @@ package mvm.rya.rdftriplestore; * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -30,14 +38,6 @@ import mvm.rya.rdftriplestore.inference.InferenceEngine; import mvm.rya.rdftriplestore.namespace.NamespaceManager; import mvm.rya.rdftriplestore.provenance.ProvenanceCollector; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.sail.SailConnection; -import org.openrdf.sail.SailException; -import org.openrdf.sail.helpers.SailBase; - -import static com.google.common.base.Preconditions.checkNotNull; - public class RdfCloudTripleStore extends SailBase { private RdfCloudTripleStoreConfiguration conf; @@ -80,13 +80,6 @@ public class RdfCloudTripleStore extends SailBase { rdfEvalStatsDAO.init(); } - //TODO: Support inferencing with ryadao -// if (inferenceEngine != null && !inferenceEngine.isInitialized()) { -// inferenceEngine.setConf(this.conf); -// inferenceEngine.setRyaDAO(ryaDAO); -// inferenceEngine.init(); -// } - if (namespaceManager == null) { this.namespaceManager = new NamespaceManager(ryaDAO, this.conf); } @@ -94,6 +87,7 @@ public class RdfCloudTripleStore extends SailBase { @Override protected void shutDownInternal() throws SailException { + try { if (namespaceManager != null) { namespaceManager.shutdown(); @@ -135,11 +129,11 @@ public class RdfCloudTripleStore extends SailBase { public void setRdfEvalStatsDAO(RdfEvalStatsDAO rdfEvalStatsDAO) { this.rdfEvalStatsDAO = rdfEvalStatsDAO; } - + public SelectivityEvalDAO getSelectEvalDAO() { return selectEvalDAO; } - + public void setSelectEvalDAO(SelectivityEvalDAO selectEvalDAO) { this.selectEvalDAO = selectEvalDAO; }