http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.cloudbase/pom.xml
----------------------------------------------------------------------
diff --git a/extras/generic.mr/generic.mr.cloudbase/pom.xml 
b/extras/generic.mr/generic.mr.cloudbase/pom.xml
new file mode 100644
index 0000000..bae1b9f
--- /dev/null
+++ b/extras/generic.mr/generic.mr.cloudbase/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Cloudbase module of generic.mr: depends on the generic.mr.api module and cloudbase-core. -->
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>generic.mr</artifactId>
+        <version>3.2.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>generic.mr.cloudbase</artifactId>
+    <name>${project.groupId}.${project.artifactId}</name>
+    <dependencies>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>generic.mr.api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- No version here: presumably supplied by dependencyManagement in a parent POM (verify). -->
+        <dependency>
+            <groupId>cloudbase</groupId>
+            <artifactId>cloudbase-core</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Sources live under src/main/groovy; plugin version/executions presumably come from a parent POM (verify). -->
+            <plugin>
+                <groupId>org.codehaus.gmaven</groupId>
+                <artifactId>gmaven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy
----------------------------------------------------------------------
diff --git 
a/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy
 
b/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy
new file mode 100644
index 0000000..7608fb7
--- /dev/null
+++ 
b/extras/generic.mr/generic.mr.cloudbase/src/main/groovy/mvm/rya/generic/mr/cloudbase/CloudbaseMRInfo.groovy
@@ -0,0 +1,146 @@
+package mvm.rya.generic.mr.cloudbase
+
+import cloudbase.core.client.mapreduce.CloudbaseInputFormat
+import cloudbase.core.client.mapreduce.CloudbaseOutputFormat
+import cloudbase.core.security.Authorizations
+import mvm.rya.generic.mr.api.MRInfo
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapreduce.Job
+import cloudbase.core.data.Mutation
+import cloudbase.core.data.Key
+import cloudbase.core.data.Value
+import cloudbase.core.security.ColumnVisibility
+import cloudbase.core.client.mock.MockInstance
+import cloudbase.core.client.ZooKeeperInstance
+
+/**
+ * Cloudbase implementation of {@link MRInfo}: configures map-reduce jobs to read
+ * from and write to Cloudbase tables and builds the Cloudbase data and client
+ * objects (keys, values, mutations, instances, connectors, scanners, ranges).
+ *
+ * Date: 12/3/12
+ * Time: 9:00 AM
+ */
+class CloudbaseMRInfo implements MRInfo {
+
+    /**
+     * Configuration read by {@link #instance()} and {@link #connector(Object)}.
+     * It is not assigned anywhere in this class, so the caller must set it
+     * before using those methods.
+     */
+    def Configuration conf
+    /** Connector cached by the first successful {@link #connector(Object)} call. */
+    def connector;
+
+    /**
+     * Configures {@code job} to read from {@code table} with
+     * {@link CloudbaseInputFormat} and to write {@link Mutation}s to
+     * {@code outtable} with {@link CloudbaseOutputFormat}.
+     *
+     * Connection settings (USERNAME, PASSWORD, INSTANCE, ZOOKEEPERS, MOCK) are
+     * read from the job's own configuration; the {@code conf} field of this
+     * object is neither read nor assigned here.
+     *
+     * @param job      job to configure
+     * @param table    input table
+     * @param outtable output table
+     * @param auths    authorizations used for reading the input table
+     * @throws IllegalArgumentException if mock mode is off and no zookeepers are configured
+     */
+    @Override
+    void initMRJob(Job job, String table, String outtable, String[] auths) {
+        Configuration conf = job.configuration
+        String username = conf.get(USERNAME)
+        String password = conf.get(PASSWORD)
+        String instance = conf.get(INSTANCE)
+        String zookeepers = conf.get(ZOOKEEPERS)
+        String mock = conf.get(MOCK)
+
+        //input
+        if (Boolean.parseBoolean(mock)) {
+            CloudbaseInputFormat.setMockInstance(job, instance)
+            // NOTE(review): in mock mode only the input format gets an instance;
+            // the output-format call below was left disabled by the author.
+//            CloudbaseOutputFormat.setMockInstance(conf, instance)
+        } else if (zookeepers != null) {
+            CloudbaseInputFormat.setZooKeeperInstance(job, instance, zookeepers)
+            CloudbaseOutputFormat.setZooKeeperInstance(job, instance, zookeepers)
+        } else {
+            throw new IllegalArgumentException("Must specify either mock or zookeepers");
+        }
+
+        CloudbaseInputFormat.setInputInfo(job, username, password.getBytes(), table, new Authorizations(auths))
+        job.setInputFormatClass(CloudbaseInputFormat.class);
+
+        // OUTPUT
+        job.setOutputFormatClass(CloudbaseOutputFormat.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Mutation.class);
+        // NOTE(review): the boolean argument presumably enables table creation - confirm against the Cloudbase API.
+        CloudbaseOutputFormat.setOutputInfo(job, username, password.getBytes(), true, outtable);
+    }
+
+    /**
+     * Deserializes a {@link Key} from its Writable byte form.
+     *
+     * @param data bytes consumed by {@code Key.readFields}
+     * @return the decoded key
+     */
+    @Override
+    def key(byte[] data) {
+        Key key = new Key();
+        key.readFields(new DataInputStream(new ByteArrayInputStream(data)))
+        return key
+    }
+
+    /** Builds a {@link Key} from its row, column family, qualifier, visibility and timestamp. */
+    @Override
+    def key(String row, String cf, String cq, String cv, long timestamp) {
+        return new Key(row, cf, cq, cv, timestamp)
+    }
+
+    /** Wraps {@code data} in a {@link Value}. */
+    @Override
+    def value(byte[] data) {
+        return new Value(data)
+    }
+
+    /** Builds a {@link ColumnVisibility} from the expression {@code cv}. */
+    @Override
+    def columnVisibility(String cv) {
+        return new ColumnVisibility(cv)
+    }
+
+    /**
+     * Builds a {@link Mutation} for {@code row} holding a single put of
+     * {@code val} at the given column, visibility and timestamp.
+     */
+    @Override
+    def mutation(String row, String cf, String cq, String cv, long timestamp, byte[] val) {
+        Mutation mutation = new Mutation(row);
+        mutation.put(cf, cq, columnVisibility(cv), timestamp, value(val))
+        return mutation
+    }
+
+    /**
+     * Creates the Cloudbase instance described by {@code conf}: a
+     * {@link MockInstance} when MOCK parses as true, otherwise a
+     * {@link ZooKeeperInstance} when ZOOKEEPERS is set.
+     *
+     * @return a new instance object
+     * @throws IllegalArgumentException if neither mock mode nor zookeepers are configured
+     */
+    @Override
+    def instance() {
+        assert conf != null
+
+        String instance_str = conf.get(INSTANCE)
+        String zookeepers = conf.get(ZOOKEEPERS)
+        String mock = conf.get(MOCK)
+        if (Boolean.parseBoolean(mock)) {
+            return new MockInstance(instance_str)
+        } else if (zookeepers != null) {
+            return new ZooKeeperInstance(instance_str, zookeepers)
+        } else {
+            throw new IllegalArgumentException("Must specify either mock or zookeepers");
+        }
+    }
+
+    /**
+     * Returns the cached connector, creating it on first use with the
+     * USERNAME and PASSWORD found in {@code conf}.
+     *
+     * @param instance instance to connect to; when null, one is built with {@link #instance()}
+     * @return the (cached) connector
+     */
+    @Override
+    def connector(def instance) {
+        if (connector != null) return connector
+
+        String username = conf.get(USERNAME)
+        String password = conf.get(PASSWORD)
+        if (instance == null) {
+            // Qualify with "this": a bare instance() would be resolved against the
+            // parameter named "instance" (i.e. instance.call()) and fail on null.
+            instance = this.instance()
+        }
+        connector = instance.getConnector(username, password)
+        return connector
+    }
+
+    /**
+     * Writes every mutation from the iterator to {@code tableName} through a
+     * batch writer, then flushes. The writer is closed even when a write fails.
+     */
+    @Override
+    def void writeMutations(def connector, String tableName, Iterator mutations) {
+        def bw = connector.createBatchWriter(tableName, 10000l, 10000l, 4);
+        try {
+            mutations.each { m ->
+                bw.addMutation(m)
+            }
+            bw.flush()
+        } finally {
+            // always release the writer, including on addMutation/flush failure
+            bw.close()
+        }
+    }
+
+    /** Creates a scanner over {@code tableName} with the given authorizations. */
+    @Override
+    def scanner(def connector, String tableName, String[] auths) {
+        return connector.createScanner(tableName, new Authorizations(auths))
+    }
+
+    /** Creates a batch scanner over {@code tableName} using {@code numThreads} query threads. */
+    @Override
+    def batchScanner(def connector, String tableName, String[] auths, int numThreads) {
+        return connector.createBatchScanner(tableName, new Authorizations(auths), numThreads)
+    }
+
+    /**
+     * Builds a Cloudbase range.
+     *
+     * @param startKey required start of the range
+     * @param endKey   optional end; when null the single-argument Range constructor is used
+     */
+    @Override
+    def range(def startKey, def endKey) {
+        assert startKey != null
+
+        if (endKey != null)
+            return new cloudbase.core.data.Range(startKey, endKey)
+        return new cloudbase.core.data.Range(startKey)
+    }
+
+    /** Wraps the given authorization strings in an {@link Authorizations} object. */
+    @Override
+    def authorizations(String[] auths) {
+        return new Authorizations(auths)
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
----------------------------------------------------------------------
diff --git 
a/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
 
b/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
new file mode 100644
index 0000000..728f9dd
--- /dev/null
+++ 
b/extras/generic.mr/generic.mr.cloudbase/src/main/resources/META-INF/services/mvm.rya.generic.mr.api.MRInfo
@@ -0,0 +1 @@
+mvm.rya.generic.mr.cloudbase.CloudbaseMRInfo
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/generic.mr/pom.xml
----------------------------------------------------------------------
diff --git a/extras/generic.mr/pom.xml b/extras/generic.mr/pom.xml
new file mode 100644
index 0000000..2a277d2
--- /dev/null
+++ b/extras/generic.mr/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="utf-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <!-- Aggregator POM for the generic.mr modules. -->
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>rya.extras</artifactId>
+        <version>3.2.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>generic.mr</artifactId>
+    <name>${project.groupId}.${project.artifactId}</name>
+    <packaging>pom</packaging>
+    <!-- Modules that are always built. -->
+    <modules>
+        <module>generic.mr.api</module>
+        <module>generic.mr.accumulo</module>
+    </modules>
+
+    <profiles>
+        <!-- The Cloudbase module is only built when the "cloudbase" profile is active. -->
+        <profile>
+            <id>cloudbase</id>
+            <modules>
+                <module>generic.mr.cloudbase</module>
+            </modules>
+        </profile>
+    </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
new file mode 100644
index 0000000..7977b20
--- /dev/null
+++ b/extras/indexing/pom.xml
@@ -0,0 +1,201 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>mvm.rya</groupId>
+        <artifactId>rya.extras</artifactId>
+        <version>3.2.9</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <name>${project.groupId}.${project.artifactId}</name>
+    <artifactId>rya.indexing</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>rya.sail.impl</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>hsqldb</artifactId>
+                    <groupId>hsqldb</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>accumulo.rya</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>mongodb.rya</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-core</artifactId>
+            <version>3.6.2</version>
+        </dependency>
+        <!-- Declared once only: a second identical lucene-analyzers entry was a duplicate declaration. -->
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-analyzers</artifactId>
+            <version>3.6.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.4</version>
+        </dependency>
+
+        <!-- I was having issues with hadoop conf, but adding xerces and xalan fixed it -->
+
+        <dependency>
+            <groupId>xerces</groupId>
+            <artifactId>xercesImpl</artifactId>
+            <version>2.9.1</version>
+        </dependency>
+        <dependency>
+            <groupId>xalan</groupId>
+            <artifactId>xalan</artifactId>
+            <version>2.7.1</version>
+        </dependency>
+
+        <!-- Geo Indexing -->
+        <dependency>
+            <groupId>org.locationtech.geomesa</groupId>
+            <artifactId>geomesa-accumulo-datastore</artifactId>
+            <version>${geomesa.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>mvm.rya</groupId>
+            <artifactId>rya.prospector</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>2.13.0-rc0</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <!-- Single maven-shade-plugin declaration holding both executions;
+                 a plugin may be declared only once per <plugins> section. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <!-- Attached shaded artifact with classifier "map-reduce". -->
+                    <execution>
+                        <configuration>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>map-reduce</shadedClassifierName>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                    <!-- Attached shaded artifact with classifier "accumulo-server";
+                         the excluded artifacts are left out of the shaded jar. -->
+                    <execution>
+                        <id>accumulo-server</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <shadedClassifierName>accumulo-server</shadedClassifierName>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.locationtech.geomesa:*</exclude>
+                                    <exclude>scala:*</exclude>
+                                    <exclude>org.apache.accumulo:*</exclude>
+                                    <exclude>org.apache.thrift:*</exclude>
+                                    <exclude>org.apache.hadoop:*</exclude>
+                                    <exclude>org.apache.zookeeper:*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+    <profiles>
+        <profile>
+            <id>accumulo</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.accumulo</groupId>
+                    <artifactId>accumulo-core</artifactId>
+                    <optional>true</optional>
+                </dependency>
+                <dependency>
+                    <groupId>mvm.rya</groupId>
+                    <artifactId>accumulo.iterators</artifactId>
+                    <optional>true</optional>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>cloudbase</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>com.texeltek</groupId>
+                    <artifactId>accumulo-cloudbase-shim</artifactId>
+                    <optional>true</optional>
+                </dependency>
+                <dependency>
+                    <groupId>mvm.rya</groupId>
+                    <artifactId>cloudbase.iterators</artifactId>
+                    <optional>true</optional>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>mr</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+        </profile>
+    </profiles>
+
+
+    <repositories>
+        <repository>
+            <id>cloudera</id>
+            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+        </repository>
+        <repository>
+            <id>public.opensahara.com</id>
+            <url>http://dev.opensahara.com/nexus/content/groups/public/</url>
+        </repository>
+        <repository>
+            <id>geotools</id>
+            <url>http://download.osgeo.org/webdav/geotools/</url>
+        </repository>
+    </repositories>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java
 
b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java
new file mode 100644
index 0000000..b6063ca
--- /dev/null
+++ 
b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java
@@ -0,0 +1,11 @@
+package mvm.rya.accumulo.documentIndex;
+
+/**
+ * Holder for the delimiter constant of the document index package.
+ */
+public class DocIndexIteratorUtil {
+
+    /** Two-character delimiter: U+001D immediately followed by U+001E. */
+    public static final String DOC_ID_INDEX_DELIM = "\u001D\u001E";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
 
b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
new file mode 100644
index 0000000..ad38b2b
--- /dev/null
+++ 
b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java
@@ -0,0 +1,850 @@
+package mvm.rya.accumulo.documentIndex;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.data.ArrayByteSequence;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+
+/**
+ * This iterator facilitates document-partitioned indexing. It involves 
grouping a set of documents together and indexing those documents into a single 
row of
+ * an Accumulo table. This allows a tablet server to perform boolean AND 
operations on terms in the index.
+ * 
+ * The table structure should have the following form:
+ * 
+ * row: shardID, colfam: term, colqual: docID
+ * 
+ * When you configure this iterator with a set of terms (column families), it 
will return only the docIDs that appear with all of the specified terms. The
+ * result will have an empty column family, as follows:
+ * 
+ * row: shardID, colfam: (empty), colqual: docID
+ * 
+ * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to 
parallelize the search over all shardIDs.
+ * 
+ * This iterator will *ignore* any columnFamilies passed to {@link 
#seek(Range, Collection, boolean)} as it performs intersections over terms. 
Extending classes
+ * should override the {@link TermSource#seekColfams} in their 
implementation's {@link #init(SortedKeyValueIterator, Map, 
IteratorEnvironment)} method.
+ * 
+ * README.shard in docs/examples shows an example of using the 
IntersectingIterator.
+ */
+public class DocumentIndexIntersectingIterator implements 
SortedKeyValueIterator<Key,Value> {
+  
+    
+    
+    
+  protected Text nullText = new Text();
+  
+  protected Text getRow(Key key) {
+    return key.getRow();
+  }
+  
+  protected Text getTerm(Key key) {
+    return key.getColumnFamily();
+  }
+  
+  protected Text getTermCond(Key key) {
+    return key.getColumnQualifier();
+  }
+  
+  /**
+   * Builds a key from a row and a column, substituting the empty {@code nullText}
+   * when the column has no column family.
+   *
+   * @param row row of the new key
+   * @param column supplies the column family and column qualifier
+   * @return the assembled key
+   */
+  protected Key buildKey(Text row, TextColumn column) {
+      Text family = column.getColumnFamily();
+      if (family == null) {
+          family = nullText;
+      }
+      return new Key(row, family, column.getColumnQualifier());
+  }
+  
+  protected Key buildKey(Text row, Text term) {
+    return new Key(row, (term == null) ? nullText : term);
+  }
+  
+  protected Key buildKey(Text row, Text term, Text termCond) {
+    return new Key(row, (term == null) ? nullText : term, termCond);
+  }
+  
+  protected Key buildFollowRowKey(Key key, Text term, Text termCond) {
+    return new Key(getRow(key.followingKey(PartialKey.ROW)),(term == null) ? 
nullText : term, termCond);
+  }
+  
+    protected static final Logger log = 
Logger.getLogger(DocumentIndexIntersectingIterator.class);
+
+    /**
+     * Wraps the underlying iterator for one term (column family) and keeps a
+     * one-key look-ahead: {@code top} is the key this source currently exposes
+     * and {@code next} is the key the wrapped iterator is positioned on.
+     */
+    public static class TermSource {
+        public SortedKeyValueIterator<Key, Value> iter;
+        public Text term;
+        public Text termCond;
+        public Collection<ByteSequence> seekColfams;
+        public TextColumn column;
+        public boolean isPrefix;
+        public Key top ;
+        public Key next ;
+        public Text currentCQ;
+        // true once seek(Range) has positioned the wrapped iterator at least once
+        private boolean seeked = false;
+
+        /**
+         * Copies every public field of {@code other}; the wrapped iterator is
+         * shared, not deep-copied. The private {@code seeked} flag is not
+         * copied, so the new source behaves as never seeked.
+         *
+         * @param other source to copy from
+         */
+        public TermSource(TermSource other) {
+
+            this.iter = other.iter;
+            this.term = other.term;
+            this.termCond = other.termCond;
+            this.seekColfams = other.seekColfams;
+            this.column = other.column;
+            this.top = other.top;
+            this.next = other.next;
+            this.currentCQ = other.currentCQ;
+            this.isPrefix = other.isPrefix;
+        }
+
+
+        /**
+         * Creates a source over {@code iter} for the given column. The term and
+         * term condition come from the column's family and qualifier, and the
+         * seek column families are the single family {@code term}.
+         *
+         * @param iter   iterator to wrap
+         * @param column column whose family must be non-null (its bytes are read here)
+         */
+        public TermSource(SortedKeyValueIterator<Key, Value> iter, TextColumn column) {
+
+            this.iter = iter;
+            this.column = column;
+            this.term = column.getColumnFamily();
+            this.termCond = column.getColumnQualifier();
+            this.currentCQ = new Text(emptyByteArray);
+            this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term
+                    .getBytes(), 0, term.getLength()));
+
+        }
+
+
+
+        /**
+         * Positions this source for range {@code r}.
+         *
+         * The first call always seeks the wrapped iterator. Later calls avoid a
+         * re-seek when the look-ahead key {@code next} is not before the start
+         * of {@code r}: if that key belongs to this term it simply becomes the
+         * new {@code top}; otherwise nothing changes. If the look-ahead cannot
+         * be used, the wrapped iterator is re-seeked while it still has a top,
+         * and otherwise the source is marked exhausted.
+         *
+         * @param r range to seek to
+         * @throws IOException if the wrapped iterator fails
+         */
+        public void seek(Range r) throws IOException {
+
+            if (seeked) {
+
+                if (next != null && !r.beforeStartKey(next)) {
+                    // look-ahead key is already inside the requested range
+                    if (next.getColumnFamily().equals(term)) {
+                        this.updateTop();
+                    }
+                } else if (iter.hasTop()) {
+                    iter.seek(r, seekColfams, true);
+                    this.updateTopNext();
+                } else {
+                    // wrapped iterator is exhausted
+                    top = null;
+                    next = null;
+
+                }
+            } else {
+
+                iter.seek(r, seekColfams, true);
+                this.updateTopNext();
+                seeked = true;
+            }
+
+        }
+
+
+        /**
+         * Advances this source by one key (delegates to {@link #updateTop()}).
+         *
+         * @throws IOException if the wrapped iterator fails
+         */
+        public void next() throws IOException {
+
+            this.updateTop();
+        }
+
+        /**
+         * Shifts the look-ahead: {@code top} takes the value of {@code next},
+         * and, if that was non-null, the wrapped iterator is advanced and
+         * {@code next} is reloaded from it (null when it has no more keys).
+         *
+         * @throws IOException if the wrapped iterator fails
+         */
+        public void updateTop() throws IOException {
+
+            top = next;
+            if (next != null) {
+                iter.next();
+                if (iter.hasTop()) {
+                    next = iter.getTopKey();
+                } else {
+                    next = null;
+                }
+            }
+
+        }
+
+        /**
+         * Reloads both {@code top} and {@code next} from the wrapped iterator:
+         * {@code top} is its current key, then the iterator is advanced once
+         * and {@code next} is the following key. Both are null when the
+         * iterator has no top.
+         *
+         * @throws IOException if the wrapped iterator fails
+         */
+        public void updateTopNext() throws IOException {
+
+            if (iter.hasTop()) {
+                top = iter.getTopKey();
+            } else {
+                top = null;
+                next = null;
+                return;
+            }
+
+            iter.next();
+
+            if(iter.hasTop()) {
+                next = iter.getTopKey();
+            } else {
+                next = null;
+            }
+        }
+
+        /**
+         * @return true while this source exposes a current key
+         */
+        public boolean hasTop() {
+            return top != null;
+        }
+
+
+        /**
+         * @return the term as a string, or the literal "Iterator" when no term is set
+         */
+        public String getTermString() {
+            // plain literal instead of new String(...): same value, no extra allocation
+            return (this.term == null) ? "Iterator" : this.term.toString();
+        }
+    }
+  
+  // one TermSource per term being intersected; null until configured
+  TermSource[] sources;
+  // number of entries of sources that are in use
+  int sourcesCount = 0;
+  // range of the current scan; its end key bounds the search in seekOneSource
+  Range overallRange;
+  
+  // query-time settings
+  // row the intersection cursor is on; null means there is no top (see hasTop())
+  protected Text currentRow = null;
+  protected Text currentTermCond = new Text(emptyByteArray);
+  // shared empty payload used for empty Text/Value instances
+  static final byte[] emptyByteArray = new byte[0];
+  
+  // key returned by getTopKey()
+  protected Key topKey = null;
+  // value returned by getTopValue(); initialised empty
+  protected Value value = new Value(emptyByteArray);
+  // context string compared against the first \u0000-separated part of a column qualifier in seekOneSource
+  protected String ctxt = null;
+  // NOTE(review): presumably true when a context was supplied through configuration - confirm in init()
+  protected boolean hasContext = false;
+  protected boolean termCondSet = false;
+  
+  /** Creates an unconfigured iterator; all fields keep their declared defaults. */
+  public DocumentIndexIntersectingIterator() {}
+  
+  /**
+   * Creates a new iterator through the private copy constructor, which
+   * deep-copies the wrapped iterator of every term source.
+   *
+   * @param env environment handed to each wrapped iterator's {@code deepCopy}
+   * @return the new iterator
+   */
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    final DocumentIndexIntersectingIterator duplicate = new DocumentIndexIntersectingIterator(this, env);
+    return duplicate;
+  }
+  
+  private DocumentIndexIntersectingIterator(DocumentIndexIntersectingIterator 
other, IteratorEnvironment env) {
+    if (other.sources != null) {
+      sourcesCount = other.sourcesCount;
+      sources = new TermSource[sourcesCount];
+      for (int i = 0; i < sourcesCount; i++) {
+        sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), 
other.sources[i].column);
+      }
+    }
+  }
+  
+  /**
+   * @return the current top key as stored in {@code topKey}
+   */
+  @Override
+  public Key getTopKey() {
+    return this.topKey;
+  }
+  
+  /**
+   * @return the {@code value} field; values carry no meaning for this iterator
+   */
+  @Override
+  public Value getTopValue() {
+    // values are irrelevant to the intersection, so the stored instance is handed back as is
+    return this.value;
+  }
+  
+  /**
+   * @return true while {@code currentRow} is set, i.e. the cursor has not been exhausted
+   */
+  @Override
+  public boolean hasTop() {
+    if (currentRow == null) {
+      return false;
+    }
+    return true;
+  }
+
+    // precondition: currentRow is not null
+    private boolean seekOneSource(int sourceID) throws IOException {
+        // find the next key in the appropriate column family that is at or
+        // beyond the cursor (currentRow, currentCQ)
+        // advance the cursor if this source goes beyond it
+        // return whether we advanced the cursor
+
+        // within this loop progress must be made in one of the following 
forms:
+        // - currentRow or currentCQ must be increased
+        // - the given source must advance its iterator
+        // this loop will end when any of the following criteria are met
+        // - the iterator for the given source is pointing to the key
+        // (currentRow, columnFamilies[sourceID], currentCQ)
+        // - the given source is out of data and currentRow is set to null
+        // - the given source has advanced beyond the endRow and currentRow is
+        // set to null
+        boolean advancedCursor = false;
+        
+      
+        
+        
+
+        while (true) {
+            
+//            if(currentRow.toString().equals(s)) {
+//                log.info("Source id is " + sourceID);
+//                if (sources[sourceID].top != null) {
+//                    log.info("Top row is " + getRow(sources[sourceID].top));
+//                    log.info("Top cq is " + 
getTermCond(sources[sourceID].top));
+//                }
+//                if (sources[sourceID].next != null) {
+//                    log.info("Next row is " + 
getRow(sources[sourceID].next));
+//                    log.info("Next termCond is " + 
getTermCond(sources[sourceID].next));
+//                }
+//            }
+            
+            if (sources[sourceID].hasTop() == false) {
+                currentRow = null;
+                // setting currentRow to null counts as advancing the cursor
+                return true;
+            }
+            // check if we're past the end key
+            int endCompare = -1;
+            // we should compare the row to the end of the range
+
+            if (overallRange.getEndKey() != null) {
+                endCompare = 
overallRange.getEndKey().getRow().compareTo(sources[sourceID].top.getRow());
+                if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || 
endCompare < 0) {
+                    currentRow = null;
+                    // setting currentRow to null counts as advancing the 
cursor
+                    return true;
+                }
+            }
+            
+
+            
+            int rowCompare = 
currentRow.compareTo(getRow(sources[sourceID].top));
+            // check if this source is already at or beyond currentRow
+            // if not, then seek to at least the current row
+            
+        
+            
+            if (rowCompare > 0) {
+                // seek to at least the currentRow
+                Key seekKey = buildKey(currentRow, sources[sourceID].term);
+                sources[sourceID].seek(new Range(seekKey, true, null, false));
+              
+                continue;
+            }
+            // check if this source has gone beyond currentRow
+            // if so, advance currentRow
+            if (rowCompare < 0) {
+                currentRow.set(getRow(sources[sourceID].top));
+                //log.info("Current row is " + currentRow);
+                advancedCursor = true;
+                continue;
+            }
+            // we have verified that the current source is positioned in
+            // currentRow
+            // now we must make sure we're in the right columnFamily in the
+            // current row
+            // Note: Iterators are auto-magically set to the correct
+            // columnFamily
+
+            if (sources[sourceID].column.isValid()) {
+                
+                boolean isPrefix = false;
+                boolean contextEqual = false;
+                String tempContext = "";
+                
+                int termCompare;
+
+                String[] cQ = 
getTermCond(sources[sourceID].top).toString().split("\u0000");
+                tempContext = cQ[0];
+
+                if (!hasContext && ctxt == null) {
+                    ctxt = cQ[0];
+                }
+
+                contextEqual = ctxt.equals(cQ[0]);
+
+                String s1 = sources[sourceID].termCond.toString();
+                String s2 = cQ[1] + "\u0000" + cQ[2];
+
+                if (sources[sourceID].isPrefix) {
+                    isPrefix = s2.startsWith(s1 + "\u0000");
+                } else {
+                    isPrefix = s2.startsWith(s1);
+                }
+
+                termCompare = (contextEqual && isPrefix) ? 0 : (ctxt + 
"\u0000" + s1).compareTo(cQ[0] + "\u0000" + s2);
+
+                // if(currentRow.toString().equals(s)) {
+                // log.info("Term compare is " + termCompare);
+                // }
+           
+                // check if this source is already on the right columnFamily
+                // if not, then seek forwards to the right columnFamily
+                if (termCompare > 0) {
+                    Key seekKey = buildKey(currentRow, sources[sourceID].term, 
new Text(ctxt + 
+                            "\u0000" + sources[sourceID].termCond.toString()));
+                    sources[sourceID].seek(new Range(seekKey, true, null, 
false));
+                 
+                    continue;
+                }
+                // check if this source is beyond the right columnFamily
+                // if so, then seek to the next row
+                if (termCompare < 0) {
+                    // we're out of entries in the current row, so seek to the
+                    // next one
+                    
+                    if (endCompare == 0) {
+                        // we're done
+                        currentRow = null;
+                        // setting currentRow to null counts as advancing the
+                        // cursor
+                        return true;
+                    }
+                    
+                    
+                    
+                    //advance to next row if context set - all entries in 
given row exhausted
+                    if (hasContext || tempContext.length() == 0) {
+                        Key seekKey = buildFollowRowKey(sources[sourceID].top, 
sources[sourceID].term,
+                                new Text(ctxt + "\u0000" + 
sources[sourceID].termCond.toString()));
+                        sources[sourceID].seek(new Range(seekKey, true, null, 
false));
+                    } else {
+                        
+                        if(contextEqual && !isPrefix) {
+                            Key seekKey = buildKey(currentRow, 
sources[sourceID].term, new Text(ctxt + "\u0001"));
+                            sources[sourceID].seek(new Range(seekKey, true, 
null, false));
+                            if(sources[sourceID].top != null) {
+                                ctxt = 
getTermCond(sources[sourceID].top).toString().split("\u0000")[0];
+                            } 
+                        } else {
+                            Key seekKey = buildKey(currentRow, 
sources[sourceID].term, new Text(tempContext + 
+                                    "\u0000" + 
sources[sourceID].termCond.toString()));
+                            sources[sourceID].seek(new Range(seekKey, true, 
null, false));
+                            if(sources[sourceID].top != null) {
+                                ctxt = 
getTermCond(sources[sourceID].top).toString().split("\u0000")[0];
+                            } 
+                        }
+                        
+                    }
+                    
+                    
+//                    if(currentRow.toString().equals(s)) {
+//                        log.info("current term cond is " + currentTermCond);
+//                        
+//                    }
+                    
+             
+                    continue;
+                }
+            }
+         
+            
+            
+         
+            
+            
+            
+            
+            
+            
+            //set currentTermCond -- gets appended to end of currentKey column 
qualifier
+            //used to determine which term iterator to advance when a new 
iterator is created
+            
+            
sources[sourceID].currentCQ.set(getTermCond(sources[sourceID].top));
+            
+            if (sources[sourceID].next != null) {
+                        
+                //is hasContext, only consider sourceID with next having 
designated context
+                //otherwise don't set currentTermCond
+                if (!termCondSet && hasContext) {
+                    if (sources[sourceID].next.getRow().equals(currentRow)
+                            && 
sources[sourceID].next.getColumnQualifier().toString()
+                                    .startsWith(ctxt + "\u0000" + 
sources[sourceID].termCond.toString())) {
+                        currentTermCond.set(new 
Text(Integer.toString(sourceID)));
+                        termCondSet = true;
+                    }
+                } else if(!termCondSet){
+                    String[] cq = 
getTermCond(sources[sourceID].next).toString().split("\u0000");
+                    
+                    //set currentTermCond with preference given to sourceID 
having next with same context
+                    //otherwise set currentTermCond sourceID with next having 
termCond as prefix
+                    if (sources[sourceID].next.getRow().equals(currentRow)) {
+                        if 
(sources[sourceID].next.getColumnQualifier().toString()
+                                .startsWith(ctxt + "\u0000" + 
sources[sourceID].termCond.toString())) {
+                            currentTermCond.set(new 
Text(Integer.toString(sourceID)));
+                            termCondSet = true;
+                        } else if ((cq[1] + "\u0000" + 
cq[2]).startsWith(sources[sourceID].termCond.toString())) {
+                            currentTermCond.set(new 
Text(Integer.toString(sourceID)));
+                        }
+                    }
+                }
+            } 
+       
+           
+            break;
+        }
+
+        return advancedCursor;
+    }
+  
+  /**
+   * Steps one source forward and then re-establishes the intersection.
+   *
+   * When currentTermCond is non-empty it holds the decimal index of the source
+   * to step; otherwise source 0 is stepped. Does nothing once currentRow is null.
+   *
+   * @throws IOException propagated from the underlying sources
+   */
+  @Override
+  public void next() throws IOException {
+    if (currentRow == null) {
+      return;
+    }
+
+    final boolean designated = currentTermCond.getLength() != 0;
+    final int id = designated ? Integer.parseInt(currentTermCond.toString()) : 0;
+
+    sources[id].next();
+    if (designated) {
+      // the designation has been consumed; clear it before intersecting again
+      currentTermCond.set(emptyByteArray);
+      termCondSet = false;
+    }
+    if (!hasContext && sources[id].top != null) {
+      // no fixed context: take the first NUL-delimited component of the new top entry
+      ctxt = getTermCond(sources[id].top).toString().split("\u0000")[0];
+    }
+    advanceToIntersection();
+  }
+  
+  protected void advanceToIntersection() throws IOException {
+    boolean cursorChanged = true;
+    while (cursorChanged) {
+      // seek all of the sources to at least the highest seen column qualifier 
in the current row
+      cursorChanged = false;
+      for (int i = 0; i < sourcesCount; i++) {
+//          log.info("New sourceID is " + i);
+        if (currentRow == null) {
+          topKey = null;
+          return;
+        }
+        if (seekOneSource(i)) {
+          currentTermCond.set(emptyByteArray); 
+          termCondSet = false;
+          cursorChanged = true;
+          break;
+        }
+      }
+    }
+    String cq = "";
+    for(int i = 0; i < sourcesCount; i++) {
+        cq = cq + sources[i].currentCQ.toString() + 
DocIndexIteratorUtil.DOC_ID_INDEX_DELIM;
+    }
+    
+        if (currentTermCond.getLength() == 0) {
+            topKey = buildKey(currentRow, nullText, new Text(cq + -1));
+        } else {
+            topKey = buildKey(currentRow, nullText, new Text(cq + 
currentTermCond.toString()));
+        }
+  }
+  
+  /**
+   * @param iter iterator to inspect
+   * @return the string form of the iterator's top key, or "" when it has no top
+   */
+  public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) {
+    return iter.hasTop() ? iter.getTopKey().toString() : "";
+  }
+  
+  // Option key for the encoded columns (written by setColumnFamilies, read by init).
+  private static final String columnOptionName = "columns";
+  // Option key for the encoded per-column prefix flags (written by setColumnFamilies, read by init).
+  private static final String columnPrefix = "prefixes";
+  // Option key for the encoded context (written by setContext, read by init).
+  private static final String context = "context";
+  
+  
+  
+  protected static String encodeColumns(TextColumn[] columns) {
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < columns.length; i++) {
+        sb.append(new 
String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnFamily()))));
+        sb.append('\n');
+        sb.append(new 
String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnQualifier()))));
+        sb.append('\u0001');
+      }
+      return sb.toString();
+    }
+    
+  
+  
+  /**
+   * Parses a string made of U+0001-terminated entries, each holding
+   * Base64(columnFamily), a newline and Base64(columnQualifier), back into columns.
+   *
+   * @param columns encoded columns; must not be null
+   * @return one TextColumn per entry, or an empty array when the input is empty
+   */
+  protected static TextColumn[] decodeColumns(String columns) {
+      if (columns.isEmpty()) {
+        // "".split(...) yields one empty entry, which is not a column
+        return new TextColumn[0];
+      }
+      String[] columnStrings = columns.split("\u0001");
+      TextColumn[] columnTexts = new TextColumn[columnStrings.length];
+      for (int i = 0; i < columnStrings.length; i++) {
+        // limit -1 keeps trailing empty fields, so an empty qualifier (or an empty
+        // family and qualifier) still yields two components
+        String[] columnComponents = columnStrings[i].split("\n", -1);
+        columnTexts[i] = new TextColumn(new Text(Base64.decodeBase64(columnComponents[0].getBytes())),
+                new Text(Base64.decodeBase64(columnComponents[1].getBytes())));
+      }
+      return columnTexts;
+    }
+  
+ 
+  
+  
+  
+  /**
+   * @param context
+   * @return encoded context
+   */
+  protected static String encodeContext(String context) {
+ 
+    return new String(Base64.encodeBase64(context.getBytes()));
+  }
+  
+ 
+  
+  /**
+   * @param context
+   * @return decoded context
+   */
+    protected static String decodeContext(String context) {
+
+        if (context == null) {
+            return null;
+        } else {
+            return new String(Base64.decodeBase64(context.getBytes()));
+        }
+    }
+  
+  
+  
+  
+  
+  protected static String encodeBooleans(boolean[] prefixes) {
+      byte[] bytes = new byte[prefixes.length];
+      for (int i = 0; i < prefixes.length; i++) {
+        if (prefixes[i])
+          bytes[i] = 1;
+        else
+          bytes[i] = 0;
+      }
+      return new String(Base64.encodeBase64(bytes));
+    }
+    
+    /**
+     * @param flags
+     * @return decoded flags
+     */
+    protected static boolean[] decodeBooleans(String prefixes) {
+      // return null of there were no flags
+      if (prefixes == null)
+        return null;
+      
+      byte[] bytes = Base64.decodeBase64(prefixes.getBytes());
+      boolean[] bFlags = new boolean[bytes.length];
+      for (int i = 0; i < bytes.length; i++) {
+        if (bytes[i] == 1)
+          bFlags[i] = true;
+        else
+          bFlags[i] = false;
+      }
+      return bFlags;
+    }
+  
+  
+  
+  
+  
+  
+  
+  
+  /**
+   * Configures the iterator from its options: decodes the columns, the per-column
+   * prefix flags and the optional context, then creates one TermSource per column.
+   * The first TermSource wraps the given source; each later one wraps a deep copy of it.
+   *
+   * A missing or too-short prefix option is treated as "false" for the affected
+   * columns instead of failing with a NullPointerException or
+   * ArrayIndexOutOfBoundsException.
+   *
+   * @param source underlying iterator
+   * @param options iterator options keyed by the option names of this class
+   * @param env environment passed to deepCopy
+   * @throws IllegalArgumentException if fewer than two columns are configured
+   * @throws IOException declared by the overridden method
+   */
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    TextColumn[] terms = decodeColumns(options.get(columnOptionName));
+    // decodeBooleans returns null when the option is absent
+    boolean[] prefixes = decodeBooleans(options.get(columnPrefix));
+    ctxt = decodeContext(options.get(context));
+
+    if (ctxt != null) {
+        hasContext = true;
+    }
+
+    if (terms.length < 2) {
+      throw new IllegalArgumentException("IntersectionIterator requires two or more columns families");
+    }
+
+    sources = new TermSource[terms.length];
+    sources[0] = new TermSource(source, terms[0]);
+    // NOTE(review): isPrefix is never assigned for sources[0], although setColumnFamilies
+    // encodes a flag for every column including the first; confirm whether that is intended.
+    for (int i = 1; i < terms.length; i++) {
+      sources[i] = new TermSource(source.deepCopy(env), terms[i]);
+      sources[i].isPrefix = prefixes != null && i < prefixes.length && prefixes[i];
+    }
+    sourcesCount = terms.length;
+  }
+  
+    /**
+     * Positions every source for the given range and then advances to the first intersection.
+     *
+     * Keeps a copy of the range in overallRange and resets currentRow, currentTermCond
+     * and termCondSet. How each source is positioned depends on the range's start key:
+     * - start key with a non-null column qualifier: the qualifier is split by cqParser
+     *   into a value and a source id. With id >= 0, that source is sought to
+     *   (row, its term, value) and stepped once with next(), while all other sources are
+     *   sought to (row, their term). With a negative id, every source is sought to
+     *   (row, its term, the start key's column qualifier).
+     * - start key without a column qualifier: every source is sought to (row, its term).
+     * - no start key: every source is sought to the range as given.
+     * Ranges built here have an inclusive start key and a null end key.
+     *
+     * The seekColumnFamilies and inclusive arguments are not used by this method.
+     *
+     * @param range range to seek to
+     * @param seekColumnFamilies not used
+     * @param inclusive not used
+     * @throws IOException propagated from the underlying sources
+     */
+    @Override
+    public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, 
boolean inclusive) throws IOException {
+        overallRange = new Range(range);
+        currentRow = new Text();
+        currentTermCond.set(emptyByteArray);
+        termCondSet = false;
+
+
+        
+//       log.info("Calling seek with range " + range);
+
+        // seek each of the sources to the right column family within the row
+        // given by key
+       
+        Key sourceKey;
+
+        if (rangeCqValid(range)) {
+            
+            // cqInfo[0] is the qualifier value to seek to, cqInfo[1] the source id (or -1)
+            String[] cqInfo = 
cqParser(range.getStartKey().getColumnQualifier());
+            int id = Integer.parseInt(cqInfo[1]);
+            
+
+            
+            if (id >= 0) {
+                for (int i = 0; i < sourcesCount; i++) {
+
+                    if (i == id) {
+                        // the designated source is sought to the given value and then stepped once
+                        sourceKey = buildKey(getRow(range.getStartKey()), 
sources[i].term, new Text(cqInfo[0]));
+                        sources[i].seek(new Range(sourceKey, true, null, 
false));
+                        sources[i].next();
+                        if(!hasContext && sources[i].hasTop()) {
+                            // no fixed context: take the first NUL-delimited component of the top entry
+                            ctxt = 
getTermCond(sources[i].top).toString().split("\u0000")[0];
+                        }
+                    } else {
+                        sourceKey = buildKey(getRow(range.getStartKey()), 
sources[i].term);
+                        sources[i].seek(new Range(sourceKey, true, null, 
false));
+                    }
+                }
+            } else {
+                
+
+                // no designated source: every source is sought with the start key's qualifier
+                for (int i = 0; i < sourcesCount; i++) {
+                    sourceKey = buildKey(getRow(range.getStartKey()), 
sources[i].term, range.getStartKey()
+                            .getColumnQualifier());
+                    sources[i].seek(new Range(sourceKey, true, null, false));
+                }
+            }
+                
+            
+        } else {
+            
+//            log.info("Range is invalid.");
+            for (int i = 0; i < sourcesCount; i++) {
+
+                if (range.getStartKey() != null) {
+
+                    sourceKey = buildKey(getRow(range.getStartKey()), 
sources[i].term);
+
+                    // Seek only to the term for this source as a column family
+                    sources[i].seek(new Range(sourceKey, true, null, false));
+                } else {
+                    // No start key: pass the caller's range through unchanged
+
+                    sources[i].seek(range);
+                }
+            }
+        }
+        
+        advanceToIntersection();
+
+    }
+    
+    
+    private String[] cqParser(Text cq) {
+        
+        String cQ = cq.toString();
+        String[] cqComponents = 
cQ.split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM);
+        int id = -1;
+        String[] valPos = new String[2];
+        
+
+        
+        
+        if(cqComponents.length > 1) {
+            id = Integer.parseInt(cqComponents[cqComponents.length-1]);
+            if (id >= 0) {
+                valPos[0] = cqComponents[id].toString();
+                valPos[1] = "" + id;
+            } else {
+                valPos[0] = cqComponents[0].toString();
+                valPos[1] = "" + id;
+            }
+        } else {
+            valPos[0] = cq.toString();
+            valPos[1] = "" + -1;
+        }
+        
+        return valPos;
+       
+    }
+    
+  
+  /**
+   * @param range range to inspect
+   * @return true when the range has a start key whose column qualifier is non-null
+   */
+  private boolean rangeCqValid(Range range) {
+      if (range.getStartKey() == null) {
+          return false;
+      }
+      return range.getStartKey().getColumnQualifier() != null;
+  }
+  
+  
+  
+  /**
+   * Appends a TermSource for the given column, backed by a deep copy of the given source.
+   *
+   * The sources array is created with one slot when absent; otherwise it is replaced
+   * by an array one element longer holding copies of the existing TermSources.
+   *
+   * @param source iterator to deep-copy for the new TermSource
+   * @param env environment passed to deepCopy
+   * @param column column of the new TermSource
+   */
+  public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, TextColumn column) {
+    if (sources != null) {
+      // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309
+      final TermSource[] grown = new TermSource[sources.length + 1];
+      for (int idx = 0; idx < sources.length; idx++) {
+        // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309
+        grown[idx] = new TermSource(sources[idx]);
+      }
+      sources = grown;
+    } else {
+      sources = new TermSource[1];
+    }
+    sources[sourcesCount] = new TermSource(source.deepCopy(env), column);
+    sourcesCount++;
+  }
+  
+  /**
+   * Encode the columns to be used when iterating.
+   * 
+   * @param cfg
+   * @param columns
+   */
+  public static void setColumnFamilies(IteratorSetting cfg, TextColumn[] 
columns) {
+    if (columns.length < 2)
+      throw new IllegalArgumentException("Must supply at least two terms to 
intersect");
+    
+    boolean[] prefix = new boolean[columns.length];
+    
+    for(int i = 0; i < columns.length; i++) {
+        prefix[i] = columns[i].isPrefix();
+    }
+    
+    
+    
+    cfg.addOption(DocumentIndexIntersectingIterator.columnPrefix, 
DocumentIndexIntersectingIterator.encodeBooleans(prefix));
+    cfg.addOption(DocumentIndexIntersectingIterator.columnOptionName, 
DocumentIndexIntersectingIterator.encodeColumns(columns));
+  }
+  
+  
+  
+  
+  
+  /**
+   * Stores the encoded context as an option on the given iterator setting.
+   *
+   * @param cfg iterator setting that receives the option
+   * @param context context to encode and store
+   */
+  public static void setContext(IteratorSetting cfg, String context) {
+    // the class qualifier is required: the parameter shadows the option-key constant
+    final String optionKey = DocumentIndexIntersectingIterator.context;
+    final String encodedContext = DocumentIndexIntersectingIterator.encodeContext(context);
+    cfg.addOption(optionKey, encodedContext);
+  }
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java 
b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java
new file mode 100644
index 0000000..9747f60
--- /dev/null
+++ 
b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java
@@ -0,0 +1,88 @@
+package mvm.rya.accumulo.documentIndex;
+
+import org.apache.hadoop.io.Text;
+
+public class TextColumn  {
+    
+    
+    private Text columnFamily;
+    private Text columnQualifier;
+    private boolean isPrefix = false;
+    
+    
+    
+    public TextColumn(Text columnFamily, Text columnQualifier) {
+        this.columnFamily = columnFamily;
+        this.columnQualifier = columnQualifier;
+    }
+    
+    
+    public TextColumn(TextColumn other) {
+        
+        this.columnFamily = new Text(other.columnFamily);
+        this.columnQualifier = new Text(other.columnQualifier);
+        this.isPrefix = other.isPrefix;
+      
+    }
+    
+    
+    public Text getColumnFamily() {
+        return columnFamily;
+    }
+    
+    
+    public boolean isPrefix() {
+        return isPrefix;
+    }
+    
+    
+    public void setIsPrefix(boolean isPrefix) {
+        this.isPrefix = isPrefix;
+    }
+    
+    
+    public boolean isValid() {
+        return (columnFamily != null && columnQualifier != null);
+    }
+    
+    
+    
+    public Text getColumnQualifier() {
+        return columnQualifier;
+    }
+    
+    
+    public void setColumnFamily(Text cf) {
+        this.columnFamily = cf;
+    }
+    
+    public void setColumnQualifier(Text cq) {
+        this.columnQualifier = cq;
+    }
+    
+    public String toString() {
+        
+        return columnFamily.toString() + ",  " + columnQualifier.toString() + 
",    prefix:" + isPrefix;
+    }
+    
+    @Override
+    public boolean equals(Object other) {
+        
+        if(other == null) {
+            return false;
+        }
+        
+        if(!(other instanceof TextColumn)) {
+            return false;
+        }
+        
+        TextColumn tc = (TextColumn) other;
+        
+        return this.columnFamily.equals(tc.columnFamily) && 
this.columnQualifier.equals(tc.columnQualifier) && this.isPrefix == tc.isPrefix;
+        
+        
+        
+    }
+    
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java 
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java
new file mode 100644
index 0000000..11d58c5
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullFreeTextIndexer.java
@@ -0,0 +1,69 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * #%L
+ * mvm.rya.indexing.accumulo
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.io.IOException;
+import java.util.Set;
+
+import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.indexing.FreeTextIndexer;
+import mvm.rya.indexing.StatementContraints;
+
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.QueryEvaluationException;
+
+/**
+ * Free-text indexer that does nothing: statements are not stored, configuration
+ * is ignored, and every getter and query returns null.
+ */
+public class NullFreeTextIndexer extends AbstractAccumuloIndexer implements FreeTextIndexer {
+
+    /** @return always null */
+    @Override
+    public String getTableName() {
+        return null;
+    }
+
+    /** Ignores the statement. */
+    @Override
+    public void storeStatement(RyaStatement statement) throws IOException {
+        // intentionally empty
+    }
+
+    /** @return always null */
+    @Override
+    public Configuration getConf() {
+        return null;
+    }
+
+    /** Ignores the configuration. */
+    @Override
+    public void setConf(Configuration conf) {
+        // intentionally empty
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryText(String query, StatementContraints constraints)
+            throws IOException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public Set<URI> getIndexablePredicates() {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java 
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java
new file mode 100644
index 0000000..0629adb
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullGeoIndexer.java
@@ -0,0 +1,120 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * #%L
+ * mvm.rya.indexing.accumulo
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.io.IOException;
+import java.util.Set;
+
+import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.indexing.GeoIndexer;
+import mvm.rya.indexing.StatementContraints;
+
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.QueryEvaluationException;
+
+import com.vividsolutions.jts.geom.Geometry;
+
+/**
+ * Geo indexer that does nothing: statements are not stored, configuration is
+ * ignored, and every getter and query returns null.
+ */
+public class NullGeoIndexer extends AbstractAccumuloIndexer implements GeoIndexer {
+
+    /** @return always null */
+    @Override
+    public String getTableName() {
+        return null;
+    }
+
+    /** Ignores the statement. */
+    @Override
+    public void storeStatement(RyaStatement statement) throws IOException {
+        // intentionally empty
+    }
+
+    /** @return always null */
+    @Override
+    public Configuration getConf() {
+        return null;
+    }
+
+    /** Ignores the configuration. */
+    @Override
+    public void setConf(Configuration conf) {
+        // intentionally empty
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryEquals(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryDisjoint(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryIntersects(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryTouches(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryCrosses(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryWithin(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryContains(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryOverlaps(Geometry query, StatementContraints constraints) {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public Set<URI> getIndexablePredicates() {
+        return null;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java 
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java
new file mode 100644
index 0000000..cf83178
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/NullTemporalIndexer.java
@@ -0,0 +1,134 @@
+package mvm.rya.accumulo.mr;
+
+import info.aduna.iteration.CloseableIteration;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import mvm.rya.accumulo.experimental.AbstractAccumuloIndexer;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.indexing.StatementContraints;
+import mvm.rya.indexing.TemporalIndexer;
+import mvm.rya.indexing.TemporalInstant;
+import mvm.rya.indexing.TemporalInterval;
+
+import org.apache.hadoop.conf.Configuration;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.query.QueryEvaluationException;
+
+/**
+ * Temporal indexer that does nothing, like when disabled: statements are not
+ * stored, configuration is ignored, and every getter and query returns null.
+ */
+public class NullTemporalIndexer extends AbstractAccumuloIndexer implements TemporalIndexer {
+
+    /** @return always null */
+    @Override
+    public String getTableName() {
+        return null;
+    }
+
+    /** Ignores the statement. */
+    @Override
+    public void storeStatement(RyaStatement statement) throws IOException {
+        // intentionally empty
+    }
+
+    /** @return always null */
+    @Override
+    public Configuration getConf() {
+        return null;
+    }
+
+    /** Ignores the configuration. */
+    @Override
+    public void setConf(Configuration conf) {
+        // intentionally empty
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantEqualsInstant(TemporalInstant queryInstant,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInstant(TemporalInstant queryInstant,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInstant(TemporalInstant queryInstant,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantBeforeInterval(TemporalInterval givenInterval,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantAfterInterval(TemporalInterval givenInterval,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantInsideInterval(TemporalInterval givenInterval,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasBeginningInterval(TemporalInterval queryInterval,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryInstantHasEndInterval(TemporalInterval queryInterval,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryIntervalEquals(TemporalInterval query,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryIntervalBefore(TemporalInterval query,
+            StatementContraints constraints) throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public CloseableIteration<Statement, QueryEvaluationException> queryIntervalAfter(TemporalInterval query, StatementContraints constraints)
+            throws QueryEvaluationException {
+        return null;
+    }
+
+    /** @return always null */
+    @Override
+    public Set<URI> getIndexablePredicates() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java 
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java
new file mode 100644
index 0000000..1e26626
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/RyaOutputFormat.java
@@ -0,0 +1,328 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * #%L
+ * mvm.rya.indexing.accumulo
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.indexing.FreeTextIndexer;
+import mvm.rya.indexing.GeoIndexer;
+import mvm.rya.indexing.TemporalIndexer;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.accumulo.StatementSerializer;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+import mvm.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.log4j.Logger;
+import org.geotools.feature.SchemaException;
+import org.openrdf.model.Statement;
+
+/**
+ * Hadoop Map/Reduce class to use Rya, the {@link GeoIndexer}, the {@link 
FreeTextIndexer}, and the {@link TemporalIndexer} as the sink of {@link 
Statement} data.
+ * wrapped in an {@link StatementWritable} objects. This {@link OutputFormat} 
ignores the Keys and only writes the Values to Rya.
+ * 
+ * The user must specify connection parameters for Rya, {@link GeoIndexer}, 
{@link FreeTextIndexer}, and {@link TemporalIndexer}.
+ */
+public class RyaOutputFormat extends OutputFormat<Writable, StatementWritable> 
{
+    private static final Logger logger = 
Logger.getLogger(RyaOutputFormat.class);
+
+    private static final String PREFIX = RyaOutputFormat.class.getSimpleName();
+    private static final String MAX_MUTATION_BUFFER_SIZE = PREFIX + 
".maxmemory";
+    private static final String ENABLE_FREETEXT = PREFIX + ".freetext.enable";
+    private static final String ENABLE_GEO = PREFIX + ".geo.enable";
+    private static final String ENABLE_TEMPORAL = PREFIX + ".temporal.enable";;
+
+
+    @Override
+    public void checkOutputSpecs(JobContext jobContext) throws IOException, 
InterruptedException {
+        Configuration conf = jobContext.getConfiguration();
+
+        // make sure that all of the indexers can connect
+        getGeoIndexer(conf);
+        getFreeTextIndexer(conf);
+        getTemporalIndexer(conf);
+        getRyaIndexer(conf);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) 
throws IOException, InterruptedException {
+        // copied from AccumuloOutputFormat
+        return new NullOutputFormat<Text, 
Mutation>().getOutputCommitter(context);
+    }
+
+    @Override
+    public RecordWriter<Writable, StatementWritable> 
getRecordWriter(TaskAttemptContext context) throws IOException, 
InterruptedException {
+        return new RyaRecordWriter(context);
+    }
+
+    private static GeoIndexer getGeoIndexer(Configuration conf) throws 
IOException {
+        if (!conf.getBoolean(ENABLE_GEO, true)) {
+            return new NullGeoIndexer();
+        }
+
+        GeoMesaGeoIndexer geo = new GeoMesaGeoIndexer();
+        geo.setConf(conf);
+        return geo;
+
+    }
+
+    private static FreeTextIndexer getFreeTextIndexer(Configuration conf) 
throws IOException {
+        if (!conf.getBoolean(ENABLE_FREETEXT, true)) {
+            return new NullFreeTextIndexer();
+        }
+
+        AccumuloFreeTextIndexer freeText = new AccumuloFreeTextIndexer();
+        freeText.setConf(conf);
+        return freeText;
+
+    }
+
+    private static TemporalIndexer getTemporalIndexer(Configuration conf) 
throws IOException {
+        if (!conf.getBoolean(ENABLE_TEMPORAL, true)) {
+            return new NullTemporalIndexer();
+        }
+        AccumuloTemporalIndexer temporal = new AccumuloTemporalIndexer();
+        temporal.setConf(conf);
+        return temporal;
+    }
+
+    private static AccumuloRyaDAO getRyaIndexer(Configuration conf) throws 
IOException {
+        try {
+            AccumuloRyaDAO ryaIndexer = new AccumuloRyaDAO();
+            Connector conn = ConfigUtils.getConnector(conf);
+            ryaIndexer.setConnector(conn);
+
+            AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+
+            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+            if (tablePrefix != null) {
+                ryaConf.setTablePrefix(tablePrefix);
+            }
+            ryaConf.setDisplayQueryPlan(false);
+            ryaIndexer.setConf(ryaConf);
+            ryaIndexer.init();
+            return ryaIndexer;
+        } catch (AccumuloException e) {
+            logger.error("Cannot create RyaIndexer", e);
+            throw new IOException(e);
+        } catch (AccumuloSecurityException e) {
+            logger.error("Cannot create RyaIndexer", e);
+            throw new IOException(e);
+        } catch (RyaDAOException e) {
+            logger.error("Cannot create RyaIndexer", e);
+            throw new IOException(e);
+        }
+    }
+
+    public static class RyaRecordWriter extends RecordWriter<Writable, 
StatementWritable> implements Closeable, Flushable {
+        private static final Logger logger = 
Logger.getLogger(RyaRecordWriter.class);
+
+        private FreeTextIndexer freeTextIndexer;
+        private GeoIndexer geoIndexer;
+        private TemporalIndexer temporalIndexer;
+        private AccumuloRyaDAO ryaIndexer;
+
+        private static final long ONE_MEGABYTE = 1024L * 1024L;
+        private static final long AVE_STATEMENT_SIZE = 100L;
+
+        private long bufferSizeLimit;
+        private long bufferCurrentSize = 0;
+
+        private ArrayList<RyaStatement> buffer;
+
+        public RyaRecordWriter(TaskAttemptContext context) throws IOException {
+            this(context.getConfiguration());
+        }
+
+        public RyaRecordWriter(Configuration conf) throws IOException {
+            // set up the buffer
+            bufferSizeLimit = conf.getLong(MAX_MUTATION_BUFFER_SIZE, 
ONE_MEGABYTE);
+            int bufferCapacity = (int) (bufferSizeLimit / AVE_STATEMENT_SIZE);
+            buffer = new ArrayList<RyaStatement>(bufferCapacity);
+
+            // set up the indexers
+            freeTextIndexer = getFreeTextIndexer(conf);
+            geoIndexer = getGeoIndexer(conf);
+            temporalIndexer = getTemporalIndexer(conf);
+            ryaIndexer = getRyaIndexer(conf);
+
+            // update fields used for metrics
+            startTime = System.currentTimeMillis();
+            lastCommitFinishTime = startTime;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            flushBuffer();
+        }
+
+        @Override
+        public void close() throws IOException {
+            close(null);
+        }
+
+        @Override
+        public void close(TaskAttemptContext paramTaskAttemptContext) throws 
IOException {
+            // close everything. log errors
+            try {
+                flush();
+            } catch (IOException e) {
+                logger.error("Error flushing the buffer on RyaOutputFormat 
Close", e);
+            }
+            try {
+                if (geoIndexer != null)
+                    geoIndexer.close();
+            } catch (IOException e) {
+                logger.error("Error closing the geoIndexer on RyaOutputFormat 
Close", e);
+            }
+            try {
+                if (freeTextIndexer != null)
+                    freeTextIndexer.close();
+            } catch (IOException e) {
+                logger.error("Error closing the freetextIndexer on 
RyaOutputFormat Close", e);
+            }
+            try {
+                if (temporalIndexer != null)
+                    temporalIndexer.close();
+            } catch (IOException e) {
+                logger.error("Error closing the temporalIndexer on 
RyaOutputFormat Close", e);
+            }
+            try {
+                ryaIndexer.destroy();
+            } catch (RyaDAOException e) {
+                logger.error("Error closing RyaDAO on RyaOutputFormat Close", 
e);
+            }
+        }
+
+        public void write(Statement statement) throws IOException, 
InterruptedException {
+            write(null, new StatementWritable(statement));
+        }
+
+        @Override
+        public void write(Writable key, StatementWritable value) throws 
IOException, InterruptedException {
+            buffer.add(RdfToRyaConversions.convertStatement(value));
+
+            bufferCurrentSize += 
StatementSerializer.writeStatement(value).length();
+
+            if (bufferCurrentSize >= bufferSizeLimit) {
+                flushBuffer();
+            }
+        }
+
+        // fields for storing metrics
+        private long startTime = 0;
+        private long lastCommitFinishTime = 0;
+        private long totalCommitRecords = 0;
+
+        private double totalReadDuration = 0;
+        private double totalWriteDuration = 0;
+
+        private long commitCount = 0;
+
+        private void flushBuffer() throws IOException {
+            totalCommitRecords += buffer.size();
+            commitCount++;
+
+            long startCommitTime = System.currentTimeMillis();
+
+            logger.info(String.format("(C-%d) Flushing buffer with %,d objects 
and %,d bytes", commitCount, buffer.size(),
+                    bufferCurrentSize));
+
+            double readingDuration = (startCommitTime - lastCommitFinishTime) 
/ 1000.;
+            totalReadDuration += readingDuration;
+            double currentReadRate = buffer.size() / readingDuration;
+            double totalReadRate = totalCommitRecords / totalReadDuration;
+
+            // Print "reading" metrics
+            logger.info(String.format("(C-%d) (Reading) Duration, Current 
Rate, Total Rate: %.2f %.2f %.2f ", commitCount, readingDuration,
+                    currentReadRate, totalReadRate));
+
+            // write to geo
+            geoIndexer.storeStatements(buffer);
+            geoIndexer.flush();
+
+            // write to free text
+            freeTextIndexer.storeStatements(buffer);
+            freeTextIndexer.flush();
+
+            // write to temporal
+            temporalIndexer.storeStatements(buffer);
+            temporalIndexer.flush();
+
+            // write to rya
+            try {
+                ryaIndexer.add(buffer.iterator());
+            } catch (RyaDAOException e) {
+                logger.error("Cannot writing statement to Rya", e);
+                throw new IOException(e);
+            }
+
+            lastCommitFinishTime = System.currentTimeMillis();
+
+            double writingDuration = (lastCommitFinishTime - startCommitTime) 
/ 1000.;
+            totalWriteDuration += writingDuration;
+            double currentWriteRate = buffer.size() / writingDuration;
+            double totalWriteRate = totalCommitRecords / totalWriteDuration;
+
+            // Print "writing" stats
+            logger.info(String.format("(C-%d) (Writing) Duration, Current 
Rate, Total Rate: %.2f %.2f %.2f ", commitCount, writingDuration,
+                    currentWriteRate, totalWriteRate));
+
+            double processDuration = writingDuration + readingDuration;
+            double totalProcessDuration = totalWriteDuration + 
totalReadDuration;
+            double currentProcessRate = buffer.size() / processDuration;
+            double totalProcessRate = totalCommitRecords / 
(totalProcessDuration);
+
+            // Print "total" stats
+            logger.info(String.format("(C-%d) (Total) Duration, Current Rate, 
Total Rate: %.2f %.2f %.2f ", commitCount, processDuration,
+                    currentProcessRate, totalProcessRate));
+
+            // clear the buffer
+            buffer.clear();
+            bufferCurrentSize = 0L;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java 
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java
new file mode 100644
index 0000000..5c43687
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/StatementWritable.java
@@ -0,0 +1,85 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * #%L
+ * mvm.rya.indexing.accumulo
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import mvm.rya.indexing.accumulo.StatementSerializer;
+
+import org.apache.hadoop.io.Writable;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+
+/**
+ * A {@link Writable} wrapper for {@link Statement} objects.
+ */
+@SuppressWarnings("serial")
+public class StatementWritable implements Statement, Writable {
+
+    private Statement statement;
+
+    public StatementWritable(Statement statement) {
+        setStatement(statement);
+    }
+
+    public void setStatement(Statement statement) {
+        this.statement = statement;
+    }
+
+    public Statement getStatement() {
+        return statement;
+    }
+
+    @Override
+    public void readFields(DataInput paramDataInput) throws IOException {
+        statement = 
StatementSerializer.readStatement(paramDataInput.readUTF());
+    }
+
+    @Override
+    public void write(DataOutput paramDataOutput) throws IOException {
+        
paramDataOutput.writeUTF(StatementSerializer.writeStatement(statement));
+    }
+
+    @Override
+    public Resource getSubject() {
+        return statement.getSubject();
+    }
+
+    @Override
+    public URI getPredicate() {
+        return statement.getPredicate();
+    }
+
+    @Override
+    public Value getObject() {
+        return statement.getObject();
+    }
+
+    @Override
+    public Resource getContext() {
+        return statement.getContext();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92ddfa59/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java
 
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java
new file mode 100644
index 0000000..1b469cd
--- /dev/null
+++ 
b/extras/indexing/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputToolIndexing.java
@@ -0,0 +1,226 @@
+package mvm.rya.accumulo.mr.fileinput;
+
+/*
+ * #%L
+ * mvm.rya.indexing.accumulo
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.resolver.RdfToRyaConversions;
+import mvm.rya.indexing.FreeTextIndexer;
+import mvm.rya.indexing.GeoIndexer;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer;
+import mvm.rya.indexing.accumulo.geo.GeoMesaGeoIndexer;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+import org.geotools.feature.SchemaException;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ContextStatementImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.rio.ParserConfig;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.RDFParseException;
+import org.openrdf.rio.RDFParser;
+import org.openrdf.rio.Rio;
+import org.openrdf.rio.helpers.RDFHandlerBase;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Take large ntrips files and use MapReduce to ingest into other indexing
+ */
+public class BulkNtripsInputToolIndexing extends Configured implements Tool {
+
+    private String userName = null;
+    private String pwd = null;
+    private String instance = null;
+    private String zk = null;
+
+    private String format = RDFFormat.NTRIPLES.getName();
+
+    @Override
+    public int run(final String[] args) throws Exception {
+        final Configuration conf = getConf();
+        // conf
+        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
+        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
+        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
+        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
+        format = conf.get(MRUtils.FORMAT_PROP, format);
+
+        String auths = conf.get(MRUtils.AC_CV_PROP, "");
+
+        conf.set(MRUtils.FORMAT_PROP, format);
+        Preconditions.checkNotNull(zk, MRUtils.AC_ZK_PROP + " not set");
+        Preconditions.checkNotNull(instance, MRUtils.AC_INSTANCE_PROP + " not 
set");
+        Preconditions.checkNotNull(userName, MRUtils.AC_USERNAME_PROP + " not 
set");
+        Preconditions.checkNotNull(pwd, MRUtils.AC_PWD_PROP + " not set");
+
+        // map the config values to free text configu values
+        conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk);
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.set(ConfigUtils.CLOUDBASE_AUTHS, auths);
+
+        final String inputDir = args[0];
+
+        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        Preconditions.checkNotNull(tablePrefix, MRUtils.TABLE_PREFIX_PROPERTY 
+ " not set");
+
+        String docTextTable = tablePrefix + "text";
+        conf.set(ConfigUtils.FREE_TEXT_DOC_TABLENAME, docTextTable);
+
+        String docTermTable = tablePrefix + "terms";
+        conf.set(ConfigUtils.FREE_TEXT_TERM_TABLENAME, docTermTable);
+
+        String geoTable = tablePrefix + "geo";
+        conf.set(ConfigUtils.GEO_TABLENAME, geoTable);
+
+        System.out.println("Loading data into tables[freetext, geo]");
+        System.out.println("Loading data into tables[" + docTermTable + " " + 
docTextTable + " " + geoTable + "]");
+
+        Job job = new Job(new Configuration(conf), "Bulk Ingest load data into 
Indexing Tables");
+        job.setJarByClass(this.getClass());
+
+        // setting long job
+        Configuration jobConf = job.getConfiguration();
+        jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
+        jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
+        jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", "256"));
+        jobConf.setBoolean("mapred.compress.map.output", true);
+
+        job.setInputFormatClass(TextInputFormat.class);
+
+        job.setMapperClass(ParseNtripsMapper.class);
+
+        // I'm not actually going to write output.
+        job.setOutputFormatClass(NullOutputFormat.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+        job.setMapOutputKeyClass(Text.class);
+        job.setMapOutputValueClass(Text.class);
+
+        TextInputFormat.setInputPaths(job, new Path(inputDir));
+
+        job.setNumReduceTasks(0);
+
+        job.waitForCompletion(true);
+
+        return 0;
+    }
+
+    public static void main(String[] args) throws Exception {
+        ToolRunner.run(new Configuration(), new BulkNtripsInputToolIndexing(), 
args);
+    }
+
+    public static class ParseNtripsMapper extends Mapper<LongWritable, Text, 
Text, Text> {
+        private static final Logger logger = 
Logger.getLogger(ParseNtripsMapper.class);
+
+        public static final String TABLE_PROPERTY = "parsentripsmapper.table";
+
+        private RDFParser parser;
+        private FreeTextIndexer freeTextIndexer;
+        private GeoIndexer geoIndexer;
+        private String rdfFormat;
+
+        @Override
+        protected void setup(final Context context) throws IOException, 
InterruptedException {
+            super.setup(context);
+            Configuration conf = context.getConfiguration();
+
+            freeTextIndexer = new AccumuloFreeTextIndexer();
+            freeTextIndexer.setConf(conf);
+            geoIndexer = new GeoMesaGeoIndexer();
+            geoIndexer.setConf(conf);
+            final ValueFactory vf = new ValueFactoryImpl();
+
+            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
+            checkNotNull(rdfFormat, "Rdf format cannot be null");
+
+            String namedGraphString = conf.get(MRUtils.NAMED_GRAPH_PROP);
+            checkNotNull(namedGraphString, MRUtils.NAMED_GRAPH_PROP + " cannot 
be null");
+
+            final Resource namedGraph = vf.createURI(namedGraphString);
+
+            parser = Rio.createParser(RDFFormat.valueOf(rdfFormat));
+            parser.setParserConfig(new ParserConfig(true, true, true, 
RDFParser.DatatypeHandling.VERIFY));
+            parser.setRDFHandler(new RDFHandlerBase() {
+
+                @Override
+                public void handleStatement(Statement statement) throws 
RDFHandlerException {
+                    Statement contextStatement = new 
ContextStatementImpl(statement.getSubject(), statement
+                            .getPredicate(), statement.getObject(), 
namedGraph);
+                    try {
+                        
freeTextIndexer.storeStatement(RdfToRyaConversions.convertStatement(contextStatement));
+                        
geoIndexer.storeStatement(RdfToRyaConversions.convertStatement(contextStatement));
+                    } catch (IOException e) {
+                        logger.error("Error creating indexers", e);
+                    }
+                }
+            });
+        }
+
+        @Override
+        public void map(LongWritable key, Text value, Context output) throws 
IOException, InterruptedException {
+            String rdf = value.toString();
+            try {
+                parser.parse(new StringReader(rdf), "");
+            } catch (RDFParseException e) {
+                System.out.println("Line[" + rdf + "] cannot be formatted with 
format[" + rdfFormat + "]. Exception[" + e.getMessage()
+                        + "]");
+            } catch (Exception e) {
+                logger.error("error during map", e);
+                throw new IOException("Exception occurred parsing triple[" + 
rdf + "]");
+            }
+        }
+
+        @Override
+        public void cleanup(Context context) {
+            IOUtils.closeStream(freeTextIndexer);
+            IOUtils.closeStream(geoIndexer);
+        }
+    }
+
+}


Reply via email to