http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java new file mode 100644 index 0000000..d904d83 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static org.junit.Assert.assertEquals; + +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.junit.Test; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import org.apache.rya.api.resolver.RyaTypeResolverException; + +/** + * Tests the methods of {@link AccumuloPcjSerialzer}. + */ +public class AccumuloPcjSerializerTest { + + /** + * The BindingSet has fewer Bindings than there are variables in the variable + * order, but they are all in the variable order. This is the case where + * the missing bindings were optional. + */ + @Test + public void serialize_bindingsSubsetOfVarOrder() throws BindingSetConversionException { + // Setup the Binding Set. + final MapBindingSet originalBindingSet = new MapBindingSet(); + originalBindingSet.addBinding("x", new URIImpl("http://a")); + originalBindingSet.addBinding("y", new URIImpl("http://b")); + + // Setup the variable order. + final VariableOrder varOrder = new VariableOrder("x", "a", "y", "b"); + + // Create the byte[] representation of the BindingSet. + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + byte[] serialized = converter.convert(originalBindingSet, varOrder); + + // Deserialize the byte[] back into the binding set. + BindingSet deserialized = converter.convert(serialized, varOrder); + + // Ensure the deserialized value matches the original. + assertEquals(originalBindingSet, deserialized); + } + + /** + * The BindingSet has more Bindings than there are variables in the variable order. + * This is the case where a Group By clause does not include all of the Bindings that + * are in the Binding Set. + */ + @Test + public void serialize_bindingNotInVariableOrder() throws RyaTypeResolverException, BindingSetConversionException { + // Setup the Binding Set. + final MapBindingSet originalBindingSet = new MapBindingSet(); + originalBindingSet.addBinding("x", new URIImpl("http://a")); + originalBindingSet.addBinding("y", new URIImpl("http://b")); + originalBindingSet.addBinding("z", new URIImpl("http://d")); + + // Setup the variable order. + final VariableOrder varOrder = new VariableOrder("x", "y"); + + // Serialize the Binding Set. + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + byte[] serialized = converter.convert(originalBindingSet, varOrder); + + // Deserialize it again. + BindingSet deserialized = converter.convert(serialized, varOrder); + + // Show that it only contains the bindings that were part of the Variable Order. + MapBindingSet expected = new MapBindingSet(); + expected.addBinding("x", new URIImpl("http://a")); + expected.addBinding("y", new URIImpl("http://b")); + + assertEquals(expected, deserialized); + } + + @Test + public void basicShortUriBsTest() throws BindingSetConversionException { + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("X",new URIImpl("http://uri1")); + bs.addBinding("Y",new URIImpl("http://uri2")); + final VariableOrder varOrder = new VariableOrder("X","Y"); + + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + final byte[] byteVal = converter.convert(bs, varOrder); + final BindingSet newBs = converter.convert(byteVal, varOrder); + assertEquals(bs, newBs); + } + + @Test + public void basicLongUriBsTest() throws BindingSetConversionException { + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("X",new URIImpl("http://uri1")); + bs.addBinding("Y",new URIImpl("http://uri2")); + bs.addBinding("Z",new URIImpl("http://uri3")); + bs.addBinding("A",new URIImpl("http://uri4")); + bs.addBinding("B",new URIImpl("http://uri5")); + final VariableOrder varOrder = new VariableOrder("X","Y","Z","A","B"); + + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + final byte[] byteVal = converter.convert(bs, varOrder); + final BindingSet newBs = converter.convert(byteVal, varOrder); + assertEquals(bs, newBs); + } + + @Test + public void basicShortStringLiteralBsTest() throws BindingSetConversionException { + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("X",new LiteralImpl("literal1")); + bs.addBinding("Y",new LiteralImpl("literal2")); + final VariableOrder varOrder = new VariableOrder("X","Y"); + + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + final byte[] byteVal = converter.convert(bs, varOrder); + final BindingSet newBs = converter.convert(byteVal, varOrder); + assertEquals(bs, newBs); + } + + @Test + public void basicShortMixLiteralBsTest() throws BindingSetConversionException { + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("X",new LiteralImpl("literal1")); + bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer"))); + final VariableOrder varOrder = new VariableOrder("X","Y"); + + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + final byte[] byteVal = converter.convert(bs, varOrder); + final BindingSet newBs = converter.convert(byteVal, varOrder); + assertEquals(bs, newBs); + } + + @Test + public void basicLongMixLiteralBsTest() throws BindingSetConversionException { + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("X",new LiteralImpl("literal1")); + bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer"))); + bs.addBinding("Z",new LiteralImpl("5.0", new URIImpl("http://www.w3.org/2001/XMLSchema#double"))); + bs.addBinding("W",new LiteralImpl("1000", new URIImpl("http://www.w3.org/2001/XMLSchema#long"))); + final VariableOrder varOrder = new VariableOrder("W","X","Y","Z"); + + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + final byte[] byteVal = converter.convert(bs, varOrder); + final BindingSet newBs = converter.convert(byteVal, varOrder); + assertEquals(bs, newBs); + } + + @Test + public void basicMixUriLiteralBsTest() throws BindingSetConversionException { + final QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("X",new LiteralImpl("literal1")); + bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer"))); + bs.addBinding("Z",new LiteralImpl("5.0", new URIImpl("http://www.w3.org/2001/XMLSchema#double"))); + bs.addBinding("W",new LiteralImpl("1000", new URIImpl("http://www.w3.org/2001/XMLSchema#long"))); + bs.addBinding("A",new URIImpl("http://uri1")); + bs.addBinding("B",new URIImpl("http://uri2")); + bs.addBinding("C",new URIImpl("http://uri3")); + final VariableOrder varOrder = new VariableOrder("A","W","X","Y","Z","B","C"); + + BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); + final byte[] byteVal = converter.convert(bs, varOrder); + final BindingSet newBs = converter.convert(byteVal, varOrder); + assertEquals(bs, newBs); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java deleted file mode 100644 index d69205e..0000000 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java +++ /dev/null @@ -1,175 +0,0 @@ -package org.apache.rya.indexing.pcj.storage.accumulo; - -import static org.junit.Assert.assertEquals; - -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.junit.Test; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; -import org.openrdf.query.impl.MapBindingSet; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import org.apache.rya.api.resolver.RyaTypeResolverException; - -/** - * Tests the methods of {@link AccumuloPcjSerialzer}. - */ -public class AccumuloPcjSerialzerTest { - - /** - * The BindingSet has fewer Bindings than there are variables in the variable - * order, but they are all in the variable order. This is the case where - * the missing bindings were optional. - */ - @Test - public void serialize_bindingsSubsetOfVarOrder() throws BindingSetConversionException { - // Setup the Binding Set. - final MapBindingSet originalBindingSet = new MapBindingSet(); - originalBindingSet.addBinding("x", new URIImpl("http://a")); - originalBindingSet.addBinding("y", new URIImpl("http://b")); - - // Setup the variable order. - final VariableOrder varOrder = new VariableOrder("x", "a", "y", "b"); - - // Create the byte[] representation of the BindingSet. - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - byte[] serialized = converter.convert(originalBindingSet, varOrder); - - // Deserialize the byte[] back into the binding set. - BindingSet deserialized = converter.convert(serialized, varOrder); - - // Ensure the deserialized value matches the original. - assertEquals(originalBindingSet, deserialized); - } - - /** - * The BindingSet has a Binding whose name is not in the variable order. - * This is illegal. - */ - @Test(expected = IllegalArgumentException.class) - public void serialize_bindingNotInVariableOrder() throws RyaTypeResolverException, BindingSetConversionException { - // Setup the Binding Set. - final MapBindingSet originalBindingSet = new MapBindingSet(); - originalBindingSet.addBinding("x", new URIImpl("http://a")); - originalBindingSet.addBinding("y", new URIImpl("http://b")); - originalBindingSet.addBinding("z", new URIImpl("http://d")); - - // Setup the variable order. - final VariableOrder varOrder = new VariableOrder("x", "y"); - - // Create the byte[] representation of the BindingSet. This will throw an exception. - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - converter.convert(originalBindingSet, varOrder); - } - - @Test - public void basicShortUriBsTest() throws BindingSetConversionException { - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("X",new URIImpl("http://uri1")); - bs.addBinding("Y",new URIImpl("http://uri2")); - final VariableOrder varOrder = new VariableOrder("X","Y"); - - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - final byte[] byteVal = converter.convert(bs, varOrder); - final BindingSet newBs = converter.convert(byteVal, varOrder); - assertEquals(bs, newBs); - } - - @Test - public void basicLongUriBsTest() throws BindingSetConversionException { - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("X",new URIImpl("http://uri1")); - bs.addBinding("Y",new URIImpl("http://uri2")); - bs.addBinding("Z",new URIImpl("http://uri3")); - bs.addBinding("A",new URIImpl("http://uri4")); - bs.addBinding("B",new URIImpl("http://uri5")); - final VariableOrder varOrder = new VariableOrder("X","Y","Z","A","B"); - - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - final byte[] byteVal = converter.convert(bs, varOrder); - final BindingSet newBs = converter.convert(byteVal, varOrder); - assertEquals(bs, newBs); - } - - @Test - public void basicShortStringLiteralBsTest() throws BindingSetConversionException { - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("X",new LiteralImpl("literal1")); - bs.addBinding("Y",new LiteralImpl("literal2")); - final VariableOrder varOrder = new VariableOrder("X","Y"); - - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - final byte[] byteVal = converter.convert(bs, varOrder); - final BindingSet newBs = converter.convert(byteVal, varOrder); - assertEquals(bs, newBs); - } - - @Test - public void basicShortMixLiteralBsTest() throws BindingSetConversionException { - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("X",new LiteralImpl("literal1")); - bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer"))); - final VariableOrder varOrder = new VariableOrder("X","Y"); - - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - final byte[] byteVal = converter.convert(bs, varOrder); - final BindingSet newBs = converter.convert(byteVal, varOrder); - assertEquals(bs, newBs); - } - - @Test - public void basicLongMixLiteralBsTest() throws BindingSetConversionException { - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("X",new LiteralImpl("literal1")); - bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer"))); - bs.addBinding("Z",new LiteralImpl("5.0", new URIImpl("http://www.w3.org/2001/XMLSchema#double"))); - bs.addBinding("W",new LiteralImpl("1000", new URIImpl("http://www.w3.org/2001/XMLSchema#long"))); - final VariableOrder varOrder = new VariableOrder("W","X","Y","Z"); - - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - final byte[] byteVal = converter.convert(bs, varOrder); - final BindingSet newBs = converter.convert(byteVal, varOrder); - assertEquals(bs, newBs); - } - - @Test - public void basicMixUriLiteralBsTest() throws BindingSetConversionException { - final QueryBindingSet bs = new QueryBindingSet(); - bs.addBinding("X",new LiteralImpl("literal1")); - bs.addBinding("Y",new LiteralImpl("5", new URIImpl("http://www.w3.org/2001/XMLSchema#integer"))); - bs.addBinding("Z",new LiteralImpl("5.0", new URIImpl("http://www.w3.org/2001/XMLSchema#double"))); - bs.addBinding("W",new LiteralImpl("1000", new URIImpl("http://www.w3.org/2001/XMLSchema#long"))); - bs.addBinding("A",new URIImpl("http://uri1")); - bs.addBinding("B",new URIImpl("http://uri2")); - bs.addBinding("C",new URIImpl("http://uri3")); - final VariableOrder varOrder = new VariableOrder("A","W","X","Y","Z","B","C"); - - BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer(); - final byte[] byteVal = converter.convert(bs, varOrder); - final BindingSet newBs = converter.convert(byteVal, varOrder); - assertEquals(bs, newBs); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java index e01e7de..b263038 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java @@ -23,10 +23,7 @@ import static org.junit.Assert.assertEquals; import java.math.BigDecimal; import java.math.BigInteger; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.junit.Test; import org.openrdf.model.impl.BooleanLiteralImpl; import org.openrdf.model.impl.DecimalLiteralImpl; @@ -41,6 +38,23 @@ import org.openrdf.query.impl.MapBindingSet; public class BindingSetStringConverterTest { @Test + public void noBindings() throws BindingSetConversionException { + // Create a BindingSet that doesn't have any bindings. + final MapBindingSet original = new MapBindingSet(); + + // Convert it to a String. + final VariableOrder varOrder = new VariableOrder(); + final BindingSetConverter<String> converter = new BindingSetStringConverter(); + final String bindingSetString = converter.convert(original, varOrder); + + // Convert it back to a binding set. + final BindingSet converted = converter.convert(bindingSetString, varOrder); + + // Ensure it is still an empty BindingSet. + assertEquals(original, converted); + } + + @Test public void toString_URIs() throws BindingSetConversionException { // Setup the binding set that will be converted. final MapBindingSet originalBindingSet = new MapBindingSet(); @@ -53,7 +67,7 @@ public class BindingSetStringConverterTest { final BindingSetConverter<String> converter = new BindingSetStringConverter(); final String bindingSetString = converter.convert(originalBindingSet, varOrder); - // Ensure it converted to the expected result.l + // Ensure it converted to the expected result. final String expected = "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" + "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" + @@ -163,26 +177,6 @@ public class BindingSetStringConverterTest { assertEquals(expected, bindingSetString); } - /** - * The BindingSet has a Binding whose name is not in the variable order. - * This is illegal. - */ - @Test(expected = IllegalArgumentException.class) - public void toString_bindingNotInVariableOrder() throws BindingSetConversionException { - // Setup the Binding Set. - final MapBindingSet originalBindingSet = new MapBindingSet(); - originalBindingSet.addBinding("x", new URIImpl("http://a")); - originalBindingSet.addBinding("y", new URIImpl("http://b")); - originalBindingSet.addBinding("z", new URIImpl("http://d")); - - // Setup the variable order. - final VariableOrder varOrder = new VariableOrder("x", "y"); - - // Create the String representation of the BindingSet. This will throw an exception. - final BindingSetConverter<String> converter = new BindingSetStringConverter(); - converter.convert(originalBindingSet, varOrder); - } - @Test public void fromString() throws BindingSetConversionException { // Setup the String that will be converted. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java index 623526e..2bcce65 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java @@ -38,10 +38,17 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.accumulo.MiniAccumuloClusterInstance; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.rdftriplestore.RdfCloudTripleStore; +import org.apache.rya.rdftriplestore.RyaSailRepository; import org.apache.zookeeper.ClientCnxn; import org.junit.After; import org.junit.Before; @@ -63,13 +70,6 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.AccumuloRyaDAO; -import org.apache.rya.accumulo.MiniAccumuloClusterInstance; -import org.apache.rya.api.RdfCloudTripleStoreConfiguration; -import org.apache.rya.rdftriplestore.RdfCloudTripleStore; -import org.apache.rya.rdftriplestore.RyaSailRepository; - /** * Performs integration test using {@link MiniAccumuloCluster} to ensure the * functions of {@link PcjTables} work within a cluster setting. @@ -237,7 +237,7 @@ public class PcjTablesIntegrationTest { } @Test - public void listResults() throws PCJStorageException, AccumuloException, AccumuloSecurityException { + public void listResults() throws Exception { final String sparql = "SELECT ?name ?age " + "{" + @@ -274,8 +274,14 @@ public class PcjTablesIntegrationTest { // Fetch the Binding Sets that have been stored in the PCJ table. final Set<BindingSet> results = new HashSet<>(); - for(final BindingSet result : pcjs.listResults(accumuloConn, pcjTableName, new Authorizations())) { - results.add( result ); + + final CloseableIterator<BindingSet> resultsIt = pcjs.listResults(accumuloConn, pcjTableName, new Authorizations()); + try { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } finally { + resultsIt.close(); } // Verify the fetched results match the expected ones. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java index 7eeff1d..98ed4c7 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java @@ -31,8 +31,15 @@ import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; +import org.apache.rya.accumulo.AccumuloRyaITBase; +import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; @@ -46,13 +53,6 @@ import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.ImmutableMap; -import org.apache.rya.accumulo.AccumuloRyaITBase; -import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; -import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; -import org.apache.rya.api.instance.RyaDetailsRepository; -import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException; -import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; - /** * Integration tests the methods of {@link AccumuloPcjStorage}. * </p> @@ -66,23 +66,23 @@ public class AccumuloPcjStorageIT extends AccumuloRyaITBase { // Setup the PCJ storage that will be tested against. final Connector connector = super.getClusterInstance().getConnector(); final String ryaInstanceName = super.getRyaInstanceName(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); - - // Create a PCJ. - final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) { + // Create a PCJ. + final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); - // Ensure the Rya details have been updated to include the PCJ's ID. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName); + // Ensure the Rya details have been updated to include the PCJ's ID. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName); - final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() - .getPCJIndexDetails() - .getPCJDetails(); + final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() + .getPCJIndexDetails() + .getPCJDetails(); - final PCJDetails expectedDetails = PCJDetails.builder() - .setId( pcjId ) - .build(); + final PCJDetails expectedDetails = PCJDetails.builder() + .setId( pcjId ) + .build(); - assertEquals(expectedDetails, detailsMap.get(pcjId)); + assertEquals(expectedDetails, detailsMap.get(pcjId)); + } } @Test @@ -90,22 +90,22 @@ public class AccumuloPcjStorageIT extends AccumuloRyaITBase { // Setup the PCJ storage that will be tested against. final Connector connector = super.getClusterInstance().getConnector(); final String ryaInstanceName = super.getRyaInstanceName(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); - - // Create a PCJ. - final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) { + // Create a PCJ. + final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); - // Delete the PCJ that was just created. - pcjStorage.dropPcj(pcjId); + // Delete the PCJ that was just created. + pcjStorage.dropPcj(pcjId); - // Ensure the Rya details have been updated to no longer include the PCJ's ID. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName); + // Ensure the Rya details have been updated to no longer include the PCJ's ID. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName); - final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() - .getPCJIndexDetails() - .getPCJDetails(); + final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() + .getPCJIndexDetails() + .getPCJDetails(); - assertFalse( detailsMap.containsKey(pcjId) ); + assertFalse( detailsMap.containsKey(pcjId) ); + } } @Test @@ -113,27 +113,27 @@ public class AccumuloPcjStorageIT extends AccumuloRyaITBase { // Setup the PCJ storage that will be tested against. final Connector connector = super.getClusterInstance().getConnector(); final String ryaInstanceName = super.getRyaInstanceName(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); - - // Create a few PCJs and hold onto their IDs. - final List<String> expectedIds = new ArrayList<>(); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) { + // Create a few PCJs and hold onto their IDs. + final List<String> expectedIds = new ArrayList<>(); - String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); - expectedIds.add( pcjId ); + String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); - pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); - expectedIds.add( pcjId ); + pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); - pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); - expectedIds.add( pcjId ); + pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); - // Fetch the PCJ names - final List<String> pcjIds = pcjStorage.listPcjs(); + // Fetch the PCJ names + final List<String> pcjIds = pcjStorage.listPcjs(); - // Ensure the expected IDs match the fetched IDs. - Collections.sort(expectedIds); - Collections.sort(pcjIds); - assertEquals(expectedIds, pcjIds); + // Ensure the expected IDs match the fetched IDs. + Collections.sort(expectedIds); + Collections.sort(pcjIds); + assertEquals(expectedIds, pcjIds); + } } @Test @@ -141,19 +141,19 @@ public class AccumuloPcjStorageIT extends AccumuloRyaITBase { // Setup the PCJ storage that will be tested against. final Connector connector = super.getClusterInstance().getConnector(); final String ryaInstanceName = super.getRyaInstanceName(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); - - // Create a PCJ. - final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; - final String pcjId = pcjStorage.createPcj(sparql); - - // Fetch the PCJ's metadata. - final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); - - // Ensure it has the expected values. - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); - final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); - assertEquals(expectedMetadata, metadata); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) { + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Fetch the PCJ's metadata. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + + // Ensure it has the expected values. + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expectedMetadata, metadata); + } } @Test @@ -161,112 +161,116 @@ public class AccumuloPcjStorageIT extends AccumuloRyaITBase { // Setup the PCJ storage that will be tested against. final Connector connector = super.getClusterInstance().getConnector(); final String ryaInstanceName = super.getRyaInstanceName(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) { + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); - // Create a PCJ. - final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; - final String pcjId = pcjStorage.createPcj(sparql); + // Add some binding sets to it. + final Set<VisibilityBindingSet> results = new HashSet<>(); - // Add some binding sets to it. - final Set<VisibilityBindingSet> results = new HashSet<>(); + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + results.add( new VisibilityBindingSet(aliceBS, "") ); - final MapBindingSet aliceBS = new MapBindingSet(); - aliceBS.addBinding("a", new URIImpl("http://Alice")); - aliceBS.addBinding("b", new URIImpl("http://Person")); - results.add( new VisibilityBindingSet(aliceBS, "") ); + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + results.add( new VisibilityBindingSet(charlieBS, "") ); - final MapBindingSet charlieBS = new MapBindingSet(); - charlieBS.addBinding("a", new URIImpl("http://Charlie")); - charlieBS.addBinding("b", new URIImpl("http://Comedian")); - results.add( new VisibilityBindingSet(charlieBS, "") ); + pcjStorage.addResults(pcjId, results); - pcjStorage.addResults(pcjId, results); + // Make sure the PCJ metadata was updated. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); - // Make sure the PCJ metadata was updated. - final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); - - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); - final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders); - assertEquals(expectedMetadata, metadata); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders); + assertEquals(expectedMetadata, metadata); + } } @Test - public void listResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException { + public void listResults() throws Exception { // Setup the PCJ storage that will be tested against. final Connector connector = super.getClusterInstance().getConnector(); final String ryaInstanceName = super.getRyaInstanceName(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); - - // Create a PCJ. - final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; - final String pcjId = pcjStorage.createPcj(sparql); - - // Add some binding sets to it. - final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); - - final MapBindingSet aliceBS = new MapBindingSet(); - aliceBS.addBinding("a", new URIImpl("http://Alice")); - aliceBS.addBinding("b", new URIImpl("http://Person")); - expectedResults.add( new VisibilityBindingSet(aliceBS, "") ); - - final MapBindingSet charlieBS = new MapBindingSet(); - charlieBS.addBinding("a", new URIImpl("http://Charlie")); - charlieBS.addBinding("b", new URIImpl("http://Comedian")); - expectedResults.add( new VisibilityBindingSet(charlieBS, "") ); - - pcjStorage.addResults(pcjId, expectedResults); - - // List the results that were stored. - final Set<BindingSet> results = new HashSet<>(); - for(final BindingSet result : pcjStorage.listResults(pcjId)) { - results.add( result ); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) { + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Add some binding sets to it. + final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + expectedResults.add( new VisibilityBindingSet(aliceBS, "") ); + + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + expectedResults.add( new VisibilityBindingSet(charlieBS, "") ); + + pcjStorage.addResults(pcjId, expectedResults); + + // List the results that were stored. + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } + + assertEquals(expectedResults, results); } - - assertEquals(expectedResults, results); } @Test - public void purge() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException { + public void purge() throws Exception { // Setup the PCJ storage that will be tested against. final Connector connector = super.getClusterInstance().getConnector(); final String ryaInstanceName = super.getRyaInstanceName(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); - - // Create a PCJ. - final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; - final String pcjId = pcjStorage.createPcj(sparql); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) { + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); - // Add some binding sets to it. - final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + // Add some binding sets to it. + final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); - final MapBindingSet aliceBS = new MapBindingSet(); - aliceBS.addBinding("a", new URIImpl("http://Alice")); - aliceBS.addBinding("b", new URIImpl("http://Person")); - expectedResults.add( new VisibilityBindingSet(aliceBS, "") ); + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + expectedResults.add( new VisibilityBindingSet(aliceBS, "") ); - final MapBindingSet charlieBS = new MapBindingSet(); - charlieBS.addBinding("a", new URIImpl("http://Charlie")); - charlieBS.addBinding("b", new URIImpl("http://Comedian")); - expectedResults.add( new VisibilityBindingSet(charlieBS, "") ); + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + expectedResults.add( new VisibilityBindingSet(charlieBS, "") ); - pcjStorage.addResults(pcjId, expectedResults); + pcjStorage.addResults(pcjId, expectedResults); - // Purge the PCJ. - pcjStorage.purge(pcjId); + // Purge the PCJ. + pcjStorage.purge(pcjId); - // List the results that were stored. - final Set<BindingSet> results = new HashSet<>(); - for(final BindingSet result : pcjStorage.listResults(pcjId)) { - results.add( result ); - } + // List the results that were stored. + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } - assertTrue( results.isEmpty() ); + assertTrue( results.isEmpty() ); - // Make sure the PCJ metadata was updated. - final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + // Make sure the PCJ metadata was updated. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); - final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); - final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); - assertEquals(expectedMetadata, metadata); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expectedMetadata, metadata); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index 86f5b48..1de0813 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -22,9 +22,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Objects.requireNonNull; +import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Set; @@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.security.Authorizations; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.Transaction; +import org.apache.log4j.Logger; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; import org.apache.rya.api.domain.RyaStatement; @@ -52,15 +53,14 @@ import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.calrissian.mango.collect.CloseableIterable; import org.openrdf.model.Resource; import org.openrdf.model.URI; import org.openrdf.model.Value; import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; -import org.openrdf.sail.SailException; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -82,6 +82,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public class CreatePcj { + private static final Logger log = Logger.getLogger(CreatePcj.class); /** * The default Statement Pattern batch insert size is 1000. @@ -113,7 +114,51 @@ public class CreatePcj { this.spInsertBatchSize = spInsertBatchSize; } - + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. + * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param pcjStorage - Provides access to the PCJ index. (not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return The metadata that was written to the Fluo application for the PCJ. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + */ + public FluoQuery createPcj( + final String pcjId, + final PrecomputedJoinStorage pcjStorage, + final FluoClient fluo) throws MalformedQueryException, PcjException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); + + // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. + // We use these IDs later when scanning Rya for historic Statement Pattern matches + // as well as setting up automatic exports. + final NodeIds nodeIds = new NodeIds(); + + // Parse the query's structure for the metadata that will be written to fluo. + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); + final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + + try (Transaction tx = fluo.newTransaction()) { + // Write the query's structure to Fluo. + new FluoQueryMetadataDAO().write(tx, fluoQuery); + + // The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ. + final String queryId = fluoQuery.getQueryMetadata().getNodeId(); + tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); + tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); + + // Flush the changes to Fluo. + tx.commit(); + } + + return fluoQuery; + } + /** * Tells the Fluo PCJ Updater application to maintain a new PCJ. * <p> @@ -126,147 +171,115 @@ public class CreatePcj { * @param pcjStorage - Provides access to the PCJ index. (not null) * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) * @param queryEngine - QueryEngine for a given Rya Instance, (not null) - * + * @return The Fluo application's Query ID of the query that was created. * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. - * @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}. - * @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}. + * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. */ - public String withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, - final Connector accumulo, String ryaInstance ) - throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException { + public String withRyaIntegration( + final String pcjId, + final PrecomputedJoinStorage pcjStorage, + final FluoClient fluo, + final Connector accumulo, + final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { requireNonNull(pcjId); requireNonNull(pcjStorage); requireNonNull(fluo); - requireNonNull(accumulo); - requireNonNull(ryaInstance); - - //Create AccumuloRyaQueryEngine to query for historic results - AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(ryaInstance); - conf.setAuths(getAuths(accumulo)); - AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf); - - - // Keeps track of the IDs that are assigned to each of the query's nodes - // in Fluo. - // We use these IDs later when scanning Rya for historic Statement - // Pattern matches - // as well as setting up automatic exports. - final NodeIds nodeIds = new NodeIds(); - - // Parse the query's structure for the metadata that will be written to - // fluo. - final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = pcjMetadata.getSparql(); - final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); - final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + requireNonNull(accumulo); + requireNonNull(ryaInstance); + + // Write the SPARQL query's structure to the Fluo Application. + final FluoQuery fluoQuery = createPcj(pcjId, pcjStorage, fluo); + + // Reuse the same set object while performing batch inserts. + final Set<RyaStatement> queryBatch = new HashSet<>(); + + // Iterate through each of the statement patterns and insert their historic matches into Fluo. + for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { + // Get an iterator over all of the binding sets that match the statement pattern. + final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern()); + queryBatch.add(spToRyaStatement(pattern)); + } + + //Create AccumuloRyaQueryEngine to query for historic results + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(ryaInstance); + conf.setAuths(getAuths(accumulo)); + + try(final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf); + CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) { + final Set<RyaStatement> triplesBatch = new HashSet<>(); + + // Insert batches of the binding sets into Fluo. + for(final RyaStatement ryaStatement : queryIterable) { + if (triplesBatch.size() == spInsertBatchSize) { + writeBatch(fluo, triplesBatch); + triplesBatch.clear(); + } + + triplesBatch.add(ryaStatement); + } + + if (!triplesBatch.isEmpty()) { + writeBatch(fluo, triplesBatch); + triplesBatch.clear(); + } + } catch (final IOException e) { + log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e); + } // return queryId to the caller for later monitoring from the export. - String queryId = null; - - try (Transaction tx = fluo.newTransaction()) { - // Write the query's structure to Fluo. - new FluoQueryMetadataDAO().write(tx, fluoQuery); - - // The results of the query are eventually exported to an instance - // of Rya, so store the Rya ID for the PCJ. - queryId = fluoQuery.getQueryMetadata().getNodeId(); - tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); - tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); - - // Flush the changes to Fluo. - tx.commit(); - } - - // Reuse the same set object while performing batch inserts. - final Set<RyaStatement> queryBatch = new HashSet<>(); - - // Iterate through each of the statement patterns and insert their - // historic matches into Fluo. - for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { - // Get an iterator over all of the binding sets that match the - // statement pattern. - final StatementPattern pattern = FluoStringConverter - .toStatementPattern(patternMetadata.getStatementPattern()); - queryBatch.add(spToRyaStatement(pattern)); - } - - Iterator<RyaStatement> triples = queryEngine.query(new BatchRyaQuery(queryBatch)).iterator(); - Set<RyaStatement> triplesBatch = new HashSet<>(); - - // Insert batches of the binding sets into Fluo. - while (triples.hasNext()) { - if (triplesBatch.size() == spInsertBatchSize) { - writeBatch(fluo, triplesBatch); - triplesBatch.clear(); - } - - triplesBatch.add(triples.next()); - } - - if (!triplesBatch.isEmpty()) { - writeBatch(fluo, triplesBatch); - triplesBatch.clear(); - } - return queryId; + return fluoQuery.getQueryMetadata().getNodeId(); } - + private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) { checkNotNull(fluo); checkNotNull(batch); - new InsertTriples().insert(fluo, batch); + } + + private static RyaStatement spToRyaStatement(final StatementPattern sp) { + final Value subjVal = sp.getSubjectVar().getValue(); + final Value predVal = sp.getPredicateVar().getValue(); + final Value objVal = sp.getObjectVar().getValue(); + + RyaURI subjURI = null; + RyaURI predURI = null; + RyaType objType = null; + if(subjVal != null) { + if(!(subjVal instanceof Resource)) { + throw new AssertionError("Subject must be a Resource."); + } + subjURI = RdfToRyaConversions.convertResource((Resource) subjVal); + } + + if (predVal != null) { + if(!(predVal instanceof URI)) { + throw new AssertionError("Predicate must be a URI."); + } + predURI = RdfToRyaConversions.convertURI((URI) predVal); + } + + if (objVal != null ) { + objType = RdfToRyaConversions.convertValue(objVal); + } + + return new RyaStatement(subjURI, predURI, objType); } - - - private static RyaStatement spToRyaStatement(StatementPattern sp) { - - Value subjVal = sp.getSubjectVar().getValue(); - Value predVal = sp.getPredicateVar().getValue(); - Value objVal = sp.getObjectVar().getValue(); - - RyaURI subjURI = null; - RyaURI predURI = null; - RyaType objType = null; - - if(subjVal != null) { - if(!(subjVal instanceof Resource)) { - throw new AssertionError("Subject must be a Resource."); - } - subjURI = RdfToRyaConversions.convertResource((Resource) subjVal); - } - - if (predVal != null) { - if(!(predVal instanceof URI)) { - throw new AssertionError("Predicate must be a URI."); - } - predURI = RdfToRyaConversions.convertURI((URI) predVal); - } - - if (objVal != null ) { - objType = RdfToRyaConversions.convertValue(objVal); - } - - return new RyaStatement(subjURI, predURI, objType); + + private String[] getAuths(final Connector accumulo) { + Authorizations auths; + try { + auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami()); + final List<byte[]> authList = auths.getAuthorizations(); + final String[] authArray = new String[authList.size()]; + for(int i = 0; i < authList.size(); i++){ + authArray[i] = new String(authList.get(i), "UTF-8"); + } + return authArray; + } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) { + throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami()); + } } - - - private String[] getAuths(Connector accumulo) { - Authorizations auths; - try { - auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami()); - List<byte[]> authList = auths.getAuthorizations(); - String[] authArray = new String[authList.size()]; - for(int i = 0; i < authList.size(); i++){ - authArray[i] = new String(authList.get(i), "UTF-8"); - } - return authArray; - } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) { - throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami()); - } - } - - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java index 1d92262..c11f9fb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java @@ -25,10 +25,15 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumnValue; +import org.apache.fluo.api.data.Span; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; @@ -36,13 +41,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; import org.openrdf.query.BindingSet; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Transaction; -import org.apache.fluo.api.client.scanner.CellScanner; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumnValue; -import org.apache.fluo.api.data.Span; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Deletes a Pre-computed Join (PCJ) from Fluo. @@ -154,6 +154,12 @@ public class DeletePcj { nodeIds.add(filterChild); getChildNodeIds(tx, filterChild, nodeIds); break; + case AGGREGATION: + final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId); + final String aggChild = aggMeta.getChildNodeId(); + nodeIds.add(aggChild); + getChildNodeIds(tx, aggChild, nodeIds); + break; case STATEMENT_PATTERN: break; } @@ -254,7 +260,7 @@ public class DeletePcj { try(Transaction ntx = tx) { int count = 0; - Iterator<RowColumnValue> iter = scanner.iterator(); + final Iterator<RowColumnValue> iter = scanner.iterator(); while (iter.hasNext() && count < batchSize) { final Bytes row = iter.next().getRow(); count++; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml index 343713c..38fff95 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml @@ -64,6 +64,14 @@ under the License. </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-recipes-accumulo</artifactId> + </dependency> <dependency> <groupId>org.apache.kafka</groupId> @@ -81,7 +89,7 @@ under the License. </exclusion> </exclusions> </dependency> - <dependency> + <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>${kryo.version}</version>
