http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
new file mode 100644
index 0000000..ad38b2b
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
@@ -0,0 +1,850 @@
+package mvm.rya.accumulo.documentIndex;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * This iterator facilitates document-partitioned indexing. It involves 
grouping a set of documents together and indexing those documents into a single 
row of
+ * an Accumulo table. This allows a tablet server to perform boolean AND 
operations on terms in the index.
+ * 
+ * The table structure should have the following form:
+ * 
+ * row: shardID, colfam: term, colqual: docID
+ * 
+ * When you configure this iterator with a set of terms (column families), it 
will return only the docIDs that appear with all of the specified terms. The
+ * result will have an empty column family, as follows:
+ * 
+ * row: shardID, colfam: (empty), colqual: docID
+ * 
+ * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to 
parallelize the search over all shardIDs.
+ * 
+ * This iterator will *ignore* any columnFamilies passed to {@link 
#seek(Range, Collection, boolean)} as it performs intersections over terms. 
Extending classes
+ * should override the {@link TermSource#seekColfams} in their 
implementation's {@link #init(SortedKeyValueIterator, Map, 
IteratorEnvironment)} method.
+ * 
+ * README.shard in docs/examples shows an example of using the 
IntersectingIterator.
+ */
+public class DocumentIndexIntersectingIterator implements 
SortedKeyValueIterator<Key,Value> {
+  
+    
+    
+    
+  protected Text nullText = new Text();
+  
+  /** Returns the row of {@code key}. */
+  protected Text getRow(Key key) {
+    return key.getRow();
+  }
+  
+  /** Returns the term of {@code key}, which is stored in its column family. */
+  protected Text getTerm(Key key) {
+    return key.getColumnFamily();
+  }
+  
+  /** Returns the term condition of {@code key}, which is stored in its column qualifier. */
+  protected Text getTermCond(Key key) {
+    return key.getColumnQualifier();
+  }
+  
+  /**
+   * Builds a key from a row and a column, substituting the shared empty
+   * {@code nullText} when the column has no column family.
+   */
+  protected Key buildKey(Text row, TextColumn column) {
+      Text family = column.getColumnFamily();
+      if (family == null) {
+          family = nullText;
+      }
+      return new Key(row, family, column.getColumnQualifier());
+  }
+  
+  /**
+   * Builds a key from a row and a term (column family); a {@code null} term
+   * is replaced by the shared empty {@code nullText}.
+   */
+  protected Key buildKey(Text row, Text term) {
+    if (term == null) {
+      return new Key(row, nullText);
+    }
+    return new Key(row, term);
+  }
+  
+  protected Key buildKey(Text row, Text term, Text termCond) {
+    return new Key(row, (term == null) ? nullText : term, termCond);
+  }
+  
+  protected Key buildFollowRowKey(Key key, Text term, Text termCond) {
+    return new Key(getRow(key.followingKey(PartialKey.ROW)),(term == null) ? 
nullText : term, termCond);
+  }
+  
+    protected static final Logger log = 
Logger.getLogger(DocumentIndexIntersectingIterator.class);
+
+    /**
+     * Couples one intersected term with the iterator that scans it and keeps a
+     * one-key look-ahead over that iterator.
+     *
+     * <p>{@link #top} is the key this source is positioned on and {@link #next}
+     * is the key after it; the underlying {@link #iter} is left positioned one
+     * key ahead of {@link #top}.
+     */
+    public static class TermSource {
+        /** Underlying iterator, positioned one key ahead of {@link #top}. */
+        public SortedKeyValueIterator<Key, Value> iter;
+        /** Column family of {@link #column}; the term this source scans. */
+        public Text term;
+        /** Column qualifier of {@link #column}; the condition matched against data qualifiers. */
+        public Text termCond;
+        /** Single-element column-family filter ({@link #term}) passed to every seek of {@link #iter}. */
+        public Collection<ByteSequence> seekColfams;
+        /** Column this source was created for. */
+        public TextColumn column;
+        /** Prefix flag for {@link #termCond}; assigned and read by the enclosing iterator. */
+        public boolean isPrefix;
+        /** Key the source is positioned on, or {@code null} when the source has no data. */
+        public Key top ;
+        /** Key following {@link #top} in {@link #iter}, or {@code null} when there is none. */
+        public Key next ;
+        /** Column qualifier recorded by the enclosing iterator for this source's last match. */
+        public Text currentCQ;
+        /** Whether {@link #seek(Range)} has already positioned {@link #iter} once. */
+        private boolean seeked = false;
+
+        /**
+         * Creates a shallow copy that shares the iterator and all key/text
+         * references with {@code other}.
+         *
+         * <p>NOTE(review): {@code seeked} is not copied, so the copy re-seeks the
+         * shared iterator on its first {@link #seek(Range)} - confirm this is intended.
+         *
+         * @param other source to copy
+         */
+        public TermSource(TermSource other) {
+
+            this.iter = other.iter;
+            this.term = other.term;
+            this.termCond = other.termCond;
+            this.seekColfams = other.seekColfams;
+            this.column = other.column;
+            this.top = other.top;
+            this.next = other.next;
+            this.currentCQ = other.currentCQ;
+            this.isPrefix = other.isPrefix;
+        }
+
+
+        /**
+         * Creates a source that scans the column family of {@code column} on {@code iter}.
+         *
+         * @param iter   iterator this source reads from
+         * @param column column whose family becomes the term and whose qualifier
+         *               becomes the term condition
+         */
+        public TermSource(SortedKeyValueIterator<Key, Value> iter, TextColumn column) {
+
+            this.iter = iter;
+            this.column = column;
+            this.term = column.getColumnFamily();
+            this.termCond = column.getColumnQualifier();
+            this.currentCQ = new Text(emptyByteArray);
+            this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term
+                    .getBytes(), 0, term.getLength()));
+
+        }
+
+
+
+        /**
+         * Positions this source at or after the start of {@code r}.
+         *
+         * <p>The first call seeks the underlying iterator and loads {@link #top}
+         * and {@link #next}. Later calls avoid a real seek when possible: if the
+         * buffered {@link #next} is not before the start of {@code r} it becomes
+         * the new {@link #top} (only when its column family equals {@link #term});
+         * otherwise the iterator is re-seeked if it still has data, and the source
+         * is marked exhausted if it does not.
+         *
+         * @param r range whose start the source must reach
+         * @throws IOException if the underlying iterator fails
+         */
+        public void seek(Range r) throws IOException {
+
+            if (seeked) {
+
+                if (next != null && !r.beforeStartKey(next)) {
+                    if (next.getColumnFamily().equals(term)) {
+                        this.updateTop();
+                    }
+                } else if (iter.hasTop()) {
+                    iter.seek(r, seekColfams, true);
+                    this.updateTopNext();
+                } else {
+                    top = null;
+                    next = null;
+
+                }
+            } else {
+
+                iter.seek(r, seekColfams, true);
+                this.updateTopNext();
+                seeked = true;
+            }
+
+        }
+
+
+        /**
+         * Advances this source by one key.
+         *
+         * @throws IOException if the underlying iterator fails
+         */
+        public void next() throws IOException {
+
+            this.updateTop();
+        }
+
+        /**
+         * Shifts the look-ahead: {@link #next} becomes {@link #top} and, if there
+         * was a next key, the iterator is advanced to refill {@link #next}.
+         *
+         * @throws IOException if the underlying iterator fails
+         */
+        public void updateTop() throws IOException {
+
+            top = next;
+            if (next != null) {
+                iter.next();
+                if (iter.hasTop()) {
+                    next = iter.getTopKey();
+                } else {
+                    next = null;
+                }
+            }
+
+        }
+
+        /**
+         * Reloads both {@link #top} and {@link #next} from the iterator's current
+         * position; both become {@code null} when the iterator has no top.
+         *
+         * @throws IOException if the underlying iterator fails
+         */
+        public void updateTopNext() throws IOException {
+
+            if (iter.hasTop()) {
+                top = iter.getTopKey();
+            } else {
+                top = null;
+                next = null;
+                return;
+            }
+
+            iter.next();
+
+            if(iter.hasTop()) {
+                next = iter.getTopKey();
+            } else {
+                next = null;
+            }
+        }
+
+        /** Returns whether this source is positioned on a key. */
+        public boolean hasTop() {
+            return top != null;
+        }
+
+
+        /** Returns the term as a string, or {@code "Iterator"} when no term is set. */
+        public String getTermString() {
+            // a literal is enough; wrapping it in new String(...) only allocated a redundant copy
+            return (this.term == null) ? "Iterator" : this.term.toString();
+        }
+    }
+  
+  TermSource[] sources;
+  int sourcesCount = 0;
+  Range overallRange;
+  
+  // query-time settings
+  protected Text currentRow = null;
+  protected Text currentTermCond = new Text(emptyByteArray);
+  static final byte[] emptyByteArray = new byte[0];
+  
+  protected Key topKey = null;
+  protected Value value = new Value(emptyByteArray);
+  protected String ctxt = null;
+  protected boolean hasContext = false;
+  protected boolean termCondSet = false;
+  
+  /** Creates an iterator with no sources; sources are assigned by {@code init} or added by {@code addSource}. */
+  public DocumentIndexIntersectingIterator() {}
+  
+  /**
+   * Creates an independent copy of this iterator whose sources read from deep
+   * copies of this iterator's underlying iterators.
+   *
+   * @param env environment handed to each underlying {@code deepCopy}
+   * @return the new iterator
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    final DocumentIndexIntersectingIterator duplicate = new DocumentIndexIntersectingIterator(this, env);
+    return duplicate;
+  }
+  
+  /**
+   * Copy constructor backing {@code deepCopy}: deep-copies every source
+   * iterator and carries over the query configuration of {@code other}.
+   *
+   * @param other iterator to copy
+   * @param env   environment handed to each underlying {@code deepCopy}
+   */
+  private DocumentIndexIntersectingIterator(DocumentIndexIntersectingIterator other, IteratorEnvironment env) {
+    if (other.sources != null) {
+      sourcesCount = other.sourcesCount;
+      sources = new TermSource[sourcesCount];
+      for (int i = 0; i < sourcesCount; i++) {
+        sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].column);
+        // the prefix flag lives on the TermSource, not on the column, so the
+        // two-argument constructor would silently reset it to false
+        sources[i].isPrefix = other.sources[i].isPrefix;
+      }
+    }
+    // a configured context is part of the query; without it the copy would
+    // fall back to taking its context from whatever data it reads first
+    hasContext = other.hasContext;
+    if (hasContext) {
+      ctxt = other.ctxt;
+    }
+  }
+  
+  /** Returns the key set by the last {@code advanceToIntersection}; {@code null} when it found no intersection. */
+  @Override
+  public Key getTopKey() {
+
+    return topKey;
+  }
+  
+  /** Returns the shared {@code value} field, which is initialised to an empty value. */
+  @Override
+  public Value getTopValue() {
+    // we don't really care about values
+    return value;
+  }
+  
+  /**
+   * Returns whether the iterator still has a current row; {@code seekOneSource}
+   * sets {@code currentRow} to {@code null} once a source is exhausted or has
+   * passed the end of the range.
+   */
+  @Override
+  public boolean hasTop() {
+    return currentRow != null;
+  }
+
+    // precondition: currentRow is not null
+    /**
+     * Brings one source into agreement with the shared cursor, or moves the
+     * cursor forward when that source is already past it.
+     *
+     * <p>The cursor is {@code currentRow} together with the context
+     * {@code ctxt}. On a successful match the source's column qualifier is
+     * recorded in its {@code currentCQ} and {@code currentTermCond} may be set to
+     * this source's index.
+     *
+     * @param sourceID index into {@code sources} of the source to position
+     * @return {@code true} if the cursor moved, which includes
+     *         {@code currentRow} being set to {@code null} because the source
+     *         ran out of data or passed the end of {@code overallRange};
+     *         {@code false} if the source was positioned without moving the cursor
+     * @throws IOException if seeking the source fails
+     */
+    private boolean seekOneSource(int sourceID) throws IOException {
+        // find the next key in the appropriate column family that is at or
+        // beyond the cursor (currentRow, currentCQ)
+        // advance the cursor if this source goes beyond it
+        // return whether we advanced the cursor
+
+        // within this loop progress must be made in one of the following 
forms:
+        // - currentRow or currentCQ must be increased
+        // - the given source must advance its iterator
+        // this loop will end when any of the following criteria are met
+        // - the iterator for the given source is pointing to the key
+        // (currentRow, columnFamilies[sourceID], currentCQ)
+        // - the given source is out of data and currentRow is set to null
+        // - the given source has advanced beyond the endRow and currentRow is
+        // set to null
+        boolean advancedCursor = false;
+        
+      
+        
+        
+
+        while (true) {
+            
+//            if(currentRow.toString().equals(s)) {
+//                log.info("Source id is " + sourceID);
+//                if (sources[sourceID].top != null) {
+//                    log.info("Top row is " + getRow(sources[sourceID].top));
+//                    log.info("Top cq is " + 
getTermCond(sources[sourceID].top));
+//                }
+//                if (sources[sourceID].next != null) {
+//                    log.info("Next row is " + 
getRow(sources[sourceID].next));
+//                    log.info("Next termCond is " + 
getTermCond(sources[sourceID].next));
+//                }
+//            }
+            
+            if (sources[sourceID].hasTop() == false) {
+                currentRow = null;
+                // setting currentRow to null counts as advancing the cursor
+                return true;
+            }
+            // check if we're past the end key
+            // endCompare stays -1 when the range has no end key; 0 means the
+            // source is in the same row as the end key
+            int endCompare = -1;
+            // we should compare the row to the end of the range
+
+            if (overallRange.getEndKey() != null) {
+                endCompare = 
overallRange.getEndKey().getRow().compareTo(sources[sourceID].top.getRow());
+                if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || 
endCompare < 0) {
+                    currentRow = null;
+                    // setting currentRow to null counts as advancing the 
cursor
+                    return true;
+                }
+            }
+            
+
+            
+            int rowCompare = 
currentRow.compareTo(getRow(sources[sourceID].top));
+            // check if this source is already at or beyond currentRow
+            // if not, then seek to at least the current row
+            
+        
+            
+            if (rowCompare > 0) {
+                // seek to at least the currentRow
+                Key seekKey = buildKey(currentRow, sources[sourceID].term);
+                sources[sourceID].seek(new Range(seekKey, true, null, false));
+              
+                continue;
+            }
+            // check if this source has gone beyond currentRow
+            // if so, advance currentRow
+            if (rowCompare < 0) {
+                currentRow.set(getRow(sources[sourceID].top));
+                //log.info("Current row is " + currentRow);
+                advancedCursor = true;
+                continue;
+            }
+            // we have verified that the current source is positioned in
+            // currentRow
+            // now we must make sure we're in the right columnFamily in the
+            // current row
+            // Note: Iterators are auto-magically set to the correct
+            // columnFamily
+
+            if (sources[sourceID].column.isValid()) {
+                
+                boolean isPrefix = false;
+                boolean contextEqual = false;
+                String tempContext = "";
+                
+                int termCompare;
+
+                // the data qualifier is split on NUL: cQ[0] is used as the
+                // context, cQ[1] and cQ[2] as the term-condition components
+                // NOTE(review): a qualifier with fewer than three NUL-separated
+                // parts would throw ArrayIndexOutOfBoundsException at cQ[1]/cQ[2]
+                // below - confirm the table layout guarantees three
+                String[] cQ = 
getTermCond(sources[sourceID].top).toString().split("\u0000");
+                tempContext = cQ[0];
+
+                // without a configured context, adopt the first context seen
+                if (!hasContext && ctxt == null) {
+                    ctxt = cQ[0];
+                }
+
+                contextEqual = ctxt.equals(cQ[0]);
+
+                String s1 = sources[sourceID].termCond.toString();
+                String s2 = cQ[1] + "\u0000" + cQ[2];
+
+                // a prefix source must match a whole NUL-terminated component;
+                // otherwise a plain string-prefix match is used
+                if (sources[sourceID].isPrefix) {
+                    isPrefix = s2.startsWith(s1 + "\u0000");
+                } else {
+                    isPrefix = s2.startsWith(s1);
+                }
+
+                // 0 on a match; otherwise the sign of the string comparison
+                // decides whether the source is behind (> 0) or ahead (< 0)
+                termCompare = (contextEqual && isPrefix) ? 0 : (ctxt + 
"\u0000" + s1).compareTo(cQ[0] + "\u0000" + s2);
+
+                // if(currentRow.toString().equals(s)) {
+                // log.info("Term compare is " + termCompare);
+                // }
+           
+                // check if this source is already on the right columnFamily
+                // if not, then seek forwards to the right columnFamily
+                if (termCompare > 0) {
+                    Key seekKey = buildKey(currentRow, sources[sourceID].term, 
new Text(ctxt + 
+                            "\u0000" + sources[sourceID].termCond.toString()));
+                    sources[sourceID].seek(new Range(seekKey, true, null, 
false));
+                 
+                    continue;
+                }
+                // check if this source is beyond the right columnFamily
+                // if so, then seek to the next row
+                if (termCompare < 0) {
+                    // we're out of entries in the current row, so seek to the
+                    // next one
+                    
+                    if (endCompare == 0) {
+                        // we're done
+                        currentRow = null;
+                        // setting currentRow to null counts as advancing the
+                        // cursor
+                        return true;
+                    }
+                    
+                    
+                    
+                    //advance to next row if context set - all entries in 
given row exhausted
+                    if (hasContext || tempContext.length() == 0) {
+                        Key seekKey = buildFollowRowKey(sources[sourceID].top, 
sources[sourceID].term,
+                                new Text(ctxt + "\u0000" + 
sources[sourceID].termCond.toString()));
+                        sources[sourceID].seek(new Range(seekKey, true, null, 
false));
+                    } else {
+                        
+                        // no configured context: stay in this row, move to a
+                        // later context and adopt the context of the key found
+                        if(contextEqual && !isPrefix) {
+                            Key seekKey = buildKey(currentRow, 
sources[sourceID].term, new Text(ctxt + "\u0001"));
+                            sources[sourceID].seek(new Range(seekKey, true, 
null, false));
+                            if(sources[sourceID].top != null) {
+                                ctxt = 
getTermCond(sources[sourceID].top).toString().split("\u0000")[0];
+                            } 
+                        } else {
+                            Key seekKey = buildKey(currentRow, 
sources[sourceID].term, new Text(tempContext + 
+                                    "\u0000" + 
sources[sourceID].termCond.toString()));
+                            sources[sourceID].seek(new Range(seekKey, true, 
null, false));
+                            if(sources[sourceID].top != null) {
+                                ctxt = 
getTermCond(sources[sourceID].top).toString().split("\u0000")[0];
+                            } 
+                        }
+                        
+                    }
+                    
+                    
+//                    if(currentRow.toString().equals(s)) {
+//                        log.info("current term cond is " + currentTermCond);
+//                        
+//                    }
+                    
+             
+                    continue;
+                }
+            }
+         
+            
+            
+         
+            
+            
+            
+            
+            
+            
+            //set currentTermCond -- gets appended to end of currentKey column 
qualifier
+            //used to determine which term iterator to advance when a new 
iterator is created
+            
+            
sources[sourceID].currentCQ.set(getTermCond(sources[sourceID].top));
+            
+            if (sources[sourceID].next != null) {
+                        
+                //is hasContext, only consider sourceID with next having 
designated context
+                //otherwise don't set currentTermCond
+                if (!termCondSet && hasContext) {
+                    if (sources[sourceID].next.getRow().equals(currentRow)
+                            && 
sources[sourceID].next.getColumnQualifier().toString()
+                                    .startsWith(ctxt + "\u0000" + 
sources[sourceID].termCond.toString())) {
+                        currentTermCond.set(new 
Text(Integer.toString(sourceID)));
+                        termCondSet = true;
+                    }
+                } else if(!termCondSet){
+                    // NOTE(review): same unchecked three-part assumption as
+                    // above applies to cq[1]/cq[2] here
+                    String[] cq = 
getTermCond(sources[sourceID].next).toString().split("\u0000");
+                    
+                    //set currentTermCond with preference given to sourceID 
having next with same context
+                    //otherwise set currentTermCond sourceID with next having 
termCond as prefix
+                    if (sources[sourceID].next.getRow().equals(currentRow)) {
+                        if 
(sources[sourceID].next.getColumnQualifier().toString()
+                                .startsWith(ctxt + "\u0000" + 
sources[sourceID].termCond.toString())) {
+                            currentTermCond.set(new 
Text(Integer.toString(sourceID)));
+                            termCondSet = true;
+                        } else if ((cq[1] + "\u0000" + 
cq[2]).startsWith(sources[sourceID].termCond.toString())) {
+                            currentTermCond.set(new 
Text(Integer.toString(sourceID)));
+                        }
+                    }
+                }
+            } 
+       
+           
+            break;
+        }
+
+        return advancedCursor;
+    }
+  
+  /**
+   * Advances to the next intersection.
+   *
+   * <p>The source named by {@code currentTermCond} is stepped forward when one
+   * is recorded; otherwise source 0 is stepped. When no context was
+   * configured, the context is re-read from the stepped source's new top key.
+   * Does nothing once the iterator is exhausted.
+   *
+   * @throws IOException if a source fails while advancing
+   */
+  @Override
+  public void next() throws IOException {
+    if (currentRow == null) {
+      return;
+    }
+
+    final boolean sourceRecorded = currentTermCond.getLength() != 0;
+    final int id = sourceRecorded ? Integer.parseInt(currentTermCond.toString()) : 0;
+
+    sources[id].next();
+    if (sourceRecorded) {
+      // the recorded source has been consumed; clear it for the next round
+      currentTermCond.set(emptyByteArray);
+      termCondSet = false;
+    }
+    if (!hasContext && sources[id].top != null) {
+      ctxt = getTermCond(sources[id].top).toString().split("\u0000")[0];
+    }
+    advanceToIntersection();
+  }
+  
+  protected void advanceToIntersection() throws IOException {
+    boolean cursorChanged = true;
+    while (cursorChanged) {
+      // seek all of the sources to at least the highest seen column qualifier 
in the current row
+      cursorChanged = false;
+      for (int i = 0; i < sourcesCount; i++) {
+//          log.info("New sourceID is " + i);
+        if (currentRow == null) {
+          topKey = null;
+          return;
+        }
+        if (seekOneSource(i)) {
+          currentTermCond.set(emptyByteArray); 
+          termCondSet = false;
+          cursorChanged = true;
+          break;
+        }
+      }
+    }
+    String cq = "";
+    for(int i = 0; i < sourcesCount; i++) {
+        cq = cq + sources[i].currentCQ.toString() + 
DocIndexIteratorUtil.DOC_ID_INDEX_DELIM;
+    }
+    
+        if (currentTermCond.getLength() == 0) {
+            topKey = buildKey(currentRow, nullText, new Text(cq + -1));
+        } else {
+            topKey = buildKey(currentRow, nullText, new Text(cq + 
currentTermCond.toString()));
+        }
+  }
+  
+  /**
+   * Renders the top key of an iterator for display.
+   *
+   * @param iter iterator to inspect
+   * @return the string form of the top key, or the empty string when the
+   *         iterator has no top
+   */
+  public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
+    return iter.hasTop() ? iter.getTopKey().toString() : "";
+  }
+  
+  private static final String columnOptionName = "columns";
+  private static final String columnPrefix = "prefixes";
+  private static final String context = "context";
+  
+  
+  
+  /**
+   * Serialises columns into a single option string.
+   *
+   * <p>Each column becomes {@code base64(family) '\n' base64(qualifier) '\u0001'}.
+   * The Base64 bytes are turned into text with an explicit charset so the
+   * result does not depend on the JVM's default encoding.
+   *
+   * @param columns columns to encode
+   * @return the encoded option string
+   */
+  protected static String encodeColumns(TextColumn[] columns) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < columns.length; i++) {
+        sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnFamily())),
+                java.nio.charset.StandardCharsets.UTF_8));
+        sb.append('\n');
+        sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnQualifier())),
+                java.nio.charset.StandardCharsets.UTF_8));
+        sb.append('\u0001');
+      }
+      return sb.toString();
+    }
+    
+  
+  
+  /**
+   * Parses the option string written by {@code encodeColumns}.
+   *
+   * @param columns entries of the form {@code base64(family) '\n' base64(qualifier)},
+   *                each terminated by {@code '\u0001'}
+   * @return one column per entry, in encoded order
+   * @throws IllegalArgumentException if {@code columns} is {@code null} or an
+   *         entry lacks the family/qualifier separator
+   */
+  protected static TextColumn[] decodeColumns(String columns) {
+      if (columns == null) {
+        throw new IllegalArgumentException("Missing encoded columns");
+      }
+      String[] columnStrings = columns.split("\u0001");
+      TextColumn[] columnTexts = new TextColumn[columnStrings.length];
+      for (int i = 0; i < columnStrings.length; i++) {
+        // limit -1 keeps a trailing empty field: an empty qualifier encodes to
+        // "", which a plain split("\n") would drop and then fail on index 1
+        String[] columnComponents = columnStrings[i].split("\n", -1);
+        if (columnComponents.length < 2) {
+          throw new IllegalArgumentException("Malformed encoded column at index " + i);
+        }
+        columnTexts[i] = new TextColumn(
+                new Text(Base64.decodeBase64(columnComponents[0].getBytes(java.nio.charset.StandardCharsets.UTF_8))),
+                new Text(Base64.decodeBase64(columnComponents[1].getBytes(java.nio.charset.StandardCharsets.UTF_8))));
+      }
+      return columnTexts;
+    }
+  
+ 
+  
+  
+  
+  /**
+   * Encodes a context as Base64 text.
+   *
+   * <p>The context is converted to bytes as UTF-8 rather than with the JVM's
+   * default charset, so that the side that decodes it (which may run with a
+   * different default) recovers the same string.
+   *
+   * @param context context to encode; must not be {@code null}
+   * @return encoded context
+   */
+  protected static String encodeContext(String context) {
+
+    return new String(Base64.encodeBase64(context.getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+            java.nio.charset.StandardCharsets.UTF_8);
+  }
+
+
+
+  /**
+   * Decodes a context written by {@code encodeContext}, using the same UTF-8
+   * charset.
+   *
+   * @param context encoded context, or {@code null} when none was configured
+   * @return decoded context, or {@code null} when {@code context} is {@code null}
+   */
+    protected static String decodeContext(String context) {
+
+        if (context == null) {
+            return null;
+        } else {
+            return new String(Base64.decodeBase64(context.getBytes(java.nio.charset.StandardCharsets.UTF_8)),
+                    java.nio.charset.StandardCharsets.UTF_8);
+        }
+    }
+  
+  
+  
+  
+  
+  protected static String encodeBooleans(boolean[] prefixes) {
+      byte[] bytes = new byte[prefixes.length];
+      for (int i = 0; i < prefixes.length; i++) {
+        if (prefixes[i])
+          bytes[i] = 1;
+        else
+          bytes[i] = 0;
+      }
+      return new String(Base64.encodeBase64(bytes));
+    }
+    
+    /**
+     * @param flags
+     * @return decoded flags
+     */
+    protected static boolean[] decodeBooleans(String prefixes) {
+      // return null of there were no flags
+      if (prefixes == null)
+        return null;
+      
+      byte[] bytes = Base64.decodeBase64(prefixes.getBytes());
+      boolean[] bFlags = new boolean[bytes.length];
+      for (int i = 0; i < bytes.length; i++) {
+        if (bytes[i] == 1)
+          bFlags[i] = true;
+        else
+          bFlags[i] = false;
+      }
+      return bFlags;
+    }
+  
+  
+  
+  
+  
+  
+  
+  
+  /**
+   * Configures the iterator from its options: the encoded columns (required,
+   * at least two), the per-column prefix flags and the optional context.
+   *
+   * <p>The first column reads from {@code source} itself; every further column
+   * reads from a deep copy of it.
+   *
+   * @param source  iterator to read from
+   * @param options iterator options as written by {@code setColumnFamilies}
+   *                and {@code setContext}
+   * @param env     environment used to deep-copy {@code source}
+   * @throws IllegalArgumentException if the columns option is missing or
+   *         describes fewer than two columns
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    String encodedColumns = options.get(columnOptionName);
+    if (encodedColumns == null) {
+      throw new IllegalArgumentException("IntersectionIterator requires the \"" + columnOptionName + "\" option");
+    }
+    TextColumn[] terms = decodeColumns(encodedColumns);
+    // null when the prefix option is absent; every column is then non-prefix
+    boolean[] prefixes = decodeBooleans(options.get(columnPrefix));
+    ctxt = decodeContext(options.get(context));
+    hasContext = (ctxt != null);
+
+    if (terms.length < 2) {
+      throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
+    }
+
+    sources = new TermSource[terms.length];
+    for (int i = 0; i < terms.length; i++) {
+      SortedKeyValueIterator<Key,Value> termIter = (i == 0) ? source : source.deepCopy(env);
+      sources[i] = new TermSource(termIter, terms[i]);
+      // applies to index 0 as well: setColumnFamilies encodes a flag for every
+      // column, and the first one used to be dropped here
+      sources[i].isPrefix = prefixes != null && i < prefixes.length && prefixes[i];
+    }
+    sourcesCount = terms.length;
+  }
+  
+    /**
+     * Positions every source for {@code range} and advances to the first
+     * intersection.
+     *
+     * <p>When the start key carries a column qualifier, it is parsed with
+     * {@code cqParser}. A non-negative source index means only that source is
+     * sought to the recorded qualifier and then stepped once, while the others
+     * are sought to the start row. A negative index means every source is
+     * sought to the start key's qualifier. Without a start key every source is
+     * sought with {@code range} itself. {@code seekColumnFamilies} and
+     * {@code inclusive} are not used.
+     *
+     * @param range              range to scan
+     * @param seekColumnFamilies ignored
+     * @param inclusive          ignored
+     * @throws IOException if a source fails while seeking
+     */
+    @Override
+    public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException {
+        overallRange = new Range(range);
+        currentRow = new Text();
+        currentTermCond.set(emptyByteArray);
+        termCondSet = false;
+
+        final Key startKey = range.getStartKey();
+
+        if (!rangeCqValid(range)) {
+            for (int i = 0; i < sourcesCount; i++) {
+                if (startKey == null) {
+                    sources[i].seek(range);
+                } else {
+                    // Seek only to the term for this source as a column family
+                    final Key rowAndTerm = buildKey(getRow(startKey), sources[i].term);
+                    sources[i].seek(new Range(rowAndTerm, true, null, false));
+                }
+            }
+        } else {
+            final String[] cqInfo = cqParser(startKey.getColumnQualifier());
+            final int id = Integer.parseInt(cqInfo[1]);
+
+            for (int i = 0; i < sourcesCount; i++) {
+                final TermSource current = sources[i];
+                if (id < 0) {
+                    // no source recorded: resume every source from the start key's qualifier
+                    final Key resumeKey = buildKey(getRow(startKey), current.term, startKey.getColumnQualifier());
+                    current.seek(new Range(resumeKey, true, null, false));
+                } else if (i != id) {
+                    final Key rowAndTerm = buildKey(getRow(startKey), current.term);
+                    current.seek(new Range(rowAndTerm, true, null, false));
+                } else {
+                    // the recorded source resumes at its own qualifier and moves one key past it
+                    final Key resumeKey = buildKey(getRow(startKey), current.term, new Text(cqInfo[0]));
+                    current.seek(new Range(resumeKey, true, null, false));
+                    current.next();
+                    if (!hasContext && current.hasTop()) {
+                        ctxt = getTermCond(current.top).toString().split("\u0000")[0];
+                    }
+                }
+            }
+        }
+
+        advanceToIntersection();
+
+    }
+    
+    
+    private String[] cqParser(Text cq) {
+        
+        String cQ = cq.toString();
+        String[] cqComponents = 
cQ.split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);
+        int id = -1;
+        String[] valPos = new String[2];
+        
+
+        
+        
+        if(cqComponents.length > 1) {
+            id = Integer.parseInt(cqComponents[cqComponents.length-1]);
+            if (id >= 0) {
+                valPos[0] = cqComponents[id].toString();
+                valPos[1] = "" + id;
+            } else {
+                valPos[0] = cqComponents[0].toString();
+                valPos[1] = "" + id;
+            }
+        } else {
+            valPos[0] = cq.toString();
+            valPos[1] = "" + -1;
+        }
+        
+        return valPos;
+       
+    }
+    
+  
+  /** Returns whether {@code range} has a start key with a non-null column qualifier. */
+  private boolean rangeCqValid(Range range) {
+      if (range.getStartKey() == null) {
+          return false;
+      }
+      return range.getStartKey().getColumnQualifier() != null;
+  }
+  
+  
+  
+  /**
+   * Appends a source for {@code column} that reads from a deep copy of
+   * {@code source}.
+   *
+   * <p>The source array is grown by one slot; existing entries are re-created
+   * through the {@code TermSource} copy constructor.
+   *
+   * @param source iterator to deep-copy for the new source
+   * @param env    environment used for the deep copy
+   * @param column column the new source scans
+   */
+  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, TextColumn column) {
+    if (sources != null) {
+      // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309
+      final TermSource[] grown = new TermSource[sources.length + 1];
+      for (int idx = 0; idx < sources.length; idx++) {
+        // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309
+        grown[idx] = new TermSource(sources[idx]);
+      }
+      sources = grown;
+    } else {
+      sources = new TermSource[1];
+    }
+    sources[sourcesCount] = new TermSource(source.deepCopy(env), column);
+    sourcesCount++;
+  }
+  
+  /**
+   * Encode the columns to be used when iterating.
+   * 
+   * @param cfg
+   * @param columns
+   */
+  public static void setColumnFamilies(IteratorSetting cfg, TextColumn[] 
columns) {
+    if (columns.length < 2)
+      throw new IllegalArgumentException("Must supply at least two terms to 
intersect");
+    
+    boolean[] prefix = new boolean[columns.length];
+    
+    for(int i = 0; i < columns.length; i++) {
+        prefix[i] = columns[i].isPrefix();
+    }
+    
+    
+    
+    cfg.addOption(DocumentIndexIntersectingIterator.columnPrefix, 
DocumentIndexIntersectingIterator.encodeBooleans(prefix));
+    cfg.addOption(DocumentIndexIntersectingIterator.columnOptionName, 
DocumentIndexIntersectingIterator.encodeColumns(columns));
+  }
+  
+  
+  
+  
+  
+  /**
+   * Stores the encoded context as an option on an iterator setting.
+   *
+   * @param cfg     iterator setting to add the option to
+   * @param context context to restrict the intersection to
+   */
+  public static void setContext(IteratorSetting cfg, String context) {
+      final String encoded = DocumentIndexIntersectingIterator.encodeContext(context);
+      cfg.addOption(DocumentIndexIntersectingIterator.context, encoded);
+    }
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java
 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java
new file mode 100644
index 0000000..661f62b
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/documentIndex/TextColumn.java
@@ -0,0 +1,108 @@
+package mvm.rya.accumulo.documentIndex;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+import org.apache.hadoop.io.Text;
+
+public class TextColumn  {
+    
+    
+    private Text columnFamily;
+    private Text columnQualifier;
+    private boolean isPrefix = false;
+    
+    
+    
+    public TextColumn(Text columnFamily, Text columnQualifier) {
+        this.columnFamily = columnFamily;
+        this.columnQualifier = columnQualifier;
+    }
+    
+    
+    public TextColumn(TextColumn other) {
+        
+        this.columnFamily = new Text(other.columnFamily);
+        this.columnQualifier = new Text(other.columnQualifier);
+        this.isPrefix = other.isPrefix;
+      
+    }
+    
+    
+    public Text getColumnFamily() {
+        return columnFamily;
+    }
+    
+    
+    public boolean isPrefix() {
+        return isPrefix;
+    }
+    
+    
+    public void setIsPrefix(boolean isPrefix) {
+        this.isPrefix = isPrefix;
+    }
+    
+    
+    public boolean isValid() {
+        return (columnFamily != null && columnQualifier != null);
+    }
+    
+    
+    
+    public Text getColumnQualifier() {
+        return columnQualifier;
+    }
+    
+    
+    public void setColumnFamily(Text cf) {
+        this.columnFamily = cf;
+    }
+    
+    public void setColumnQualifier(Text cq) {
+        this.columnQualifier = cq;
+    }
+    
+    public String toString() {
+        
+        return columnFamily.toString() + ",  " + columnQualifier.toString() + 
",    prefix:" + isPrefix;
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        
+        if(other == null) {
+            return false;
+        }
+        
+        if(!(other instanceof TextColumn)) {
+            return false;
+        }
+        
+        TextColumn tc = (TextColumn) other;
+        
+        return this.columnFamily.equals(tc.columnFamily) && 
this.columnQualifier.equals(tc.columnQualifier) && this.isPrefix == tc.isPrefix;
+        
+        
+        
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java
 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java
new file mode 100644
index 0000000..0966903
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java
@@ -0,0 +1,324 @@
+package mvm.rya.accumulo.pcj.iterators;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+/**
+ * This {@link CloseableIteration} performs a hash join: the {@link BindingSet}
+ * of each {@link Map.Entry} taken from the supplied iteration is joined with
+ * all {@link BindingSet}s stored in a {@link Multimap} under the same String
+ * key.
+ *
+ */
+public class BindingSetHashJoinIterator implements
+               CloseableIteration<BindingSet, QueryEvaluationException> {
+
+       // BindingSets passed to PCJ mapped according to values
+       // associated with common variables with table
+       private final Multimap<String, BindingSet> bindingJoinVarHash;
+       // BindingSets taken from PCJ table
+       private final CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter;
+       // Join results for the entry most recently taken from joinIter
+       private Iterator<BindingSet> joinedBindingSets = Collections
+                       .emptyIterator();
+       // If PCJ contains LeftJoin, this is the set of variables in the LeftJoin.
+       // Used when performing the join.
+       private final Set<String> unAssuredVariables;
+       // Indicates whether the hash join is formed from a single collection of
+       // join variables or whether the size and collection of join variables
+       // varies -- this is to optimize the join process
+       private final HashJoinType type;
+       // Sentinel returned by joinBindingSets when two BindingSets cannot be
+       // joined; it is always compared by identity (==), never by equals()
+       private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet();
+       // Element fetched by hasNext() and handed out by the following next()
+       private BindingSet next;
+       // True while 'next' holds an element that has not been returned yet
+       private boolean hasNextCalled = false;
+       // True once the iteration is exhausted
+       private boolean isEmpty = false;
+
+       /**
+        * Enum type to indicate whether HashJoin will be performed over a fixed
+        * subset of variables common to each {@link BindingSet}, or if there is a
+        * collection of variable subsets over which to join.
+        *
+        */
+       public enum HashJoinType {
+               CONSTANT_JOIN_VAR, VARIABLE_JOIN_VAR
+       }
+
+       /**
+        * @param bindingJoinVarHash
+        *            - BindingSets passed to the PCJ, keyed by join key
+        * @param joinIter
+        *            - (join key, BindingSet) entries taken from the PCJ table
+        * @param unAssuredVariables
+        *            - variables whose values may disagree between the two
+        *            sides without preventing the join
+        * @param type
+        *            - how the join key of an entry is matched against
+        *            bindingJoinVarHash
+        */
+       public BindingSetHashJoinIterator(
+                       Multimap<String, BindingSet> bindingJoinVarHash,
+                       CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter,
+                       Set<String> unAssuredVariables, HashJoinType type) {
+               this.bindingJoinVarHash = bindingJoinVarHash;
+               this.joinIter = joinIter;
+               this.type = type;
+               this.unAssuredVariables = unAssuredVariables;
+       }
+
+       /**
+        * Looks ahead for the next joined BindingSet and caches it for
+        * {@link #next()}. Repeated calls without an intervening next() do not
+        * advance the iteration.
+        */
+       @Override
+       public boolean hasNext() throws QueryEvaluationException {
+               if (isEmpty) {
+                       return false;
+               }
+               if (hasNextCalled) {
+                       return true;
+               }
+
+               while (joinedBindingSets.hasNext() || joinIter.hasNext()) {
+                       if (!joinedBindingSets.hasNext()) {
+                               // current entry is used up: join the next PCJ entry
+                               Entry<String, BindingSet> entry = joinIter.next();
+                               joinedBindingSets = joinBindingSetEntry(entry);
+                       }
+                       if (!joinedBindingSets.hasNext()) {
+                               // that entry produced no join results
+                               continue;
+                       }
+                       next = joinedBindingSets.next();
+                       hasNextCalled = true;
+                       return true;
+               }
+
+               isEmpty = true;
+               return false;
+       }
+
+       /**
+        * @return the element cached by {@link #hasNext()}, fetching it first if
+        *         hasNext() was not called
+        * @throws NoSuchElementException
+        *             if the iteration is exhausted
+        */
+       @Override
+       public BindingSet next() throws QueryEvaluationException {
+               if (!hasNextCalled) {
+                       if (isEmpty || !this.hasNext()) {
+                               throw new NoSuchElementException();
+                       }
+               }
+               hasNextCalled = false;
+               return next;
+       }
+
+       /** Not supported. */
+       @Override
+       public void remove() throws QueryEvaluationException {
+               throw new UnsupportedOperationException();
+       }
+
+       /** Closes the underlying iteration over the PCJ entries. */
+       @Override
+       public void close() throws QueryEvaluationException {
+               joinIter.close();
+       }
+
+       /**
+        * This method takes the valOrderString, which is a key used for computing
+        * hash joins, and generates multiple keys by pulling off one delimiter
+        * separated component at a time. This is used when the size of the join
+        * key varies from {@link Map.Entry} to Entry. It allows the BindingSet to
+        * be joined using all prefixes of the key.
+        *
+        * @param valOrderString
+        *            - key used for hash join
+        * @return - all prefixes of the key, shortest first, ending with the key
+        *         as rebuilt from its components
+        */
+       private List<String> getValueOrders(String valOrderString) {
+
+               List<String> valueOrders = new ArrayList<>();
+               String[] components = valOrderString
+                               .split(ExternalTupleSet.VALUE_DELIM);
+               // No synchronization is needed for this method-local builder
+               StringBuilder prefix = new StringBuilder();
+               prefix.append(components[0]);
+               valueOrders.add(prefix.toString());
+
+               for (int i = 1; i < components.length; i++) {
+                       prefix.append(ExternalTupleSet.VALUE_DELIM).append(components[i]);
+                       valueOrders.add(prefix.toString());
+               }
+
+               return valueOrders;
+       }
+
+       /**
+        * This method verifies that all common variables have a common value and
+        * then joins the BindingSets together. In the case that the PCJ contains a
+        * LeftJoin, if the leftBs and rightBs have a common variable with distinct
+        * values and that common variable is unassured (only appears in LeftJoin),
+        * this method uses the value corresponding to leftBs.
+        *
+        * @param leftBs
+        *            - BindingSet passed into PCJ
+        * @param rightBs
+        *            - PCJ BindingSet
+        * @return - joined BindingSet, or the EMPTY_BINDINGSET sentinel when the
+        *         two BindingSets disagree on an assured common variable
+        */
+       private BindingSet joinBindingSets(BindingSet leftBs, BindingSet rightBs) {
+
+               Set<String> commonVars = Sets.intersection(leftBs.getBindingNames(),
+                               rightBs.getBindingNames());
+               // compare values associated with common variables to make sure
+               // BindingSets can be joined. Possible for leftBs and rightBs
+               // to have a common unAssuredVariable in event PCJ contains LeftJoin.
+               // if values corresponding to common unAssuredVariable do not agree
+               // add value corresponding to leftBs
+               for (String s : commonVars) {
+                       if (!leftBs.getValue(s).equals(rightBs.getValue(s))
+                                       && !unAssuredVariables.contains(s)) {
+                               return EMPTY_BINDINGSET;
+                       }
+               }
+               QueryBindingSet bs = new QueryBindingSet(removeConstants(leftBs));
+
+               rightBs = removeConstants(rightBs);
+               // only add Bindings corresponding to variables that have no value
+               // assigned. This takes into account case where leftBs and rightBs
+               // share a common, unAssuredVariable. In this case, use value
+               // corresponding to leftBs, which is effectively performing a
+               // LeftJoin.
+               for (String s : rightBs.getBindingNames()) {
+                       if (bs.getValue(s) == null) {
+                               bs.addBinding(s, rightBs.getValue(s));
+                       }
+               }
+
+               return bs;
+       }
+
+       /**
+        * @param bs
+        *            - BindingSet to filter
+        * @return - new BindingSet holding every binding of bs whose name does
+        *         not start with {@code ExternalTupleSet.CONST_PREFIX}
+        */
+       private BindingSet removeConstants(BindingSet bs) {
+               QueryBindingSet bSet = new QueryBindingSet();
+               for (String s : bs.getBindingNames()) {
+                       if (!s.startsWith(ExternalTupleSet.CONST_PREFIX)) {
+                               bSet.addBinding(bs.getBinding(s));
+                       }
+               }
+               return bSet;
+       }
+
+       /**
+        * This method returns an Iterator which joins the given Entry's BindingSet
+        * to all BindingSets which match the Entry's key. For
+        * {@link HashJoinType#CONSTANT_JOIN_VAR} only the exact key is looked up;
+        * otherwise every prefix of the key is looked up.
+        *
+        * @param entry - entry to be joined
+        * @return - Iterator over joined BindingSets
+        */
+       private Iterator<BindingSet> joinBindingSetEntry(
+                       Map.Entry<String, BindingSet> entry) {
+
+               List<Collection<BindingSet>> matches = new ArrayList<>();
+               if (type == HashJoinType.CONSTANT_JOIN_VAR) {
+                       if (bindingJoinVarHash.containsKey(entry.getKey())) {
+                               matches.add(bindingJoinVarHash.get(entry.getKey()));
+                       }
+               } else {
+                       List<String> valOrders = getValueOrders(entry.getKey());
+                       for (String s : valOrders) {
+                               if (bindingJoinVarHash.containsKey(s)) {
+                                       matches.add(bindingJoinVarHash.get(s));
+                               }
+                       }
+               }
+
+               if (matches.isEmpty()) {
+                       return Collections.emptyIterator();
+               } else {
+                       return new BindingSetCollectionsJoinIterator(entry.getValue(),
+                                       matches);
+               }
+
+       }
+
+       /**
+        * Given a BindingSet and a List of Collections of BindingSets, this
+        * Iterator joins the BindingSet with the BindingSets in each Collection.
+        * Pairs that cannot be joined are skipped.
+        *
+        */
+       private class BindingSetCollectionsJoinIterator implements
+                       Iterator<BindingSet> {
+
+               private final Iterator<Collection<BindingSet>> collectionIter;
+               private Iterator<BindingSet> bsIter = Collections.emptyIterator();
+               private BindingSet next;
+               private final BindingSet joinBs;
+               private boolean hasNextCalled = false;
+               private boolean isEmpty = false;
+
+               public BindingSetCollectionsJoinIterator(BindingSet bs,
+                               List<Collection<BindingSet>> collection) {
+                       this.collectionIter = collection.iterator();
+                       this.joinBs = bs;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       if (isEmpty) {
+                               return false;
+                       }
+                       if (hasNextCalled) {
+                               return true;
+                       }
+
+                       while (bsIter.hasNext() || collectionIter.hasNext()) {
+                               if (!bsIter.hasNext()) {
+                                       bsIter = collectionIter.next().iterator();
+                                       // An empty collection contributes nothing; without
+                                       // this check bsIter.next() below would throw
+                                       // NoSuchElementException out of hasNext().
+                                       if (!bsIter.hasNext()) {
+                                               continue;
+                                       }
+                               }
+                               next = joinBindingSets(bsIter.next(), joinBs);
+                               if (next == EMPTY_BINDINGSET) {
+                                       // the pair could not be joined
+                                       continue;
+                               }
+                               hasNextCalled = true;
+                               return true;
+                       }
+                       isEmpty = true;
+                       return false;
+               }
+
+               @Override
+               public BindingSet next() {
+                       if (!hasNextCalled) {
+                               if (isEmpty || !this.hasNext()) {
+                                       throw new NoSuchElementException();
+                               }
+                       }
+                       hasNextCalled = false;
+                       return next;
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException();
+               }
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java
 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java
new file mode 100644
index 0000000..2407865
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/IteratorCombiner.java
@@ -0,0 +1,107 @@
+package mvm.rya.accumulo.pcj.iterators;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ *     This {@link CloseableIteration} takes in a collection of
+ *     CloseableIterations and merges them together into a single
+ *     CloseableIteration. The wrapped iterations are drained one after another,
+ *     in the iteration order of the supplied collection.
+ *
+ */
+public class IteratorCombiner implements
+               CloseableIteration<BindingSet, QueryEvaluationException> {
+
+
+       // All wrapped iterations; kept so that close() can release each of them
+       private final Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators;
+       // Walks over the wrapped iterations in order
+       private final Iterator<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorIterator;
+       // Iteration currently being drained
+       private CloseableIteration<BindingSet, QueryEvaluationException> currIter;
+       // True once every wrapped iteration is exhausted
+       private boolean isEmpty = false;
+       // True while 'next' holds an element that has not been returned yet
+       private boolean hasNextCalled = false;
+       // Element fetched by hasNext() and handed out by the following next()
+       private BindingSet next;
+
+       /**
+        * @param iterators
+        *            - iterations to merge; must contain at least one element
+        * @throws IllegalArgumentException
+        *             if the collection is empty
+        */
+       public IteratorCombiner(Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators) {
+               Preconditions.checkArgument(iterators.size() > 0);
+               this.iterators = iterators;
+               iteratorIterator = iterators.iterator();
+               currIter = iteratorIterator.next();
+       }
+
+       /**
+        * Looks ahead for the next element, moving on to the following wrapped
+        * iteration whenever the current one is exhausted, and caches the element
+        * for {@link #next()}.
+        */
+       @Override
+       public boolean hasNext() throws QueryEvaluationException {
+               if (isEmpty) {
+                       return false;
+               }
+               if (hasNextCalled) {
+                       return true;
+               }
+
+               while (currIter.hasNext() || iteratorIterator.hasNext()) {
+                       if (!currIter.hasNext()) {
+                               currIter = iteratorIterator.next();
+                       }
+                       if (!currIter.hasNext()) {
+                               // the iteration just selected is empty as well
+                               continue;
+                       }
+                       next = currIter.next();
+                       hasNextCalled = true;
+                       return true;
+               }
+               isEmpty = true;
+               return false;
+       }
+
+       /**
+        * @return the element cached by {@link #hasNext()}, fetching it first if
+        *         hasNext() was not called
+        * @throws NoSuchElementException
+        *             if all wrapped iterations are exhausted
+        */
+       @Override
+       public BindingSet next() throws QueryEvaluationException {
+               if (!hasNextCalled) {
+                       if (isEmpty || !this.hasNext()) {
+                               throw new NoSuchElementException();
+                       }
+               }
+               hasNextCalled = false;
+               return next;
+       }
+
+       /** Not supported. */
+       @Override
+       public void remove() throws QueryEvaluationException {
+               throw new UnsupportedOperationException();
+       }
+
+       /**
+        * Closes every wrapped iteration. A {@link QueryEvaluationException}
+        * raised by one of them does not prevent the remaining ones from being
+        * closed: the first failure is rethrown once all iterations have been
+        * visited, with any later failures attached as suppressed exceptions.
+        */
+       @Override
+       public void close() throws QueryEvaluationException {
+               QueryEvaluationException firstFailure = null;
+               for (CloseableIteration<BindingSet, QueryEvaluationException> iterator : iterators) {
+                       try {
+                               iterator.close();
+                       } catch (QueryEvaluationException e) {
+                               if (firstFailure == null) {
+                                       firstFailure = e;
+                               } else {
+                                       firstFailure.addSuppressed(e);
+                               }
+                       }
+               }
+               if (firstFailure != null) {
+                       throw firstFailure;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java
 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java
new file mode 100644
index 0000000..0c7369c
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToCrossProductBindingSetIterator.java
@@ -0,0 +1,267 @@
+package mvm.rya.accumulo.pcj.iterators;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.model.Value;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.HashBiMap;
+
+/**
+ *     This class takes in a {@link Scanner} and a Collection of BindingSets,
+ *     deserializes the {@link Key} of each {@link Map.Entry} taken from the
+ *     Scanner into a {@link BindingSet}, and performs a cross product on the
+ *     BindingSet with each BindingSet in the provided Collection.  The user can
+ *     also specify a {@link Map} (variable name to {@link Value}) of constant
+ *     constraints that can be used to filter.
+ *
+ */
+public class PCJKeyToCrossProductBindingSetIterator implements
+               CloseableIteration<BindingSet, QueryEvaluationException> {
+
+       //BindingSets passed to PCJ used to form cross product
+       private List<BindingSet> crossProductBs;
+       //Scanner over PCJ table
+       private Scanner scanner;
+       //Iterator over PCJ scanner
+       private Iterator<Map.Entry<Key, org.apache.accumulo.core.data.Value>> iterator;
+       //Map of PCJ variables in table to variable in query
+       private Map<String, String> pcjVarMap;
+       //if PCJ contains LeftJoin, this is a set of variables that only appear in
+       //LeftJoin.  Used when performing the cross product.
+       private Set<String> unAssuredVariables;
+       private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+       //Sentinel meaning "no result"; always compared by identity (==)
+       private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet();
+       //Cross products for the key most recently taken from the scanner
+       private Iterator<BindingSet> crossProductIter = Collections.emptyIterator();
+       private Map<String, Value> constantConstraints;
+       //Element fetched by hasNext() and handed out by the following next()
+       private BindingSet next;
+       //True while 'next' holds an element that has not been returned yet
+       private boolean hasNextCalled = false;
+       //True once the iteration is exhausted
+       private boolean isEmpty = false;
+       private boolean crossProductBsExist = false;
+       private boolean constantConstraintsExist = false;
+
+       /**
+        * @param scanner
+        *            - Scanner over the PCJ table
+        * @param crossProductBs
+        *            - BindingSets to cross with every scanned BindingSet; when
+        *            empty, scanned BindingSets are returned as they are
+        * @param constantConstraints
+        *            - values that constant variables must have for a scanned
+        *            BindingSet to be kept
+        * @param unAssuredVariables
+        *            - variables that both sides of a cross product may share
+        * @param pcjVarMap
+        *            - variable mapping; it is inverted before being stored
+        */
+       public PCJKeyToCrossProductBindingSetIterator(Scanner scanner,
+                       List<BindingSet> crossProductBs,
+                       Map<String, Value> constantConstraints, Set<String> unAssuredVariables,
+                       Map<String, String> pcjVarMap) {
+               this.crossProductBs = crossProductBs;
+               this.scanner = scanner;
+               this.iterator = scanner.iterator();
+               this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse();
+               this.constantConstraints = constantConstraints;
+               this.crossProductBsExist = crossProductBs.size() > 0;
+               this.constantConstraintsExist = constantConstraints.size() > 0;
+               this.unAssuredVariables = unAssuredVariables;
+       }
+
+       /**
+        * Looks ahead for the next result and caches it for {@link #next()}.
+        * Repeated calls without an intervening next() do not advance the scan.
+        */
+       @Override
+       public boolean hasNext() throws QueryEvaluationException {
+               if (isEmpty) {
+                       return false;
+               }
+               if (hasNextCalled) {
+                       return true;
+               }
+
+               if (crossProductBsExist) {
+                       while (crossProductIter.hasNext() || iterator.hasNext()) {
+                               if (!crossProductIter.hasNext()) {
+                                       Key key = iterator.next().getKey();
+                                       try {
+                                               crossProductIter = getCrossProducts(getBindingSet(key));
+                                       } catch (BindingSetConversionException e) {
+                                               throw new QueryEvaluationException(e);
+                                       }
+                               }
+                               if (!crossProductIter.hasNext()) {
+                                       //this key produced no cross products
+                                       continue;
+                               }
+                               next = crossProductIter.next();
+                               hasNextCalled = true;
+                               return true;
+                       }
+               } else {
+                       while (iterator.hasNext()) {
+                               Key key = iterator.next().getKey();
+                               try {
+                                       next = getBindingSet(key);
+                               } catch (BindingSetConversionException e) {
+                                       throw new QueryEvaluationException(e);
+                               }
+                               //BindingSet cannot be deserialized or is filtered
+                               //out by constant constraints
+                               if (next == null || next == EMPTY_BINDINGSET) {
+                                       continue;
+                               }
+                               hasNextCalled = true;
+                               return true;
+                       }
+               }
+               isEmpty = true;
+               return false;
+       }
+
+       /**
+        * @return the element cached by {@link #hasNext()}, fetching it first if
+        *         hasNext() was not called
+        * @throws NoSuchElementException
+        *             if the iteration is exhausted
+        */
+       @Override
+       public BindingSet next() throws QueryEvaluationException {
+               if (!hasNextCalled) {
+                       if (isEmpty || !this.hasNext()) {
+                               throw new NoSuchElementException();
+                       }
+               }
+               hasNextCalled = false;
+               return next;
+       }
+
+       /** Not supported. */
+       @Override
+       public void remove() throws QueryEvaluationException {
+               throw new UnsupportedOperationException();
+       }
+
+       /** Closes the underlying Scanner. */
+       @Override
+       public void close() throws QueryEvaluationException {
+               scanner.close();
+       }
+
+       /**
+        * Deserializes the row of the given key, renames its variables through
+        * pcjVarMap and drops the constant variables.
+        *
+        * @param key
+        *            - Accumulo key obtained from scan
+        * @return - BindingSet satisfying any specified constant constraints, or
+        *         the EMPTY_BINDINGSET sentinel when a constant constraint is
+        *         violated
+        * @throws BindingSetConversionException
+        * @throws QueryEvaluationException
+        *             if a variable of the row has no mapping in pcjVarMap
+        */
+       private BindingSet getBindingSet(Key key)
+                       throws BindingSetConversionException, QueryEvaluationException {
+               // NOTE(review): Text.getBytes() exposes the backing array, which is
+               // only valid up to getLength(); this presumably relies on
+               // Key.getRow() returning an exactly-sized Text - confirm.
+               byte[] row = key.getRow().getBytes();
+               String[] varOrder = key.getColumnFamily().toString()
+                               .split(ExternalTupleSet.VAR_ORDER_DELIM);
+
+               BindingSet bindingSet = converter.convert(row, new VariableOrder(
+                               varOrder));
+
+               QueryBindingSet bs = new QueryBindingSet();
+               for (String var : bindingSet.getBindingNames()) {
+                       String mappedVar = null;
+                       if (pcjVarMap.containsKey(var)) {
+                               mappedVar = pcjVarMap.get(var);
+                       } else {
+                               throw new QueryEvaluationException("PCJ Variable has no mapping to query variable.");
+                       }
+                       if (constantConstraintsExist) {
+                               if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX)
+                                               && constantConstraints.containsKey(mappedVar)
+                                               && !constantConstraints.get(mappedVar).equals(
+                                                               bindingSet.getValue(var))) {
+                                       return EMPTY_BINDINGSET;
+                               }
+                       }
+
+                       if (!mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX)) {
+                               bs.addBinding(mappedVar, bindingSet.getValue(var));
+                       }
+               }
+               return bs;
+       }
+
+       /**
+        * This method forms the cross-product between an input BindingSet and the
+        * BindingSets contained in crossProductBs.
+        *
+        * @param bs
+        *            - {@link BindingSet} used to form cross product with
+        *            cross-product BindingSets
+        * @return - Iterator over resulting cross-product; empty when bs is null
+        *         or is the EMPTY_BINDINGSET sentinel
+        */
+       private Iterator<BindingSet> getCrossProducts(BindingSet bs) {
+               //A row rejected by the constant constraints must not contribute any
+               //result. Without this check the sentinel, which has no bindings,
+               //would be crossed with every BindingSet in crossProductBs and each
+               //of them would be returned unchanged, unlike the branch of
+               //hasNext() that skips such rows.
+               if (bs == null || bs == EMPTY_BINDINGSET) {
+                       return Collections.emptyIterator();
+               }
+
+               Set<BindingSet> crossProducts = new HashSet<BindingSet>();
+
+               for (BindingSet bSet : crossProductBs) {
+                       BindingSet prod = takeCrossProduct(bSet, bs);
+                       if (prod != EMPTY_BINDINGSET) {
+                               crossProducts.add(prod);
+                       }
+               }
+
+               return crossProducts.iterator();
+
+       }
+
+       /**
+        * This method computes the cross product of the BindingSet passed to the
+        * PCJ and the PCJ BindingSet.  It verifies that the only common variables
+        * are unassured variables, and if leftBs and rightBs have distinct values
+        * for a given variable, this method uses the value from leftBs in the
+        * cross product BindingSet - this is effectively performing a LeftJoin.
+        *
+        * @param leftBs - BindingSet passed to PCJ
+        * @param rightBs - PCJ BindingSet
+        * @return - cross product BindingSet, or the EMPTY_BINDINGSET sentinel
+        *         when the two share a variable that is not unassured
+        */
+       private BindingSet takeCrossProduct(BindingSet leftBs, BindingSet rightBs) {
+               if (bindingSetsIntersect(leftBs, rightBs)) {
+                       return EMPTY_BINDINGSET;
+               }
+               QueryBindingSet bs = new QueryBindingSet(leftBs);
+
+               //only add Bindings corresponding to variables that have no value
+               //assigned.  This takes into account case where leftBs and rightBs
+               //share a common, unAssuredVariable.  In this case, use value
+               //corresponding to leftBs, which is effectively performing a LeftJoin.
+               for (String s : rightBs.getBindingNames()) {
+                       if (bs.getValue(s) == null) {
+                               bs.addBinding(s, rightBs.getValue(s));
+                       }
+               }
+               return bs;
+       }
+
+       /**
+        * @return - true when bs2 has a value for some variable of bs1 that is not
+        *         an unassured variable
+        */
+       private boolean bindingSetsIntersect(BindingSet bs1, BindingSet bs2) {
+
+               for (String s : bs1.getBindingNames()) {
+                       if (bs2.getValue(s) != null && !unAssuredVariables.contains(s)) {
+                               return true;
+                       }
+               }
+               return false;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java
 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java
new file mode 100644
index 0000000..1b821d4
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/accumulo/pcj/iterators/PCJKeyToJoinBindingSetIterator.java
@@ -0,0 +1,199 @@
+package mvm.rya.accumulo.pcj.iterators;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+
+import mvm.rya.api.RdfCloudTripleStoreUtils;
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.model.Value;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashBiMap;
+
+/**
+ * This class takes in a {@link Scanner} and a Collection of BindingSets,
+ * deserializes each {@link Map.Entry<Key,Value>} taken from the Scanner into a
+ * {@link BindingSet}, and creates a {@link Map.Entry<String, BindingSet>}
+ * object to perform as hash join. The user can also specify a {@link Map
+ * <String, Value>} of constant constraints that can be used to filter.
+ *
+ */
+public class PCJKeyToJoinBindingSetIterator
+               implements
+               CloseableIteration<Map.Entry<String, BindingSet>, 
QueryEvaluationException> {
+
+       //map of variables as they appear in PCJ table to query variables
+       private Map<String, String> pcjVarMap;
+       //constant constraints used for filtering
+       private Map<String, Value> constantConstraints;
+       //max number of variables an entry in the batch of BindingSets had in 
common with PCJ
+       //this is used for constructing hash join key.
+       private int maxPrefixLen;
+       private static final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
+       private final Map.Entry<String, BindingSet> EMPTY_ENTRY = new 
RdfCloudTripleStoreUtils.CustomEntry<String, BindingSet>(
+                       "", new QueryBindingSet());
+       private Iterator<Entry<Key, org.apache.accumulo.core.data.Value>> 
iterator;
+       private boolean hasNextCalled = false;
+       private boolean isEmpty = false;
+       private Map.Entry<String, BindingSet> next;
+       private BatchScanner scanner;
+
+       public PCJKeyToJoinBindingSetIterator(BatchScanner scanner,
+                       Map<String, String> pcjVarMap,
+                       Map<String, Value> constantConstraints, int 
maxPrefixLen) {
+               Preconditions.checkNotNull(scanner);
+               Preconditions.checkArgument(pcjVarMap.size() > 0,
+                               "Variable map must contain at least one 
variable!");
+               Preconditions.checkNotNull(constantConstraints,
+                               "Constant constraints cannot be null.");
+               Preconditions.checkArgument(maxPrefixLen > 0,
+                               "Max prefix length must be greater than 0.");
+               Preconditions
+                               .checkArgument(maxPrefixLen <= pcjVarMap.size(),
+                                               "Max prefix length must be less 
than total number of binding names.");
+               this.scanner = scanner;
+               this.pcjVarMap = HashBiMap.create(pcjVarMap).inverse();
+               this.constantConstraints = constantConstraints;
+               this.maxPrefixLen = maxPrefixLen;
+               this.iterator = scanner.iterator();
+
+       }
+
+       public PCJKeyToJoinBindingSetIterator(BatchScanner scanner,
+                       Map<String, String> pcjVarMap, int maxPrefixLen) {
+               this(scanner, pcjVarMap, new HashMap<String, Value>(), 
maxPrefixLen);
+       }
+
+       @Override
+       public boolean hasNext() throws QueryEvaluationException {
+
+               if (!hasNextCalled && !isEmpty) {
+                       while (iterator.hasNext()) {
+                               Key key = iterator.next().getKey();
+                               // get bindings from scan without values 
associated with
+                               // constant constraints
+                               try {
+                                       next = 
getBindingSetEntryAndMatchConstants(key);
+                               } catch (BindingSetConversionException e) {
+                                       throw new QueryEvaluationException(
+                                                       "Could not deserialize 
PCJ BindingSet.");
+                               }
+                               // skip key if constant constraint don't match
+                               if (next == EMPTY_ENTRY) {
+                                       continue;
+                               }
+                               hasNextCalled = true;
+                               return true;
+                       }
+                       isEmpty = true;
+                       return false;
+               } else if (isEmpty) {
+                       return false;
+               } else {
+                       return true;
+               }
+       }
+
+       @Override
+       public Entry<String, BindingSet> next() throws QueryEvaluationException 
{
+               if (hasNextCalled) {
+                       hasNextCalled = false;
+               } else if (isEmpty) {
+                       throw new NoSuchElementException();
+               } else {
+                       if (this.hasNext()) {
+                               hasNextCalled = false;
+                       } else {
+                               throw new NoSuchElementException();
+                       }
+               }
+               return next;
+       }
+
+       @Override
+       public void remove() throws QueryEvaluationException {
+               throw new UnsupportedOperationException();
+       }
+
+       @Override
+       public void close() throws QueryEvaluationException {
+               scanner.close();
+       }
+
+       /**
+        *
+        * @param key
+        *            - Accumulo key obtained from scan
+        * @return - Entry<String,BindingSet> satisfying the constant 
constraints
+        * @throws BindingSetConversionException
+        */
+       private Map.Entry<String, BindingSet> 
getBindingSetEntryAndMatchConstants(
+                       Key key) throws BindingSetConversionException {
+               byte[] row = key.getRow().getBytes();
+               String[] varOrder = key.getColumnFamily().toString()
+                               .split(ExternalTupleSet.VAR_ORDER_DELIM);
+
+               BindingSet bindingSet = converter.convert(row, new 
VariableOrder(
+                               varOrder));
+
+               QueryBindingSet bs = new QueryBindingSet();
+               for (String var : bindingSet.getBindingNames()) {
+                       String mappedVar = pcjVarMap.get(var);
+                       if (mappedVar.startsWith(ExternalTupleSet.CONST_PREFIX)
+                                       && 
constantConstraints.containsKey(mappedVar)
+                                       && 
!constantConstraints.get(mappedVar).equals(
+                                                       
bindingSet.getValue(var))) {
+                               return EMPTY_ENTRY;
+                       } else {
+                               bs.addBinding(mappedVar, 
bindingSet.getValue(var));
+                       }
+               }
+
+               String orderedValueString = 
bindingSet.getValue(varOrder[0]).toString();
+               for (int i = 1; i < maxPrefixLen; i++) {
+                       Value value = bindingSet.getValue(varOrder[i]);
+                       if (value != null) {
+                               orderedValueString = orderedValueString
+                                               + ExternalTupleSet.VALUE_DELIM 
+ value.toString();
+                       }
+               }
+
+               return new RdfCloudTripleStoreUtils.CustomEntry<String, 
BindingSet>(
+                               orderedValueString, bs);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
new file mode 100644
index 0000000..53f29f4
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import mvm.rya.api.client.BatchUpdatePCJ;
+import mvm.rya.api.client.InstanceDoesNotExistException;
+import mvm.rya.api.client.PCJDoesNotExistException;
+import mvm.rya.api.client.RyaClientException;
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import 
mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import mvm.rya.api.instance.RyaDetailsRepository;
+import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import mvm.rya.api.instance.RyaDetailsUpdater;
+import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import 
mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.rdftriplestore.inference.InferenceEngineException;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Uses an in memory Rya Client to batch update a PCJ index.
+ */
+public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpdatePCJ {
+
+    private static final Logger log = Logger.getLogger(AccumuloBatchUpdatePCJ.class);
+
+    /** Number of query results buffered before they are written to the PCJ. */
+    private static final int BATCH_SIZE = 1000;
+
+    /**
+     * Constructs an instance of {@link AccumuloBatchUpdatePCJ}.
+     *
+     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloBatchUpdatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+    }
+
+    @Override
+    public void batchUpdate(final String ryaInstanceName, final String pcjId)
+            throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+        requireNonNull(ryaInstanceName);
+        requireNonNull(pcjId);
+        verifyPCJState(ryaInstanceName, pcjId);
+        updatePCJResults(ryaInstanceName, pcjId);
+        updatePCJMetadata(ryaInstanceName, pcjId);
+    }
+
+    /**
+     * Verifies that the PCJ may be batch updated: PCJs are enabled for the Rya
+     * instance, the PCJ exists, and it is not maintained incrementally.
+     *
+     * @param ryaInstanceName - The name of the Rya instance. (not null)
+     * @param pcjId - The ID of the PCJ that will be updated. (not null)
+     * @throws RyaClientException One of the checks failed, or the instance's details could not be fetched.
+     */
+    private void verifyPCJState(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+        try {
+            // Fetch the Rya instance's details.
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+            final RyaDetails ryaDetails = detailsRepo.getRyaInstanceDetails();
+
+            // Ensure PCJs are enabled.
+            if(!ryaDetails.getPCJIndexDetails().isEnabled()) {
+                throw new RyaClientException("PCJs are not enabled for the Rya instance named '" + ryaInstanceName + "'.");
+            }
+
+            // Ensure the PCJ exists.
+            if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) {
+                throw new PCJDoesNotExistException("The PCJ with id '" + pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'.");
+            }
+
+            // Ensure the PCJ is not already being incrementally updated.
+            final PCJDetails pcjDetails = ryaDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+            final Optional<PCJUpdateStrategy> updateStrategy = pcjDetails.getUpdateStrategy();
+            if(updateStrategy.isPresent() && updateStrategy.get() == PCJUpdateStrategy.INCREMENTAL) {
+                throw new RyaClientException("The PCJ with id '" + pcjId + "' is already being updated incrementally.");
+            }
+        } catch(final NotInitializedException e) {
+            throw new InstanceDoesNotExistException("No RyaDetails are initialized for the Rya instance named '" + ryaInstanceName + "'.", e);
+        } catch (final RyaDetailsRepositoryException e) {
+            throw new RyaClientException("Could not fetch the RyaDetails for the Rya instance named '" + ryaInstanceName + "'.", e);
+        }
+    }
+
+    /**
+     * Replaces the PCJ's results with the results of evaluating its SPARQL
+     * query against the Rya instance.
+     * <p>
+     * The PCJ's metadata is fetched and its query is parsed before the old
+     * results are purged, so a PCJ whose query cannot be loaded or parsed
+     * keeps the results it already has.
+     *
+     * @param ryaInstanceName - The name of the Rya instance. (not null)
+     * @param pcjId - The ID of the PCJ whose results will be rebuilt. (not null)
+     * @throws RyaClientException The old results could not be purged, or the new results could not be loaded.
+     */
+    private void updatePCJResults(final String ryaInstanceName, final String pcjId)
+            throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+        // Things that have to be closed before we exit.
+        Sail sail = null;
+        SailConnection sailConn = null;
+        CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null;
+
+        try {
+            // Create an instance of Sail backed by the Rya instance.
+            sail = connectToRya(ryaInstanceName);
+
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName);
+
+            try {
+                // Parse the PCJ's SPARQL query. This happens before the purge so a
+                // failure here does not leave the PCJ without any results.
+                final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+                final String sparql = metadata.getSparql();
+                final SPARQLParser parser = new SPARQLParser();
+                final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
+
+                // Purge the old results from the PCJ.
+                try {
+                    pcjStorage.purge(pcjId);
+                } catch (final PCJStorageException e) {
+                    throw new RyaClientException("Could not batch update PCJ with ID '" + pcjId + "' because the old " +
+                            "results could not be purged from it.", e);
+                }
+
+                // Execute the query.
+                sailConn = sail.getConnection();
+                results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+                // Load the results into the PCJ table.
+                final List<VisibilityBindingSet> batch = new ArrayList<>(BATCH_SIZE);
+
+                while(results.hasNext()) {
+                    final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), "");
+                    batch.add(result);
+
+                    if(batch.size() == BATCH_SIZE) {
+                        pcjStorage.addResults(pcjId, batch);
+                        batch.clear();
+                    }
+                }
+
+                // Write whatever is left over from the last, partial batch.
+                if(!batch.isEmpty()) {
+                    pcjStorage.addResults(pcjId, batch);
+                    batch.clear();
+                }
+            } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) {
+                throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
+            }
+        } finally {
+            // Failures while closing are only logged so they do not mask an
+            // exception thrown by the update itself.
+            if(results != null) {
+                try {
+                    results.close();
+                } catch (final QueryEvaluationException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+
+            if(sailConn != null) {
+                try {
+                    sailConn.close();
+                } catch (final SailException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+
+            if(sail != null) {
+                try {
+                    sail.shutDown();
+                } catch (final SailException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a {@link Sail} for the Rya instance using the connection details
+     * this command was constructed with. PCJs are turned off in its configuration.
+     *
+     * @param ryaInstanceName - The name of the Rya instance; used as the table prefix. (not null)
+     * @return A Sail created by {@link RyaSailFactory}. The caller must shut it down.
+     * @throws RyaClientException The Sail could not be created.
+     */
+    private Sail connectToRya(final String ryaInstanceName) throws RyaClientException {
+        try {
+            final AccumuloConnectionDetails connectionDetails = super.getAccumuloConnectionDetails();
+
+            final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+            ryaConf.setTablePrefix(ryaInstanceName);
+            ryaConf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername());
+            ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword()));
+            ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers());
+            ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName());
+
+            // Turn PCJs off so that we will only scan the core Rya tables while building the PCJ results.
+            ryaConf.set(ConfigUtils.USE_PCJ, "false");
+
+            return RyaSailFactory.getInstance(ryaConf);
+        } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+            throw new RyaClientException("Could not connect to the Rya instance named '" + ryaInstanceName + "'.", e);
+        }
+    }
+
+    /**
+     * Records in the Rya instance's details that the PCJ was just batch
+     * updated: its update strategy is set to {@link PCJUpdateStrategy#BATCH}
+     * and its last update time to the current time.
+     *
+     * @param ryaInstanceName - The name of the Rya instance. (not null)
+     * @param pcjId - The ID of the PCJ that was updated. (not null)
+     * @throws RyaClientException The details could not be updated.
+     */
+    private void updatePCJMetadata(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+        // Update the PCJ's metadata to indicate it was just batch updated.
+        try {
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+
+            new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+                @Override
+                public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+                    // Update the original PCJ Details to indicate they were batch updated.
+                    final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+                    final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+                            .setUpdateStrategy( PCJUpdateStrategy.BATCH )
+                            .setLastUpdateTime( new Date());
+
+                    // Replace the old PCJ Details with the updated ones.
+                    final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+                    builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+                    return builder.build();
+                }
+            });
+        } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+            throw new RyaClientException("Could not update the PCJ's metadata.", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java
 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java
new file mode 100644
index 0000000..078e985
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCommand.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.accumulo.core.client.Connector;
+
+/**
+ * Base class for commands that interact with Accumulo. It keeps hold of the
+ * {@link Connector} a command works with, together with the details that were
+ * used to create that connector.
+ */
+@ParametersAreNonnullByDefault
+public abstract class AccumuloCommand {
+
+    private final AccumuloConnectionDetails connectionDetails;
+    private final Connector connector;
+
+    /**
+     * Constructs an instance of {@link AccumuloCommand}.
+     *
+     * @param connectionDetails - Details about the values that were used to
+     *   create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of
+     *   Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloCommand(
+            final AccumuloConnectionDetails connectionDetails,
+            final Connector connector) {
+        this.connectionDetails = requireNonNull(connectionDetails);
+        this.connector = requireNonNull(connector);
+    }
+
+    /**
+     * @return The connector that provides programmatic access to the instance
+     *   of Accumulo that hosts Rya instance.
+     */
+    public Connector getConnector() {
+        return this.connector;
+    }
+
+    /**
+     * @return The details about the values that were used to create the
+     *   connector to the cluster.
+     */
+    public AccumuloConnectionDetails getAccumuloConnectionDetails() {
+        return this.connectionDetails;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java
 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java
new file mode 100644
index 0000000..c0a7be7
--- /dev/null
+++ 
b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloConnectionDetails.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.annotation.concurrent.Immutable;
+
+/**
+ * The information that the shell used to connect to Accumulo.
+ * <p>
+ * The password is copied when an instance is constructed and again every time
+ * it is handed out, so neither the array passed to the constructor nor an
+ * array returned by {@link #getPassword()} can be used to change this object.
+ */
+@Immutable
+@ParametersAreNonnullByDefault
+public class AccumuloConnectionDetails {
+    private final String username;
+    private final char[] password;
+    private final String instanceName;
+    private final String zookeepers;
+
+    /**
+     * Constructs an instance of {@link AccumuloConnectionDetails}.
+     *
+     * @param username - The username that was used to establish the connection. (not null)
+     * @param password - The password that was used to establish the connection.
+     *   The array is copied. (not null)
+     * @param instanceName - The Accumulo instance name that was used to establish the connection. (not null)
+     * @param zookeepers - The list of zookeeper hostname that were used to establish the connection. (not null)
+     */
+    public AccumuloConnectionDetails(
+            final String username,
+            final char[] password,
+            final String instanceName,
+            final String zookeepers) {
+        this.username = requireNonNull(username);
+        // Defensive copy: the caller keeps ownership of its own array.
+        this.password = requireNonNull(password).clone();
+        this.instanceName = requireNonNull(instanceName);
+        this.zookeepers = requireNonNull(zookeepers);
+    }
+
+    /**
+     * @return The username that was used to establish the connection.
+     */
+    public String getUsername() {
+        return username;
+    }
+
+    /**
+     * @return A copy of the password that was used to establish the connection.
+     */
+    public char[] getPassword() {
+        // Defensive copy: callers must not be able to modify the stored password.
+        return password.clone();
+    }
+
+    /**
+     * @return The Accumulo instance name that was used to establish the connection.
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @return The list of zookeeper hostname that were used to establish the connection.
+     */
+    public String getZookeepers() {
+        return zookeepers;
+    }
+}
\ No newline at end of file

Reply via email to