http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java
new file mode 100644
index 0000000..d904d83
--- /dev/null
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.junit.Test;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import org.apache.rya.api.resolver.RyaTypeResolverException;
+
+/**
+ * Tests the methods of {@link AccumuloPcjSerializer}.
+ */
+public class AccumuloPcjSerializerTest {
+
+    /**
+     * The BindingSet has fewer Bindings than there are variables in the 
variable
+     * order, but they are all in the variable order. This is the case where
+     * the missing bindings were optional.
+     */
+    @Test
+    public void serialize_bindingsSubsetOfVarOrder() throws 
BindingSetConversionException {
+        // Setup the Binding Set.
+        final MapBindingSet originalBindingSet = new MapBindingSet();
+        originalBindingSet.addBinding("x", new URIImpl("http://a"));
+        originalBindingSet.addBinding("y", new URIImpl("http://b"));
+
+        // Setup the variable order.
+        final VariableOrder varOrder = new VariableOrder("x", "a", "y", "b");
+
+        // Create the byte[] representation of the BindingSet.
+        BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+        byte[] serialized = converter.convert(originalBindingSet, varOrder);
+
+        // Deserialize the byte[] back into the binding set.
+        BindingSet deserialized = converter.convert(serialized, varOrder);
+
+        // Ensure the deserialized value matches the original.
+        assertEquals(originalBindingSet, deserialized);
+    }
+
+    /**
+     * The BindingSet has more Bindings than there are variables in the 
variable order.
+     * This is the case where a Group By clause does not include all of the 
Bindings that
+     * are in the Binding Set.
+     */
+    @Test
+    public void serialize_bindingNotInVariableOrder() throws 
RyaTypeResolverException, BindingSetConversionException {
+        // Setup the Binding Set.
+        final MapBindingSet originalBindingSet = new MapBindingSet();
+        originalBindingSet.addBinding("x", new URIImpl("http://a"));
+        originalBindingSet.addBinding("y", new URIImpl("http://b"));
+        originalBindingSet.addBinding("z", new URIImpl("http://d"));
+
+        // Setup the variable order.
+        final VariableOrder varOrder = new VariableOrder("x", "y");
+
+        // Serialize the Binding Set.
+        BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
+        byte[] serialized = converter.convert(originalBindingSet, varOrder);
+        
+        // Deserialize it again.
+        BindingSet deserialized = converter.convert(serialized, varOrder);
+        
+        // Show that it only contains the bindings that were part of the 
Variable Order.
+        MapBindingSet expected = new MapBindingSet();
+        expected.addBinding("x", new URIImpl("http://a"));
+        expected.addBinding("y", new URIImpl("http://b"));
+        
+        assertEquals(expected, deserialized);
+    }
+
+       @Test
+       public void basicShortUriBsTest() throws BindingSetConversionException {
+               final QueryBindingSet bs = new QueryBindingSet();
+               bs.addBinding("X",new URIImpl("http://uri1"));
+               bs.addBinding("Y",new URIImpl("http://uri2"));
+               final VariableOrder varOrder = new VariableOrder("X","Y");
+
+               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
+               final byte[] byteVal = converter.convert(bs, varOrder);
+               final BindingSet newBs = converter.convert(byteVal, varOrder);
+               assertEquals(bs, newBs);
+       }
+
+       @Test
+       public void basicLongUriBsTest() throws BindingSetConversionException {
+               final QueryBindingSet bs = new QueryBindingSet();
+               bs.addBinding("X",new URIImpl("http://uri1"));
+               bs.addBinding("Y",new URIImpl("http://uri2"));
+               bs.addBinding("Z",new URIImpl("http://uri3"));
+               bs.addBinding("A",new URIImpl("http://uri4"));
+               bs.addBinding("B",new URIImpl("http://uri5"));
+               final VariableOrder varOrder = new 
VariableOrder("X","Y","Z","A","B");
+
+               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
+               final byte[] byteVal = converter.convert(bs, varOrder);
+               final BindingSet newBs = converter.convert(byteVal, varOrder);
+               assertEquals(bs, newBs);
+       }
+
+       @Test
+       public void basicShortStringLiteralBsTest() throws 
BindingSetConversionException {
+               final QueryBindingSet bs = new QueryBindingSet();
+               bs.addBinding("X",new LiteralImpl("literal1"));
+               bs.addBinding("Y",new LiteralImpl("literal2"));
+               final VariableOrder varOrder = new VariableOrder("X","Y");
+
+               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
+               final byte[] byteVal = converter.convert(bs, varOrder);
+               final BindingSet newBs = converter.convert(byteVal, varOrder);
+               assertEquals(bs, newBs);
+       }
+
+       @Test
+       public void basicShortMixLiteralBsTest() throws 
BindingSetConversionException {
+               final QueryBindingSet bs = new QueryBindingSet();
+               bs.addBinding("X",new LiteralImpl("literal1"));
+               bs.addBinding("Y",new LiteralImpl("5", new 
URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
+               final VariableOrder varOrder = new VariableOrder("X","Y");
+
+               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
+               final byte[] byteVal = converter.convert(bs, varOrder);
+               final BindingSet newBs = converter.convert(byteVal, varOrder);
+               assertEquals(bs, newBs);
+       }
+
+       @Test
+       public void basicLongMixLiteralBsTest() throws 
BindingSetConversionException {
+               final QueryBindingSet bs = new QueryBindingSet();
+               bs.addBinding("X",new LiteralImpl("literal1"));
+               bs.addBinding("Y",new LiteralImpl("5", new 
URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
+               bs.addBinding("Z",new LiteralImpl("5.0", new 
URIImpl("http://www.w3.org/2001/XMLSchema#double")));
+               bs.addBinding("W",new LiteralImpl("1000", new 
URIImpl("http://www.w3.org/2001/XMLSchema#long")));
+               final VariableOrder varOrder = new 
VariableOrder("W","X","Y","Z");
+
+               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
+               final byte[] byteVal = converter.convert(bs, varOrder);
+               final BindingSet newBs = converter.convert(byteVal, varOrder);
+               assertEquals(bs, newBs);
+       }
+
+       @Test
+       public void basicMixUriLiteralBsTest() throws 
BindingSetConversionException {
+               final QueryBindingSet bs = new QueryBindingSet();
+               bs.addBinding("X",new LiteralImpl("literal1"));
+               bs.addBinding("Y",new LiteralImpl("5", new 
URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
+               bs.addBinding("Z",new LiteralImpl("5.0", new 
URIImpl("http://www.w3.org/2001/XMLSchema#double")));
+               bs.addBinding("W",new LiteralImpl("1000", new 
URIImpl("http://www.w3.org/2001/XMLSchema#long")));
+               bs.addBinding("A",new URIImpl("http://uri1"));
+               bs.addBinding("B",new URIImpl("http://uri2"));
+               bs.addBinding("C",new URIImpl("http://uri3"));
+               final VariableOrder varOrder = new 
VariableOrder("A","W","X","Y","Z","B","C");
+
+               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
+               final byte[] byteVal = converter.convert(bs, varOrder);
+               final BindingSet newBs = converter.convert(byteVal, varOrder);
+               assertEquals(bs, newBs);
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java
deleted file mode 100644
index d69205e..0000000
--- 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerialzerTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-package org.apache.rya.indexing.pcj.storage.accumulo;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.junit.Test;
-import org.openrdf.model.impl.LiteralImpl;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.algebra.evaluation.QueryBindingSet;
-import org.openrdf.query.impl.MapBindingSet;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.rya.api.resolver.RyaTypeResolverException;
-
-/**
- * Tests the methods of {@link AccumuloPcjSerialzer}.
- */
-public class AccumuloPcjSerialzerTest {
-
-    /**
-     * The BindingSet has fewer Bindings than there are variables in the 
variable
-     * order, but they are all in the variable order. This is the case where
-     * the missing bindings were optional.
-     */
-    @Test
-    public void serialize_bindingsSubsetOfVarOrder() throws 
BindingSetConversionException {
-        // Setup the Binding Set.
-        final MapBindingSet originalBindingSet = new MapBindingSet();
-        originalBindingSet.addBinding("x", new URIImpl("http://a"));
-        originalBindingSet.addBinding("y", new URIImpl("http://b"));
-
-        // Setup the variable order.
-        final VariableOrder varOrder = new VariableOrder("x", "a", "y", "b");
-
-        // Create the byte[] representation of the BindingSet.
-        BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
-        byte[] serialized = converter.convert(originalBindingSet, varOrder);
-
-        // Deserialize the byte[] back into the binding set.
-        BindingSet deserialized = converter.convert(serialized, varOrder);
-
-        // Ensure the deserialized value matches the original.
-        assertEquals(originalBindingSet, deserialized);
-    }
-
-    /**
-     * The BindingSet has a Binding whose name is not in the variable order.
-     * This is illegal.
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void serialize_bindingNotInVariableOrder() throws 
RyaTypeResolverException, BindingSetConversionException {
-        // Setup the Binding Set.
-        final MapBindingSet originalBindingSet = new MapBindingSet();
-        originalBindingSet.addBinding("x", new URIImpl("http://a"));
-        originalBindingSet.addBinding("y", new URIImpl("http://b"));
-        originalBindingSet.addBinding("z", new URIImpl("http://d"));
-
-        // Setup the variable order.
-        final VariableOrder varOrder = new VariableOrder("x", "y");
-
-        // Create the byte[] representation of the BindingSet. This will throw 
an exception.
-        BindingSetConverter<byte[]> converter = new AccumuloPcjSerializer();
-        converter.convert(originalBindingSet, varOrder);
-    }
-
-       @Test
-       public void basicShortUriBsTest() throws BindingSetConversionException {
-               final QueryBindingSet bs = new QueryBindingSet();
-               bs.addBinding("X",new URIImpl("http://uri1"));
-               bs.addBinding("Y",new URIImpl("http://uri2"));
-               final VariableOrder varOrder = new VariableOrder("X","Y");
-
-               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
-               final byte[] byteVal = converter.convert(bs, varOrder);
-               final BindingSet newBs = converter.convert(byteVal, varOrder);
-               assertEquals(bs, newBs);
-       }
-
-       @Test
-       public void basicLongUriBsTest() throws BindingSetConversionException {
-               final QueryBindingSet bs = new QueryBindingSet();
-               bs.addBinding("X",new URIImpl("http://uri1"));
-               bs.addBinding("Y",new URIImpl("http://uri2"));
-               bs.addBinding("Z",new URIImpl("http://uri3"));
-               bs.addBinding("A",new URIImpl("http://uri4"));
-               bs.addBinding("B",new URIImpl("http://uri5"));
-               final VariableOrder varOrder = new 
VariableOrder("X","Y","Z","A","B");
-
-               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
-               final byte[] byteVal = converter.convert(bs, varOrder);
-               final BindingSet newBs = converter.convert(byteVal, varOrder);
-               assertEquals(bs, newBs);
-       }
-
-       @Test
-       public void basicShortStringLiteralBsTest() throws 
BindingSetConversionException {
-               final QueryBindingSet bs = new QueryBindingSet();
-               bs.addBinding("X",new LiteralImpl("literal1"));
-               bs.addBinding("Y",new LiteralImpl("literal2"));
-               final VariableOrder varOrder = new VariableOrder("X","Y");
-
-               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
-               final byte[] byteVal = converter.convert(bs, varOrder);
-               final BindingSet newBs = converter.convert(byteVal, varOrder);
-               assertEquals(bs, newBs);
-       }
-
-       @Test
-       public void basicShortMixLiteralBsTest() throws 
BindingSetConversionException {
-               final QueryBindingSet bs = new QueryBindingSet();
-               bs.addBinding("X",new LiteralImpl("literal1"));
-               bs.addBinding("Y",new LiteralImpl("5", new 
URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
-               final VariableOrder varOrder = new VariableOrder("X","Y");
-
-               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
-               final byte[] byteVal = converter.convert(bs, varOrder);
-               final BindingSet newBs = converter.convert(byteVal, varOrder);
-               assertEquals(bs, newBs);
-       }
-
-       @Test
-       public void basicLongMixLiteralBsTest() throws 
BindingSetConversionException {
-               final QueryBindingSet bs = new QueryBindingSet();
-               bs.addBinding("X",new LiteralImpl("literal1"));
-               bs.addBinding("Y",new LiteralImpl("5", new 
URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
-               bs.addBinding("Z",new LiteralImpl("5.0", new 
URIImpl("http://www.w3.org/2001/XMLSchema#double")));
-               bs.addBinding("W",new LiteralImpl("1000", new 
URIImpl("http://www.w3.org/2001/XMLSchema#long")));
-               final VariableOrder varOrder = new 
VariableOrder("W","X","Y","Z");
-
-               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
-               final byte[] byteVal = converter.convert(bs, varOrder);
-               final BindingSet newBs = converter.convert(byteVal, varOrder);
-               assertEquals(bs, newBs);
-       }
-
-       @Test
-       public void basicMixUriLiteralBsTest() throws 
BindingSetConversionException {
-               final QueryBindingSet bs = new QueryBindingSet();
-               bs.addBinding("X",new LiteralImpl("literal1"));
-               bs.addBinding("Y",new LiteralImpl("5", new 
URIImpl("http://www.w3.org/2001/XMLSchema#integer")));
-               bs.addBinding("Z",new LiteralImpl("5.0", new 
URIImpl("http://www.w3.org/2001/XMLSchema#double")));
-               bs.addBinding("W",new LiteralImpl("1000", new 
URIImpl("http://www.w3.org/2001/XMLSchema#long")));
-               bs.addBinding("A",new URIImpl("http://uri1"));
-               bs.addBinding("B",new URIImpl("http://uri2"));
-               bs.addBinding("C",new URIImpl("http://uri3"));
-               final VariableOrder varOrder = new 
VariableOrder("A","W","X","Y","Z","B","C");
-
-               BindingSetConverter<byte[]> converter = new 
AccumuloPcjSerializer();
-               final byte[] byteVal = converter.convert(bs, varOrder);
-               final BindingSet newBs = converter.convert(byteVal, varOrder);
-               assertEquals(bs, newBs);
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java
index e01e7de..b263038 100644
--- 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverterTest.java
@@ -23,10 +23,7 @@ import static org.junit.Assert.assertEquals;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
-import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.junit.Test;
 import org.openrdf.model.impl.BooleanLiteralImpl;
 import org.openrdf.model.impl.DecimalLiteralImpl;
@@ -41,6 +38,23 @@ import org.openrdf.query.impl.MapBindingSet;
 public class BindingSetStringConverterTest {
 
     @Test
+    public void noBindings() throws BindingSetConversionException {
+        // Create a BindingSet that doesn't have any bindings.
+        final MapBindingSet original = new MapBindingSet();
+
+        // Convert it to a String.
+        final VariableOrder varOrder = new VariableOrder();
+        final BindingSetConverter<String> converter = new 
BindingSetStringConverter();
+        final String bindingSetString = converter.convert(original, varOrder);
+
+        // Convert it back to a binding set.
+        final BindingSet converted = converter.convert(bindingSetString, 
varOrder);
+
+        // Ensure it is still an empty BindingSet.
+        assertEquals(original, converted);
+    }
+
+    @Test
     public void toString_URIs() throws BindingSetConversionException {
         // Setup the binding set that will be converted.
         final MapBindingSet originalBindingSet = new MapBindingSet();
@@ -53,7 +67,7 @@ public class BindingSetStringConverterTest {
         final BindingSetConverter<String> converter = new 
BindingSetStringConverter();
         final String bindingSetString = converter.convert(originalBindingSet, 
varOrder);
 
-        // Ensure it converted to the expected result.l
+        // Ensure it converted to the expected result.
         final String expected =
                 "http://b<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
                 "http://c<<~>>http://www.w3.org/2001/XMLSchema#anyURI:::" +
@@ -163,26 +177,6 @@ public class BindingSetStringConverterTest {
         assertEquals(expected, bindingSetString);
     }
 
-    /**
-     * The BindingSet has a Binding whose name is not in the variable order.
-     * This is illegal.
-     */
-    @Test(expected = IllegalArgumentException.class)
-    public void toString_bindingNotInVariableOrder() throws 
BindingSetConversionException {
-        // Setup the Binding Set.
-        final MapBindingSet originalBindingSet = new MapBindingSet();
-        originalBindingSet.addBinding("x", new URIImpl("http://a"));
-        originalBindingSet.addBinding("y", new URIImpl("http://b"));
-        originalBindingSet.addBinding("z", new URIImpl("http://d"));
-
-        // Setup the variable order.
-        final VariableOrder varOrder = new VariableOrder("x", "y");
-
-        // Create the String representation of the BindingSet. This will throw 
an exception.
-        final BindingSetConverter<String> converter = new 
BindingSetStringConverter();
-        converter.convert(originalBindingSet, varOrder);
-    }
-
     @Test
     public void fromString() throws BindingSetConversionException {
         // Setup the String that will be converted.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
index 623526e..2bcce65 100644
--- 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java
@@ -38,10 +38,17 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
 import org.apache.zookeeper.ClientCnxn;
 import org.junit.After;
 import org.junit.Before;
@@ -63,13 +70,6 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
-import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
-import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
-import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
-import org.apache.rya.rdftriplestore.RyaSailRepository;
-
 /**
  * Performs integration test using {@link MiniAccumuloCluster} to ensure the
  * functions of {@link PcjTables} work within a cluster setting.
@@ -237,7 +237,7 @@ public class PcjTablesIntegrationTest {
     }
 
     @Test
-    public void listResults() throws PCJStorageException, AccumuloException, 
AccumuloSecurityException {
+    public void listResults() throws Exception {
         final String sparql =
                 "SELECT ?name ?age " +
                 "{" +
@@ -274,8 +274,14 @@ public class PcjTablesIntegrationTest {
 
         // Fetch the Binding Sets that have been stored in the PCJ table.
         final Set<BindingSet> results = new HashSet<>();
-        for(final BindingSet result : pcjs.listResults(accumuloConn, 
pcjTableName, new Authorizations())) {
-            results.add( result );
+
+        final CloseableIterator<BindingSet> resultsIt = 
pcjs.listResults(accumuloConn, pcjTableName, new Authorizations());
+        try {
+            while(resultsIt.hasNext()) {
+                results.add( resultsIt.next() );
+            }
+        } finally {
+            resultsIt.close();
         }
 
         // Verify the fetched results match the expected ones.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
index 7eeff1d..98ed4c7 100644
--- 
a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
+++ 
b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
@@ -31,8 +31,15 @@ import java.util.Set;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.accumulo.AccumuloRyaITBase;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import 
org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import 
org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
 import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
@@ -46,13 +53,6 @@ import org.openrdf.query.impl.MapBindingSet;
 
 import com.google.common.collect.ImmutableMap;
 
-import org.apache.rya.accumulo.AccumuloRyaITBase;
-import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-import org.apache.rya.api.instance.RyaDetailsRepository;
-import 
org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
-import 
org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-
 /**
  * Integration tests the methods of {@link AccumuloPcjStorage}.
  * </p>
@@ -66,23 +66,23 @@ public class AccumuloPcjStorageIT extends AccumuloRyaITBase 
{
         // Setup the PCJ storage that will be tested against.
         final Connector connector = super.getClusterInstance().getConnector();
         final String ryaInstanceName = super.getRyaInstanceName();
-        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
-
-        // Create a PCJ.
-        final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a 
<http://isA> ?b } ");
+        try(final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ.
+            final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a 
<http://isA> ?b } ");
 
-        // Ensure the Rya details have been updated to include the PCJ's ID.
-        final RyaDetailsRepository detailsRepo = new 
AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
+            // Ensure the Rya details have been updated to include the PCJ's 
ID.
+            final RyaDetailsRepository detailsRepo = new 
AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
 
-        final ImmutableMap<String, PCJDetails> detailsMap = 
detailsRepo.getRyaInstanceDetails()
-                .getPCJIndexDetails()
-                .getPCJDetails();
+            final ImmutableMap<String, PCJDetails> detailsMap = 
detailsRepo.getRyaInstanceDetails()
+                    .getPCJIndexDetails()
+                    .getPCJDetails();
 
-        final PCJDetails expectedDetails = PCJDetails.builder()
-                .setId( pcjId )
-                .build();
+            final PCJDetails expectedDetails = PCJDetails.builder()
+                    .setId( pcjId )
+                    .build();
 
-        assertEquals(expectedDetails, detailsMap.get(pcjId));
+            assertEquals(expectedDetails, detailsMap.get(pcjId));
+        }
     }
 
     @Test
@@ -90,22 +90,22 @@ public class AccumuloPcjStorageIT extends AccumuloRyaITBase 
{
         // Setup the PCJ storage that will be tested against.
         final Connector connector = super.getClusterInstance().getConnector();
         final String ryaInstanceName = super.getRyaInstanceName();
-        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
-
-        // Create a PCJ.
-        final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a 
<http://isA> ?b } ");
+        try(final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ.
+            final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a 
<http://isA> ?b } ");
 
-        // Delete the PCJ that was just created.
-        pcjStorage.dropPcj(pcjId);
+            // Delete the PCJ that was just created.
+            pcjStorage.dropPcj(pcjId);
 
-        // Ensure the Rya details have been updated to no longer include the 
PCJ's ID.
-        final RyaDetailsRepository detailsRepo = new 
AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
+            // Ensure the Rya details have been updated to no longer include 
the PCJ's ID.
+            final RyaDetailsRepository detailsRepo = new 
AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
 
-        final ImmutableMap<String, PCJDetails> detailsMap = 
detailsRepo.getRyaInstanceDetails()
-                .getPCJIndexDetails()
-                .getPCJDetails();
+            final ImmutableMap<String, PCJDetails> detailsMap = 
detailsRepo.getRyaInstanceDetails()
+                    .getPCJIndexDetails()
+                    .getPCJDetails();
 
-        assertFalse( detailsMap.containsKey(pcjId) );
+            assertFalse( detailsMap.containsKey(pcjId) );
+        }
     }
 
     @Test
@@ -113,27 +113,27 @@ public class AccumuloPcjStorageIT extends 
AccumuloRyaITBase {
         // Setup the PCJ storage that will be tested against.
         final Connector connector = super.getClusterInstance().getConnector();
         final String ryaInstanceName = super.getRyaInstanceName();
-        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
-
-        // Create a few PCJs and hold onto their IDs.
-        final List<String> expectedIds = new ArrayList<>();
+        try(final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a few PCJs and hold onto their IDs.
+            final List<String> expectedIds = new ArrayList<>();
 
-        String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> 
?b } ");
-        expectedIds.add( pcjId );
+            String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a 
<http://isA> ?b } ");
+            expectedIds.add( pcjId );
 
-        pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-        expectedIds.add( pcjId );
+            pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b 
} ");
+            expectedIds.add( pcjId );
 
-        pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-        expectedIds.add( pcjId );
+            pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b 
} ");
+            expectedIds.add( pcjId );
 
-        // Fetch the PCJ names
-        final List<String> pcjIds = pcjStorage.listPcjs();
+            // Fetch the PCJ names
+            final List<String> pcjIds = pcjStorage.listPcjs();
 
-        // Ensure the expected IDs match the fetched IDs.
-        Collections.sort(expectedIds);
-        Collections.sort(pcjIds);
-        assertEquals(expectedIds, pcjIds);
+            // Ensure the expected IDs match the fetched IDs.
+            Collections.sort(expectedIds);
+            Collections.sort(pcjIds);
+            assertEquals(expectedIds, pcjIds);
+        }
     }
 
     @Test
@@ -141,19 +141,19 @@ public class AccumuloPcjStorageIT extends 
AccumuloRyaITBase {
         // Setup the PCJ storage that will be tested against.
         final Connector connector = super.getClusterInstance().getConnector();
         final String ryaInstanceName = super.getRyaInstanceName();
-        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
-
-        // Create a PCJ.
-        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-        final String pcjId = pcjStorage.createPcj(sparql);
-
-        // Fetch the PCJ's metadata.
-        final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
-        // Ensure it has the expected values.
-        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
-        final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, 
varOrders);
-        assertEquals(expectedMetadata, metadata);
+        try(final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Fetch the PCJ's metadata.
+            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+            // Ensure it has the expected values.
+            final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, 
varOrders);
+            assertEquals(expectedMetadata, metadata);
+        }
     }
 
     @Test
@@ -161,112 +161,116 @@ public class AccumuloPcjStorageIT extends 
AccumuloRyaITBase {
         // Setup the PCJ storage that will be tested against.
         final Connector connector = super.getClusterInstance().getConnector();
         final String ryaInstanceName = super.getRyaInstanceName();
-        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
+        try(final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Create a PCJ.
-        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-        final String pcjId = pcjStorage.createPcj(sparql);
+            // Add some binding sets to it.
+            final Set<VisibilityBindingSet> results = new HashSet<>();
 
-        // Add some binding sets to it.
-        final Set<VisibilityBindingSet> results = new HashSet<>();
+            final MapBindingSet aliceBS = new MapBindingSet();
+            aliceBS.addBinding("a", new URIImpl("http://Alice"));
+            aliceBS.addBinding("b", new URIImpl("http://Person"));
+            results.add( new VisibilityBindingSet(aliceBS, "") );
 
-        final MapBindingSet aliceBS = new MapBindingSet();
-        aliceBS.addBinding("a", new URIImpl("http://Alice"));
-        aliceBS.addBinding("b", new URIImpl("http://Person"));
-        results.add( new VisibilityBindingSet(aliceBS, "") );
+            final MapBindingSet charlieBS = new MapBindingSet();
+            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+            results.add( new VisibilityBindingSet(charlieBS, "") );
 
-        final MapBindingSet charlieBS = new MapBindingSet();
-        charlieBS.addBinding("a", new URIImpl("http://Charlie"));
-        charlieBS.addBinding("b", new URIImpl("http://Comedian"));
-        results.add( new VisibilityBindingSet(charlieBS, "") );
+            pcjStorage.addResults(pcjId, results);
 
-        pcjStorage.addResults(pcjId, results);
+            // Make sure the PCJ metadata was updated.
+            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
 
-        // Make sure the PCJ metadata was updated.
-        final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
-        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
-        final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, 
varOrders);
-        assertEquals(expectedMetadata, metadata);
+            final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, 
varOrders);
+            assertEquals(expectedMetadata, metadata);
+        }
     }
 
     @Test
-    public void listResults() throws AccumuloException, 
AccumuloSecurityException, PCJStorageException {
+    public void listResults() throws Exception {
         // Setup the PCJ storage that will be tested against.
         final Connector connector = super.getClusterInstance().getConnector();
         final String ryaInstanceName = super.getRyaInstanceName();
-        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
-
-        // Create a PCJ.
-        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-        final String pcjId = pcjStorage.createPcj(sparql);
-
-        // Add some binding sets to it.
-        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
-
-        final MapBindingSet aliceBS = new MapBindingSet();
-        aliceBS.addBinding("a", new URIImpl("http://Alice"));
-        aliceBS.addBinding("b", new URIImpl("http://Person"));
-        expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
-
-        final MapBindingSet charlieBS = new MapBindingSet();
-        charlieBS.addBinding("a", new URIImpl("http://Charlie"));
-        charlieBS.addBinding("b", new URIImpl("http://Comedian"));
-        expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
-
-        pcjStorage.addResults(pcjId, expectedResults);
-
-        // List the results that were stored.
-        final Set<BindingSet> results = new HashSet<>();
-        for(final BindingSet result : pcjStorage.listResults(pcjId)) {
-            results.add( result );
+        try(final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Add some binding sets to it.
+            final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+            final MapBindingSet aliceBS = new MapBindingSet();
+            aliceBS.addBinding("a", new URIImpl("http://Alice"));
+            aliceBS.addBinding("b", new URIImpl("http://Person"));
+            expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
+
+            final MapBindingSet charlieBS = new MapBindingSet();
+            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+            expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
+
+            pcjStorage.addResults(pcjId, expectedResults);
+
+            // List the results that were stored.
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
+
+            assertEquals(expectedResults, results);
         }
-
-        assertEquals(expectedResults, results);
     }
 
     @Test
-    public void purge() throws AccumuloException, AccumuloSecurityException, 
PCJStorageException, MalformedQueryException {
+    public void purge() throws Exception {
         // Setup the PCJ storage that will be tested against.
         final Connector connector = super.getClusterInstance().getConnector();
         final String ryaInstanceName = super.getRyaInstanceName();
-        final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName);
-
-        // Create a PCJ.
-        final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
-        final String pcjId = pcjStorage.createPcj(sparql);
+        try(final PrecomputedJoinStorage pcjStorage =  new 
AccumuloPcjStorage(connector, ryaInstanceName)) {
+            // Create a PCJ.
+            final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+            final String pcjId = pcjStorage.createPcj(sparql);
 
-        // Add some binding sets to it.
-        final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+            // Add some binding sets to it.
+            final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
 
-        final MapBindingSet aliceBS = new MapBindingSet();
-        aliceBS.addBinding("a", new URIImpl("http://Alice"));
-        aliceBS.addBinding("b", new URIImpl("http://Person"));
-        expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
+            final MapBindingSet aliceBS = new MapBindingSet();
+            aliceBS.addBinding("a", new URIImpl("http://Alice"));
+            aliceBS.addBinding("b", new URIImpl("http://Person"));
+            expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
 
-        final MapBindingSet charlieBS = new MapBindingSet();
-        charlieBS.addBinding("a", new URIImpl("http://Charlie"));
-        charlieBS.addBinding("b", new URIImpl("http://Comedian"));
-        expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
+            final MapBindingSet charlieBS = new MapBindingSet();
+            charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+            charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+            expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
 
-        pcjStorage.addResults(pcjId, expectedResults);
+            pcjStorage.addResults(pcjId, expectedResults);
 
-        // Purge the PCJ.
-        pcjStorage.purge(pcjId);
+            // Purge the PCJ.
+            pcjStorage.purge(pcjId);
 
-        // List the results that were stored.
-        final Set<BindingSet> results = new HashSet<>();
-        for(final BindingSet result : pcjStorage.listResults(pcjId)) {
-            results.add( result );
-        }
+            // List the results that were stored.
+            final Set<BindingSet> results = new HashSet<>();
+            try(CloseableIterator<BindingSet> resultsIt = 
pcjStorage.listResults(pcjId)) {
+                while(resultsIt.hasNext()) {
+                    results.add( resultsIt.next() );
+                }
+            }
 
-        assertTrue( results.isEmpty() );
+            assertTrue( results.isEmpty() );
 
-        // Make sure the PCJ metadata was updated.
-        final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+            // Make sure the PCJ metadata was updated.
+            final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
 
-        final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
-        final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, 
varOrders);
-        assertEquals(expectedMetadata, metadata);
+            final Set<VariableOrder> varOrders = new 
ShiftVarOrderFactory().makeVarOrders(sparql);
+            final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, 
varOrders);
+            assertEquals(expectedMetadata, metadata);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 86f5b48..1de0813 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -22,9 +22,9 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Objects.requireNonNull;
 
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.fluo.api.client.FluoClient;
 import org.apache.fluo.api.client.Transaction;
+import org.apache.log4j.Logger;
 import org.apache.rya.accumulo.AccumuloRdfConfiguration;
 import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
 import org.apache.rya.api.domain.RyaStatement;
@@ -52,15 +53,14 @@ import 
org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
 import org.apache.rya.indexing.pcj.storage.PcjException;
 import org.apache.rya.indexing.pcj.storage.PcjMetadata;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.calrissian.mango.collect.CloseableIterable;
 import org.openrdf.model.Resource;
 import org.openrdf.model.URI;
 import org.openrdf.model.Value;
 import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.QueryEvaluationException;
 import org.openrdf.query.algebra.StatementPattern;
 import org.openrdf.query.parser.ParsedQuery;
 import org.openrdf.query.parser.sparql.SPARQLParser;
-import org.openrdf.sail.SailException;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -82,6 +82,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  */
 @DefaultAnnotation(NonNull.class)
 public class CreatePcj {
+    private static final Logger log = Logger.getLogger(CreatePcj.class);
 
     /**
      * The default Statement Pattern batch insert size is 1000.
@@ -113,7 +114,51 @@ public class CreatePcj {
         this.spInsertBatchSize = spInsertBatchSize;
     }
 
-    
+    /**
+     * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+     *
+     * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. 
(not null)
+     * @param pcjStorage - Provides access to the PCJ index. (not null)
+     * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
+     * @return The metadata that was written to the Fluo application for the 
PCJ.
+     * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
+     * @throws PcjException The PCJ Metadata for {@code pcjId} could not be 
read from {@code pcjStorage}.
+     */
+    public FluoQuery createPcj(
+            final String pcjId,
+            final PrecomputedJoinStorage pcjStorage,
+            final FluoClient fluo) throws MalformedQueryException, 
PcjException {
+        requireNonNull(pcjId);
+        requireNonNull(pcjStorage);
+        requireNonNull(fluo);
+
+        // Keeps track of the IDs that are assigned to each of the query's 
nodes in Fluo.
+        // We use these IDs later when scanning Rya for historic Statement 
Pattern matches
+        // as well as setting up automatic exports.
+        final NodeIds nodeIds = new NodeIds();
+
+        // Parse the query's structure for the metadata that will be written 
to fluo.
+        final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+        final String sparql = pcjMetadata.getSparql();
+        final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, 
null);
+        final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+
+        try (Transaction tx = fluo.newTransaction()) {
+            // Write the query's structure to Fluo.
+            new FluoQueryMetadataDAO().write(tx, fluoQuery);
+
+            // The results of the query are eventually exported to an instance 
of Rya, so store the Rya ID for the PCJ.
+            final String queryId = fluoQuery.getQueryMetadata().getNodeId();
+            tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
+            tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
+
+            // Flush the changes to Fluo.
+            tx.commit();
+        }
+
+        return fluoQuery;
+    }
+
     /**
      * Tells the Fluo PCJ Updater application to maintain a new PCJ.
      * <p>
@@ -126,147 +171,115 @@ public class CreatePcj {
      * @param pcjStorage - Provides access to the PCJ index. (not null)
      * @param fluo - A connection to the Fluo application that updates the PCJ 
index. (not null)
      * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
-     *
+     * @return The Fluo application's Query ID of the query that was created.
      * @throws MalformedQueryException The SPARQL query stored for the {@code 
pcjId} is malformed.
      * @throws PcjException The PCJ Metadata for {@code pcjId} could not be 
read from {@code pcjStorage}.
-     * @throws SailException Historic PCJ results could not be loaded because 
of a problem with {@code rya}.
-     * @throws QueryEvaluationException Historic PCJ results could not be 
loaded because of a problem with {@code rya}.
+     * @throws RyaDAOException Historic PCJ results could not be loaded 
because of a problem with {@code rya}.
      */
-    public String  withRyaIntegration(final String pcjId, final 
PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
-            final Connector accumulo, String ryaInstance )
-                    throws MalformedQueryException, PcjException, 
SailException, QueryEvaluationException, RyaDAOException {
+    public String withRyaIntegration(
+            final String pcjId,
+            final PrecomputedJoinStorage pcjStorage,
+            final FluoClient fluo,
+            final Connector accumulo,
+            final String ryaInstance ) throws MalformedQueryException, 
PcjException, RyaDAOException {
         requireNonNull(pcjId);
         requireNonNull(pcjStorage);
         requireNonNull(fluo);
-               requireNonNull(accumulo);
-               requireNonNull(ryaInstance);
-               
-               //Create AccumuloRyaQueryEngine to query for historic results
-               AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-               conf.setTablePrefix(ryaInstance);
-               conf.setAuths(getAuths(accumulo));
-               AccumuloRyaQueryEngine queryEngine = new 
AccumuloRyaQueryEngine(accumulo, conf);
-               
-
-               // Keeps track of the IDs that are assigned to each of the 
query's nodes
-               // in Fluo.
-               // We use these IDs later when scanning Rya for historic 
Statement
-               // Pattern matches
-               // as well as setting up automatic exports.
-               final NodeIds nodeIds = new NodeIds();
-
-               // Parse the query's structure for the metadata that will be 
written to
-               // fluo.
-               final PcjMetadata pcjMetadata = 
pcjStorage.getPcjMetadata(pcjId);
-               final String sparql = pcjMetadata.getSparql();
-               final ParsedQuery parsedQuery = new 
SPARQLParser().parseQuery(sparql, null);
-               final FluoQuery fluoQuery = new 
SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+        requireNonNull(accumulo);
+        requireNonNull(ryaInstance);
+
+        // Write the SPARQL query's structure to the Fluo Application.
+        final FluoQuery fluoQuery = createPcj(pcjId, pcjStorage, fluo);
+
+        // Reuse the same set object while performing batch inserts.
+        final Set<RyaStatement> queryBatch = new HashSet<>();
+
+        // Iterate through each of the statement patterns and insert their 
historic matches into Fluo.
+        for (final StatementPatternMetadata patternMetadata : 
fluoQuery.getStatementPatternMetadata()) {
+            // Get an iterator over all of the binding sets that match the 
statement pattern.
+            final StatementPattern pattern = 
FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern());
+            queryBatch.add(spToRyaStatement(pattern));
+        }
+
+        //Create AccumuloRyaQueryEngine to query for historic results
+        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix(ryaInstance);
+        conf.setAuths(getAuths(accumulo));
+
+        try(final AccumuloRyaQueryEngine queryEngine = new 
AccumuloRyaQueryEngine(accumulo, conf);
+                CloseableIterable<RyaStatement> queryIterable = 
queryEngine.query(new BatchRyaQuery(queryBatch))) {
+            final Set<RyaStatement> triplesBatch = new HashSet<>();
+
+            // Insert batches of the binding sets into Fluo.
+            for(final RyaStatement ryaStatement : queryIterable) {
+                if (triplesBatch.size() == spInsertBatchSize) {
+                    writeBatch(fluo, triplesBatch);
+                    triplesBatch.clear();
+                }
+
+                triplesBatch.add(ryaStatement);
+            }
+
+            if (!triplesBatch.isEmpty()) {
+                writeBatch(fluo, triplesBatch);
+                triplesBatch.clear();
+            }
+        } catch (final IOException e) {
+            log.warn("Ignoring IOException thrown while closing the 
AccumuloRyaQueryEngine used by CreatePCJ.", e);
+        }
 
         // return queryId to the caller for later monitoring from the export.
-        String queryId = null;
-
-               try (Transaction tx = fluo.newTransaction()) {
-                       // Write the query's structure to Fluo.
-                       new FluoQueryMetadataDAO().write(tx, fluoQuery);
-
-                       // The results of the query are eventually exported to 
an instance
-                       // of Rya, so store the Rya ID for the PCJ.
-            queryId = fluoQuery.getQueryMetadata().getNodeId();
-                       tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
-                       tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, 
queryId);
-
-                       // Flush the changes to Fluo.
-                       tx.commit();
-               }
-
-               // Reuse the same set object while performing batch inserts.
-               final Set<RyaStatement> queryBatch = new HashSet<>();
-
-               // Iterate through each of the statement patterns and insert 
their
-               // historic matches into Fluo.
-               for (final StatementPatternMetadata patternMetadata : 
fluoQuery.getStatementPatternMetadata()) {
-                       // Get an iterator over all of the binding sets that 
match the
-                       // statement pattern.
-                       final StatementPattern pattern = FluoStringConverter
-                                       
.toStatementPattern(patternMetadata.getStatementPattern());
-                       queryBatch.add(spToRyaStatement(pattern));
-               }
-
-               Iterator<RyaStatement> triples = queryEngine.query(new 
BatchRyaQuery(queryBatch)).iterator();
-               Set<RyaStatement> triplesBatch = new HashSet<>();
-
-               // Insert batches of the binding sets into Fluo.
-               while (triples.hasNext()) {
-                       if (triplesBatch.size() == spInsertBatchSize) {
-                               writeBatch(fluo, triplesBatch);
-                               triplesBatch.clear();
-                       }
-
-                       triplesBatch.add(triples.next());
-               }
-
-               if (!triplesBatch.isEmpty()) {
-                       writeBatch(fluo, triplesBatch);
-                       triplesBatch.clear();
-               }
-        return queryId;
+        return fluoQuery.getQueryMetadata().getNodeId();
     }
-    
+
     private static void writeBatch(final FluoClient fluo, final 
Set<RyaStatement> batch) {
         checkNotNull(fluo);
         checkNotNull(batch);
-        
         new InsertTriples().insert(fluo, batch);
+    }
+
+    private static RyaStatement spToRyaStatement(final StatementPattern sp) {
+        final Value subjVal = sp.getSubjectVar().getValue();
+        final Value predVal = sp.getPredicateVar().getValue();
+        final Value objVal = sp.getObjectVar().getValue();
+
+        RyaURI subjURI = null;
+        RyaURI predURI = null;
+        RyaType objType = null;
 
+        if(subjVal != null) {
+            if(!(subjVal instanceof Resource)) {
+                throw new AssertionError("Subject must be a Resource.");
+            }
+            subjURI = RdfToRyaConversions.convertResource((Resource) subjVal);
+        }
+
+        if (predVal != null) {
+            if(!(predVal instanceof URI)) {
+                throw new AssertionError("Predicate must be a URI.");
+            }
+            predURI = RdfToRyaConversions.convertURI((URI) predVal);
+        }
+
+        if (objVal != null ) {
+            objType = RdfToRyaConversions.convertValue(objVal);
+        }
+
+        return new RyaStatement(subjURI, predURI, objType);
     }
-    
-    
-    private static RyaStatement spToRyaStatement(StatementPattern sp) {
-    
-       Value subjVal = sp.getSubjectVar().getValue();
-       Value predVal = sp.getPredicateVar().getValue();
-       Value objVal = sp.getObjectVar().getValue();
-       
-       RyaURI subjURI = null;
-       RyaURI predURI = null;
-       RyaType objType = null;
-       
-       if(subjVal != null) {
-               if(!(subjVal instanceof Resource)) {
-                       throw new AssertionError("Subject must be a Resource.");
-               }
-               subjURI = RdfToRyaConversions.convertResource((Resource) 
subjVal);
-       }
-       
-               if (predVal != null) {
-                       if(!(predVal instanceof URI)) {
-                       throw new AssertionError("Predicate must be a URI.");
-               }
-                       predURI = RdfToRyaConversions.convertURI((URI) predVal);
-               }
-               
-               if (objVal != null ) {
-                       objType = RdfToRyaConversions.convertValue(objVal);
-               }
-               
-       return new RyaStatement(subjURI, predURI, objType);
+
+    private String[] getAuths(final Connector accumulo) {
+        Authorizations auths;
+        try {
+            auths = 
accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
+            final List<byte[]> authList = auths.getAuthorizations();
+            final String[] authArray = new String[authList.size()];
+            for(int i = 0; i < authList.size(); i++){
+                authArray[i] = new String(authList.get(i), "UTF-8");
+            }
+            return authArray;
+        } catch (AccumuloException | AccumuloSecurityException | 
UnsupportedEncodingException e) {
+            throw new RuntimeException("Cannot read authorizations for user: " 
+ accumulo.whoami());
+        }
     }
-    
-    
-    private String[] getAuths(Connector accumulo) {
-        Authorizations auths;
-               try {
-                       auths = 
accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
-                       List<byte[]> authList = auths.getAuthorizations();
-                String[] authArray = new String[authList.size()];
-                for(int i = 0; i < authList.size(); i++){
-                       authArray[i] = new String(authList.get(i), "UTF-8");
-                }
-                return authArray;
-               } catch (AccumuloException | AccumuloSecurityException | 
UnsupportedEncodingException e) {
-                       throw new RuntimeException("Cannot read authorizations 
for user: " + accumulo.whoami());
-               }
-   }
-    
-    
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
index 1d92262..c11f9fb 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
@@ -25,10 +25,15 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
@@ -36,13 +41,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
 import org.openrdf.query.BindingSet;
 
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
 
 /**
  * Deletes a Pre-computed Join (PCJ) from Fluo.
@@ -154,6 +154,12 @@ public class DeletePcj {
                 nodeIds.add(filterChild);
                 getChildNodeIds(tx, filterChild, nodeIds);
                 break;
+            case AGGREGATION:
+                final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
+                final String aggChild = aggMeta.getChildNodeId();
+                nodeIds.add(aggChild);
+                getChildNodeIds(tx, aggChild, nodeIds);
+                break;
             case STATEMENT_PATTERN:
                 break;
         }
@@ -254,7 +260,7 @@ public class DeletePcj {
 
         try(Transaction ntx = tx) {
           int count = 0;
-          Iterator<RowColumnValue> iter = scanner.iterator();
+          final Iterator<RowColumnValue> iter = scanner.iterator();
           while (iter.hasNext() && count < batchSize) {
             final Bytes row = iter.next().getRow();
             count++;

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index 343713c..38fff95 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -64,6 +64,14 @@ under the License.
                </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-recipes-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.fluo</groupId>
+            <artifactId>fluo-recipes-accumulo</artifactId>
+        </dependency>
         
         <dependency>
           <groupId>org.apache.kafka</groupId>
@@ -81,7 +89,7 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
-                <dependency>
+        <dependency>
             <groupId>com.esotericsoftware</groupId>
             <artifactId>kryo</artifactId>
             <version>${kryo.version}</version>

Reply via email to