Consolidated MapReduce API and applications into top-level project.

Changes include:
- Made RdfFileInputFormat threaded, allowing it to handle all RDF input
- Added entity-centric indexing to RyaOutputFormat
- Added methods for using Rya input and output formats to AbstractAccumuloMRTool (renamed setupInputFormat to setupAccumuloInput, since there are now multiple input options)
- Removed the *NullIndexer classes, which were only used by RyaOutputFormat
- Removed StatementWritable and switched to RyaStatementWritable where applicable, for consistency
- Fixed the hashCode/compareTo/equals methods in RyaStatementWritable, RyaType, and RyaURI
- Minor renaming and repackaging (standalone tools now live in "tools"; "fileinput" and "utils" were removed)
- Minor tweaks to AccumuloHDFSFileInputFormat, which appeared to be broken with Accumulo 1.6
- Documented the code and added a MapReduce page to the manual
- Added an "examples" package with one example
- Removed RyaStatementMapper and RyaStatementReducer, which simply inserted records into Rya. The same functionality is available through RyaOutputFormat with the default mapper/reducer (see the sketch after this list), so the two classes were redundant.
- Removed BulkNtripsInputTool, BulkNtripsInputToolIndexing, RdfFileInputByLineTool, and RyaBatchWriterInputTool: fixes to RdfFileInputFormat now allow RdfFileInputTool to handle all the file input use cases (configurable secondary indexers, any format, scales), rendering the other file import tools redundant. (Previously, all five tools had largely overlapping but subtly different behavior.)
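
A minimal sketch of the replacement pattern described in the last two bullets: a map-only job that writes RyaStatementWritable values through RyaOutputFormat using Hadoop's stock (identity) Mapper. The class names follow the new mapreduce module; the driver itself is hypothetical and only illustrates the wiring, not a verified API.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    import mvm.rya.accumulo.mr.RyaOutputFormat;
    import mvm.rya.accumulo.mr.RyaStatementWritable;

    // Hypothetical driver: load statements into Rya with the default mapper.
    public class IdentityLoadSketch {
        public static Job buildJob(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "load statements into Rya");
            job.setJarByClass(IdentityLoadSketch.class);
            job.setMapperClass(Mapper.class);   // identity mapper: pairs pass through unchanged
            job.setNumReduceTasks(0);           // map-only; no reduce step needed
            job.setOutputKeyClass(Text.class);  // key type is illustrative; RyaOutputFormat uses the value
            job.setOutputValueClass(RyaStatementWritable.class);
            job.setOutputFormatClass(RyaOutputFormat.class);
            return job;                         // input format/paths are configured by the caller
        }
    }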


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/42895eac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/42895eac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/42895eac

Branch: refs/heads/develop
Commit: 42895eac0be3eabb836bd1cf0130cdd5951835c2
Parents: c698b56
Author: Jesse Hatfield <[email protected]>
Authored: Thu Jun 30 15:50:08 2016 -0400
Committer: pujav65 <[email protected]>
Committed: Thu Jul 21 08:45:17 2016 -0400

----------------------------------------------------------------------
 .../main/java/mvm/rya/api/domain/RyaType.java   |  38 +-
 .../java/mvm/rya/api/domain/RyaTypeTest.java    | 119 ++++
 dao/accumulo.rya/pom.xml                        |  69 ---
 .../rya/accumulo/mr/AbstractAccumuloMRTool.java | 164 -----
 .../accumulo/mr/RyaStatementInputFormat.java    |  94 ---
 .../mvm/rya/accumulo/mr/RyaStatementMapper.java |  85 ---
 .../rya/accumulo/mr/RyaStatementReducer.java    |  87 ---
 .../rya/accumulo/mr/RyaStatementWritable.java   | 150 -----
 .../accumulo/mr/eval/AccumuloRdfCountTool.java  | 258 --------
 .../mr/fileinput/BulkNtripsInputTool.java       | 369 ------------
 .../mr/fileinput/RdfFileInputByLineTool.java    | 251 --------
 .../mr/fileinput/RdfFileInputFormat.java        | 146 -----
 .../accumulo/mr/fileinput/RdfFileInputTool.java | 175 ------
 .../rya/accumulo/mr/upgrade/Upgrade322Tool.java | 240 --------
 .../mr/utils/AccumuloHDFSFileInputFormat.java   | 206 -------
 .../rya/accumulo/mr/utils/AccumuloProps.java    |  58 --
 .../java/mvm/rya/accumulo/mr/utils/MRUtils.java | 119 ----
 .../mvm/rya/accumulo/mr/RyaInputFormatTest.java | 225 -------
 .../mr/eval/AccumuloRdfCountToolTest.java       | 282 ---------
 .../mr/fileinput/RdfFileInputToolTest.java      | 146 -----
 .../accumulo/mr/upgrade/Upgrade322ToolTest.java | 319 ----------
 .../upgrade/UpgradeObjectSerializationTest.java | 119 ----
 .../src/test/resources/namedgraphs.trig         |   7 -
 .../src/test/resources/test.ntriples            |   1 -
 .../rya/accumulo/mr/NullFreeTextIndexer.java    | 102 ----
 .../mvm/rya/accumulo/mr/NullGeoIndexer.java     | 153 -----
 .../rya/accumulo/mr/NullTemporalIndexer.java    | 186 ------
 .../mvm/rya/accumulo/mr/RyaOutputFormat.java    | 329 ----------
 .../mvm/rya/accumulo/mr/StatementWritable.java  |  86 ---
 .../fileinput/BulkNtripsInputToolIndexing.java  | 227 -------
 .../mr/fileinput/RyaBatchWriterInputTool.java   | 243 --------
 extras/rya.manual/src/site/markdown/_index.md   |   1 +
 extras/rya.manual/src/site/markdown/index.md    |   1 +
 extras/rya.manual/src/site/markdown/loaddata.md |  15 +-
 .../rya.manual/src/site/markdown/mapreduce.md   | 107 ++++
 .../src/site/markdown/sm-firststeps.md          |   6 +-
 extras/rya.manual/src/site/site.xml             |   1 +
 extras/rya.reasoning/pom.xml                    |   4 +
 .../rya/reasoning/mr/AbstractReasoningTool.java |   4 +-
 .../mvm/rya/reasoning/mr/ConformanceTest.java   |   2 +-
 .../mvm/rya/reasoning/mr/MRReasoningUtils.java  |   2 +-
 extras/tinkerpop.rya/pom.xml                    |   4 +
 .../config/RyaGraphConfiguration.groovy         |   2 +-
 .../mvm/rya/blueprints/TstGremlinRya.groovy     |   2 +-
 .../config/RyaGraphConfigurationTest.groovy     |   2 +-
 .../sail/RyaSailVertexSequenceTest.groovy       |   2 +-
 mapreduce/pom.xml                               | 125 ++++
 .../rya/accumulo/mr/AbstractAccumuloMRTool.java | 305 ++++++++++
 .../mr/AccumuloHDFSFileInputFormat.java         | 161 +++++
 .../main/java/mvm/rya/accumulo/mr/MRUtils.java  | 317 ++++++++++
 .../mvm/rya/accumulo/mr/RdfFileInputFormat.java | 443 ++++++++++++++
 .../mvm/rya/accumulo/mr/RyaInputFormat.java     | 130 ++++
 .../mvm/rya/accumulo/mr/RyaOutputFormat.java    | 597 +++++++++++++++++++
 .../rya/accumulo/mr/RyaStatementWritable.java   | 256 ++++++++
 .../accumulo/mr/examples/TextOutputExample.java | 196 ++++++
 .../accumulo/mr/tools/AccumuloRdfCountTool.java | 258 ++++++++
 .../rya/accumulo/mr/tools/RdfFileInputTool.java |  91 +++
 .../rya/accumulo/mr/tools/Upgrade322Tool.java   | 241 ++++++++
 .../rya/accumulo/mr/RdfFileInputFormatTest.java | 180 ++++++
 .../mvm/rya/accumulo/mr/RyaInputFormatTest.java | 156 +++++
 .../rya/accumulo/mr/RyaOutputFormatTest.java    | 324 ++++++++++
 .../accumulo/mr/RyaStatementWritableTest.java   | 146 +++++
 .../java/mvm/rya/accumulo/mr/TestUtils.java     | 113 ++++
 .../mr/tools/AccumuloRdfCountToolTest.java      | 283 +++++++++
 .../accumulo/mr/tools/RdfFileInputToolTest.java | 131 ++++
 .../accumulo/mr/tools/Upgrade322ToolTest.java   | 275 +++++++++
 .../tools/UpgradeObjectSerializationTest.java   | 119 ++++
 mapreduce/src/test/resources/namedgraphs.trig   |   7 +
 mapreduce/src/test/resources/test.ntriples      |   3 +
 pom.xml                                         |   6 +
 70 files changed, 5152 insertions(+), 4919 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java b/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
index ab580d6..7a35253 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/domain/RyaType.java
@@ -80,19 +80,25 @@ public class RyaType implements Comparable {
         return sb.toString();
     }
 
+    /**
+     * Determine equality based on string representations of data and datatype.
+     * @param o The object to compare with
+     * @return true if the other object is also a RyaType and both data and datatype match.
+     */
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
+        if (o == null || !(o instanceof RyaType)) return false;
         RyaType ryaType = (RyaType) o;
-
         if (data != null ? !data.equals(ryaType.data) : ryaType.data != null) return false;
         if (dataType != null ? !dataType.equals(ryaType.dataType) : ryaType.dataType != null) return false;
-
         return true;
     }
 
+    /**
+     * Generate a hash based on the string representations of both data and datatype.
+     * @return A hash consistent with equals.
+     */
     @Override
     public int hashCode() {
         int result = dataType != null ? dataType.hashCode() : 0;
@@ -100,12 +106,30 @@ public class RyaType implements Comparable {
         return result;
     }
 
+    /**
+     * Define a natural ordering based on data and datatype.
+     * @param o The object to compare with
+     * @return 0 if both the data string and the datatype string representation match between the objects,
+     *          where matching is defined by string comparison or both being null;
+     *          Otherwise, an integer whose sign yields a consistent ordering.
+     */
     @Override
     public int compareTo(Object o) {
-        if (o != null && this.getClass().isInstance(o)) {
+        int result = -1;
+        if (o != null && o instanceof RyaType) {
+            result = 0;
             RyaType other = (RyaType) o;
-            return this.getData().compareTo(other.getData());
+            if (this.data != other.data) {
+                if (this.data == null) return 1;
+                if (other.data == null) return -1;
+                result = this.data.compareTo(other.data);
+            }
+            if (result == 0 && this.dataType != other.dataType) {
+                if (this.dataType == null) return 1;
+                if (other.dataType == null) return -1;
+                result = this.dataType.toString().compareTo(other.dataType.toString());
+            }
         }
-        return -1;
+        return result;
     }
 }
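
The javadoc above pins down the new ordering; a small standalone check of those rules follows (constructor usage is taken from RyaTypeTest below, and the expected results assume the patched compareTo):

    import org.openrdf.model.vocabulary.XMLSchema;

    import mvm.rya.api.domain.RyaType;

    public class RyaTypeOrderingSketch {
        public static void main(String[] args) {
            RyaType alice = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
            RyaType aliceUri = new RyaType(XMLSchema.ANYURI, "http://www.example.com/Alice");
            RyaType same = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
            RyaType nullData = new RyaType(XMLSchema.STRING, null);

            System.out.println(alice.compareTo(same) == 0);      // true: data and datatype both match
            System.out.println(alice.compareTo(aliceUri) != 0);  // true: datatype breaks the tie
            System.out.println(alice.compareTo(nullData));       // -1: null data sorts after non-null
            System.out.println(nullData.compareTo(alice));       // 1: and the reverse direction agrees
        }
    }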

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java b/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java
new file mode 100644
index 0000000..6db6053
--- /dev/null
+++ b/common/rya.api/src/test/java/mvm/rya/api/domain/RyaTypeTest.java
@@ -0,0 +1,119 @@
+package mvm.rya.api.domain;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+public class RyaTypeTest {
+    static RyaType a = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
+    static RyaType b = new RyaType(XMLSchema.STRING, "http://www.example.com/Bob");
+    static RyaType c = new RyaType(XMLSchema.STRING, "http://www.example.com/Carol");
+    static RyaType aUri = new RyaType(XMLSchema.ANYURI, "http://www.example.com/Alice");
+    static RyaType bUri = new RyaType(XMLSchema.ANYURI, "http://www.example.com/Bob");
+    RyaType nullData = new RyaType(XMLSchema.STRING, null);
+    RyaType nullType = new RyaType(null, "http://www.example.com/Alice");
+    RyaType nullBoth = new RyaType(null, null);
+    RyaType same = new RyaType(XMLSchema.STRING, "http://www.example.com/Alice");
+
+    @Test
+    public void testCompareTo() throws Exception {
+        Assert.assertEquals("compareTo(self) should return zero.", 0, 
aUri.compareTo(aUri));
+        Assert.assertFalse("compareTo should return nonzero for different data 
and type.", aUri.compareTo(b) == 0);
+        Assert.assertFalse("compareTo should return nonzero for same data and 
different datatypes.", a.compareTo(aUri) == 0);
+        Assert.assertFalse("compareTo should return nonzero for same datatype 
and different data.", bUri.compareTo(aUri) == 0);
+        Assert.assertEquals("compareTo should return zero for different 
objects with matching data and datatype.",
+                0, a.compareTo(same));
+    }
+
+    @Test
+    public void testCompareToNullFields() throws Exception {
+        Assert.assertEquals("[has no nulls].compareTo([has null data]) should 
return -1", -1, a.compareTo(nullData));
+        Assert.assertEquals("[has no nulls].compareTo([has null type]) should 
return -1 if data is equal",
+                -1, a.compareTo(nullType));
+        Assert.assertEquals("[has null data].compareTo([has no nulls]) should 
return 1", 1, nullData.compareTo(a));
+        Assert.assertEquals("[has null type].compareTo([has no nulls]) should 
return 1 if data is equal",
+                 1, nullType.compareTo(a));
+        Assert.assertEquals("[has null type].compareTo([has null data]) should 
return -1", -1, nullType.compareTo(nullData));
+    }
+
+    @Test
+    public void testCompareToSymmetry() throws Exception {
+        int forward = Integer.signum(a.compareTo(b));
+        int backward = Integer.signum(b.compareTo(a));
+        Assert.assertEquals("Comparison of different values with same type 
should yield opposite signs.", forward, backward * -1);
+        forward = Integer.signum(bUri.compareTo(b));
+        backward = Integer.signum(b.compareTo(bUri));
+        Assert.assertEquals("Comparison of same values with different types 
should yield opposite signs.", forward, backward*-1);
+        forward = Integer.signum(aUri.compareTo(b));
+        backward = Integer.signum(b.compareTo(aUri));
+        Assert.assertEquals("Comparison of different values with different 
types should yield opposite signs.",
+                forward, backward * -1);
+    }
+
+    @Test
+    public void testCompareToTransitive() throws Exception {
+        int sign = Integer.signum(a.compareTo(b));
+        Assert.assertEquals("compareTo(a,b) and compareTo(b,c) should have the 
same sign.",
+                sign, Integer.signum(b.compareTo(c)));
+        Assert.assertEquals("if a > b > c, compareTo(a,c) should be 
consistent.", sign, Integer.signum(a.compareTo(c)));
+    }
+
+    @Test
+    public void testEquals() throws Exception {
+        Assert.assertTrue("Same data and datatype should be equal.", 
a.equals(same));
+        Assert.assertFalse("Same data, different datatype should be unequal.", 
a.equals(aUri));
+        Assert.assertFalse("Same datatype, different data should be unequal.", 
a.equals(b));
+    }
+
+    @Test
+    public void testEqualsNullFields() throws Exception {
+        Assert.assertFalse("equals(null) should return false.", 
a.equals(null));
+        Assert.assertFalse("Same data, one null datatype should be unequal.", 
a.equals(nullType));
+        Assert.assertFalse("Same datatype, one null data should be unequal.", 
a.equals(nullData));
+        RyaType sameNull = new RyaType(null, null);
+        Assert.assertTrue("Matching null fields should be equal.", 
sameNull.equals(nullBoth));
+    }
+
+    @Test
+    public void testEqualsCompareToConsistency() throws Exception {
+        Assert.assertEquals("equals and compareTo inconsistent for matching 
values and types.",
+                a.equals(same), a.compareTo(same) == 0);
+        Assert.assertEquals("equals and compareTo inconsistent for different 
values with same types.",
+                a.equals(b), a.compareTo(b) == 0);
+        Assert.assertEquals("equals and compareTo inconsistent for same values 
having different types.",
+                a.equals(aUri), a.compareTo(aUri) == 0);
+        Assert.assertEquals("equals and compareTo inconsistent for different 
values and different types.",
+                a.equals(bUri), a.compareTo(bUri) == 0);
+    }
+
+    @Test
+    public void testHashCodeEquals() throws Exception {
+        Assert.assertEquals("Same data and same type should yield same hash 
code.",
+                a.hashCode(), same.hashCode());
+        Assert.assertEquals("Same type and both null data should yield same 
hash code.",
+                nullData.hashCode(), new RyaType(XMLSchema.STRING, 
null).hashCode());
+        Assert.assertEquals("Same data and both null type should yield same 
hash code.",
+                nullType.hashCode(), new RyaType(null, 
"http://www.example.com/Alice";).hashCode());
+        Assert.assertEquals("Null type and null data should yield same hash 
code.",
+                nullBoth.hashCode(), new RyaType(null, null).hashCode());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/pom.xml
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/pom.xml b/dao/accumulo.rya/pom.xml
index c089cfe..295b755 100644
--- a/dao/accumulo.rya/pom.xml
+++ b/dao/accumulo.rya/pom.xml
@@ -36,84 +36,15 @@ under the License.
             <artifactId>rya.api</artifactId>
         </dependency>
         
-        <!-- Accumulo deps -->
         <dependency>
             <groupId>org.apache.accumulo</groupId>
             <artifactId>accumulo-core</artifactId>
         </dependency>
 
         <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-rio-ntriples</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-rio-nquads</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-queryalgebra-evaluation</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.openrdf.sesame</groupId>
-            <artifactId>sesame-rio-trig</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.mrunit</groupId>
-            <artifactId>mrunit</artifactId>
-            <classifier>hadoop2</classifier>
-            <version>1.1.0</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
-
-    <build>
-        <pluginManagement>
-            <plugins>
-                <plugin>
-                    <groupId>org.apache.rat</groupId>
-                    <artifactId>apache-rat-plugin</artifactId>
-                    <configuration>
-                        <excludes>
-                            <!-- RDF data Files -->
-                            <exclude>**/*.ntriples</exclude>
-                            <exclude>**/*.trig</exclude>
-                        </excludes>
-                    </configuration>
-                </plugin>
-            </plugins>
-        </pluginManagement>
-    </build>
-
-    <profiles>
-        <profile>
-            <id>mr</id>
-            <build>
-                <plugins>
-                    <plugin>
-                        <groupId>org.apache.maven.plugins</groupId>
-                        <artifactId>maven-shade-plugin</artifactId>
-                        <executions>
-                            <execution>
-                                <configuration>
-                                    <transformers>
-                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-                                    </transformers>
-                                </configuration>
-                            </execution>
-                        </executions>
-
-                    </plugin>
-                </plugins>
-            </build>
-        </profile>
-    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
deleted file mode 100644
index 000c08a..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/AbstractAccumuloMRTool.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.utils.AccumuloHDFSFileInputFormat;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreUtils;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.iterators.user.AgeOffFilter;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- */
-public abstract class AbstractAccumuloMRTool {
-
-    protected Configuration conf;
-    protected RdfCloudTripleStoreConstants.TABLE_LAYOUT rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
-    protected String userName = "root";
-    protected String pwd = "root";
-    protected String instance = "instance";
-    protected String zk = "zoo";
-    protected Authorizations authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-    protected String ttl = null;
-    protected boolean mock = false;
-    protected boolean hdfsInput = false;
-    protected String tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-
-    protected void init() {
-        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-        ttl = conf.get(MRUtils.AC_TTL_PROP, ttl);
-        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-        mock = conf.getBoolean(MRUtils.AC_MOCK_PROP, mock);
-        hdfsInput = conf.getBoolean(MRUtils.AC_HDFS_INPUT_PROP, hdfsInput);
-        tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
-        if (tablePrefix != null)
-            RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        rdfTableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
-                conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
-        String auth = conf.get(MRUtils.AC_AUTH_PROP);
-        if (auth != null)
-            authorizations = new Authorizations(auth.split(","));
-
-        if (!mock) {
-            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-            conf.setBoolean("mapred.reduce.tasks.speculative.execution", 
false);
-            conf.set("io.sort.mb", "256");
-        }
-
-        //set ttl
-        ttl = conf.get(MRUtils.AC_TTL_PROP);
-    }
-
-    protected void setupInputFormat(Job job) throws AccumuloSecurityException {
-        // set up accumulo input
-        if (!hdfsInput) {
-            job.setInputFormatClass(AccumuloInputFormat.class);
-        } else {
-            job.setInputFormatClass(AccumuloHDFSFileInputFormat.class);
-        }
-        AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
-        AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix));
-        AccumuloInputFormat.setScanAuthorizations(job, authorizations);
-        if (!mock) {
-            AccumuloInputFormat.setZooKeeperInstance(job, instance, zk);
-        } else {
-            AccumuloInputFormat.setMockInstance(job, instance);
-        }
-        if (ttl != null) {
-            IteratorSetting setting = new IteratorSetting(1, "fi", 
AgeOffFilter.class.getName());
-            AgeOffFilter.setTTL(setting, Long.valueOf(ttl));
-            AccumuloInputFormat.addIterator(job, setting);
-        }
-    }
-
-    protected void setupOutputFormat(Job job, String outputTable) throws AccumuloSecurityException {
-        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
-        AccumuloOutputFormat.setCreateTables(job, true);
-        AccumuloOutputFormat.setDefaultTableName(job, outputTable);
-        if (!mock) {
-            AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk);
-        } else {
-            AccumuloOutputFormat.setMockInstance(job, instance);
-        }
-        job.setOutputFormatClass(AccumuloOutputFormat.class);
-    }
-
-    public void setConf(Configuration configuration) {
-        this.conf = configuration;
-    }
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    public String getInstance() {
-        return instance;
-    }
-
-    public void setInstance(String instance) {
-        this.instance = instance;
-    }
-
-    public String getPwd() {
-        return pwd;
-    }
-
-    public void setPwd(String pwd) {
-        this.pwd = pwd;
-    }
-
-    public String getZk() {
-        return zk;
-    }
-
-    public void setZk(String zk) {
-        this.zk = zk;
-    }
-
-    public String getTtl() {
-        return ttl;
-    }
-
-    public void setTtl(String ttl) {
-        this.ttl = ttl;
-    }
-
-    public String getUserName() {
-        return userName;
-    }
-
-    public void setUserName(String userName) {
-        this.userName = userName;
-    }
-}
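
The deleted base class above is what job drivers extended; per the summary it moves to the new mapreduce module with setupInputFormat renamed to setupAccumuloInput. A condensed, hypothetical sketch of the old driver pattern (the same shape AccumuloRdfCountTool below uses; the mapper wiring and table name are placeholders):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class ExampleAccumuloTool extends AbstractAccumuloMRTool implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            init();                                // read Accumulo connection settings from conf
            Job job = Job.getInstance(conf, "example job");
            job.setJarByClass(ExampleAccumuloTool.class);
            setupInputFormat(job);                 // setupAccumuloInput after the rename
            setupOutputFormat(job, "rya_output");  // placeholder output table
            // job.setMapperClass(...); job.setReducerClass(...); etc.
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            ToolRunner.run(new Configuration(), new ExampleAccumuloTool(), args);
        }
    }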

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java
deleted file mode 100644
index 3399166..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.io.IOException;
-import java.util.Map.Entry;
-
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
-import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-public class RyaStatementInputFormat extends AbstractInputFormat<Text, RyaStatementWritable> {
-    @Override
-    public RecordReader<Text, RyaStatementWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
-        return new RyaStatementRecordReader();
-    }
-
-
-    public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
-        conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
-    }
-
-    public class RyaStatementRecordReader extends AbstractRecordReader<Text, RyaStatementWritable> {
-
-        private RyaTripleContext ryaContext;
-        private TABLE_LAYOUT tableLayout;
-
-        @Override
-        protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, RangeInputSplit split) {
-
-        }
-
-        @Override
-        public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException {
-            super.initialize(inSplit, attempt);
-            this.tableLayout = TABLE_LAYOUT.valueOf(attempt.getConfiguration().get(MRUtils.TABLE_LAYOUT_PROP, TABLE_LAYOUT.OSP.toString()));
-            //TODO verify that this is correct
-            this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(attempt.getConfiguration()));
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException {
-            if (!scannerIterator.hasNext())
-                return false;
-
-            Entry<Key, Value> entry = scannerIterator.next();
-            ++numKeysRead;
-            currentKey = entry.getKey();
-
-            try {
-                currentK = currentKey.getRow();
-                RyaStatement stmt = this.ryaContext.deserializeTriple(this.tableLayout, new TripleRow(entry.getKey().getRow().getBytes(), entry.getKey().getColumnFamily().getBytes(), entry.getKey().getColumnQualifier().getBytes(), entry.getKey().getTimestamp(), entry.getKey().getColumnVisibility().getBytes(), entry.getValue().get()));
-                RyaStatementWritable writable = new RyaStatementWritable();
-                writable.setRyaStatement(stmt);
-                currentV = writable;
-            } catch(TripleRowResolverException e) {
-                throw new IOException(e);
-            }
-            return true;
-        }
-
-    }
-
-}
\ No newline at end of file
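
For reference, a sketch of a mapper consuming the <Text, RyaStatementWritable> pairs this record reader emitted; the replacement RyaInputFormat in the new mapreduce module is assumed to keep the same key/value types, and the counting logic here is purely illustrative.

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import mvm.rya.accumulo.mr.RyaStatementWritable;

    public class SubjectCountMapper extends Mapper<Text, RyaStatementWritable, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1);
        private final Text subject = new Text();

        @Override
        protected void map(Text row, RyaStatementWritable value, Context context)
                throws IOException, InterruptedException {
            // Emit (subject URI, 1) for each statement read from the store.
            subject.set(value.getRyaStatement().getSubject().getData());
            context.write(subject, ONE);
        }
    }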

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java
deleted file mode 100644
index d90215b..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java
+++ /dev/null
@@ -1,85 +0,0 @@
-  
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-public class RyaStatementMapper extends Mapper<Text, RyaStatementWritable, Text, Mutation> {
-
-    private Text spoTable;
-    private Text poTable;
-    private Text ospTable;
-    private RyaTableMutationsFactory mutationsFactory;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
-
-        String tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF);
-        spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
-        poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
-        ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
-
-        RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(context.getConfiguration()));
-        mutationsFactory = new RyaTableMutationsFactory(ryaContext);
-    }
-
-    @Override
-    protected void map(Text key, RyaStatementWritable value, Context context) throws IOException, InterruptedException {
-
-        Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(value.getRyaStatement());
-
-        for(TABLE_LAYOUT layout : mutations.keySet()) {
-
-            Text table = null;
-            switch (layout) {
-                case SPO:
-                    table = spoTable;
-                    break;
-                case OSP:
-                    table = ospTable;
-                    break;
-                case PO:
-                    table = poTable;
-                    break;
-            }
-
-            for(Mutation mutation : mutations.get(layout)) {
-                context.write(table, mutation);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java
deleted file mode 100644
index e353528..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.resolver.RyaTripleContext;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Reducer;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PO_SUFFIX;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX;
-
-public class RyaStatementReducer extends Reducer<WritableComparable, RyaStatementWritable, Text, Mutation> {
-
-    private Text spoTable;
-    private Text poTable;
-    private Text ospTable;
-    private RyaTableMutationsFactory mutationsFactory;
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.setup(context);
-
-        String tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF);
-        spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX);
-        poTable = new Text(tablePrefix + TBL_PO_SUFFIX);
-        ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX);
-
-        RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(context.getConfiguration()));
-        mutationsFactory = new RyaTableMutationsFactory(ryaContext);
-    }
-
-    @Override
-    protected void reduce(WritableComparable key, Iterable<RyaStatementWritable> values, Context context) throws IOException, InterruptedException {
-
-        for(RyaStatementWritable value : values) {
-            Map<TABLE_LAYOUT, Collection<Mutation>> mutations = mutationsFactory.serialize(value.getRyaStatement());
-
-            for(TABLE_LAYOUT layout : mutations.keySet()) {
-
-                Text table = null;
-                switch (layout) {
-                    case SPO:
-                        table = spoTable;
-                        break;
-                    case OSP:
-                        table = ospTable;
-                        break;
-                    case PO:
-                        table = poTable;
-                        break;
-                }
-
-                for(Mutation mutation : mutations.get(layout)) {
-                    context.write(table, mutation);
-                }
-            }
-
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
deleted file mode 100644
index 87a9433..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java
+++ /dev/null
@@ -1,150 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.WritableComparable;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-/**
- * Date: 7/17/12
- * Time: 1:29 PM
- */
-public class RyaStatementWritable implements WritableComparable {
-
-    private RyaTripleContext ryaContext;
-    private RyaStatement ryaStatement;
-    
-    public RyaStatementWritable(Configuration conf) {
-        this();
-    }
-     
-    public RyaStatementWritable(RyaTripleContext ryaContext) {
-       this.ryaContext = ryaContext;
-    }
-    
-    public RyaStatementWritable() {
-        this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration());
-    }
-    
-    public RyaStatementWritable(RyaStatement ryaStatement, RyaTripleContext ryaContext) {
-       this(ryaContext);
-        this.ryaStatement = ryaStatement;
-    }
-
-    public RyaStatement getRyaStatement() {
-        return ryaStatement;
-    }
-
-    public void setRyaStatement(RyaStatement ryaStatement) {
-        this.ryaStatement = ryaStatement;
-    }
-
-    @Override
-    public int compareTo(Object o) {
-        if (o instanceof RyaStatementWritable) {
-            return (getRyaStatement().equals(((RyaStatementWritable) o).getRyaStatement())) ? (0) : (-1);
-        }
-        return -1;
-    }
-
-    @Override
-    public void write(DataOutput dataOutput) throws IOException {
-        if (ryaStatement == null) {
-            throw new IOException("Rya Statement is null");
-        }
-        try {
-            Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, TripleRow> map = ryaContext.serializeTriple(ryaStatement);
-            TripleRow tripleRow = map.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-            byte[] row = tripleRow.getRow();
-            byte[] columnFamily = tripleRow.getColumnFamily();
-            byte[] columnQualifier = tripleRow.getColumnQualifier();
-            write(dataOutput, row);
-            write(dataOutput, columnFamily);
-            write(dataOutput, columnQualifier);
-            write(dataOutput, ryaStatement.getColumnVisibility());
-            write(dataOutput, ryaStatement.getValue());
-            Long timestamp = ryaStatement.getTimestamp();
-            boolean b = timestamp != null;
-            dataOutput.writeBoolean(b);
-            if (b) {
-                dataOutput.writeLong(timestamp);
-            }
-        } catch (TripleRowResolverException e) {
-            throw new IOException(e);
-        }
-    }
-
-    protected void write(DataOutput dataOutput, byte[] row) throws IOException {
-        boolean b = row != null;
-        dataOutput.writeBoolean(b);
-        if (b) {
-            dataOutput.writeInt(row.length);
-            dataOutput.write(row);
-        }
-    }
-
-    protected byte[] read(DataInput dataInput) throws IOException {
-        if (dataInput.readBoolean()) {
-            int len = dataInput.readInt();
-            byte[] bytes = new byte[len];
-            dataInput.readFully(bytes);
-            return bytes;
-        }else {
-            return null;
-        }
-    }
-
-    @Override
-    public void readFields(DataInput dataInput) throws IOException {
-        byte[] row = read(dataInput);
-        byte[] columnFamily = read(dataInput);
-        byte[] columnQualifier = read(dataInput);
-        byte[] columnVisibility = read(dataInput);
-        byte[] value = read(dataInput);
-        boolean b = dataInput.readBoolean();
-        Long timestamp = null;
-        if (b) {
-            timestamp = dataInput.readLong();
-        }
-        try {
-            ryaStatement = ryaContext.deserializeTriple(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO,
-                    new TripleRow(row, columnFamily, columnQualifier));
-            ryaStatement.setColumnVisibility(columnVisibility);
-            ryaStatement.setValue(value);
-            ryaStatement.setTimestamp(timestamp);
-        } catch (TripleRowResolverException e) {
-            throw new IOException(e);
-        }
-    }
-}
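
This is one of the compareTo implementations the summary calls out as fixed: it returns -1 for every non-equal pair, so comparing two distinct statements yields -1 in both directions, violating the Comparable sign-symmetry contract. A minimal demonstration (assuming RyaStatement's subject/predicate/object constructor; the URIs are placeholders):

    import mvm.rya.api.domain.RyaStatement;
    import mvm.rya.api.domain.RyaURI;

    public class CompareToContractDemo {
        public static void main(String[] args) {
            RyaStatementWritable x = new RyaStatementWritable();
            RyaStatementWritable y = new RyaStatementWritable();
            x.setRyaStatement(new RyaStatement(new RyaURI("urn:s1"), new RyaURI("urn:p"), new RyaURI("urn:o")));
            y.setRyaStatement(new RyaStatement(new RyaURI("urn:s2"), new RyaURI("urn:p"), new RyaURI("urn:o")));
            // Both directions report -1; the contract requires opposite signs.
            System.out.println(x.compareTo(y));  // -1
            System.out.println(y.compareTo(x));  // -1, should be 1
        }
    }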

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
deleted file mode 100644
index ee1004d..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/eval/AccumuloRdfCountTool.java
+++ /dev/null
@@ -1,258 +0,0 @@
-package mvm.rya.accumulo.mr.eval;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.Date;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.AbstractAccumuloMRTool;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.security.ColumnVisibility;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.ValueFactory;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-
-/**
- * Count subject, predicate, object. Save in table
- * Class RdfCloudTripleStoreCountTool
- * Date: Apr 12, 2011
- * Time: 10:39:40 AM
- * @deprecated
- */
-public class AccumuloRdfCountTool extends AbstractAccumuloMRTool implements Tool {
-
-    public static void main(String[] args) {
-        try {
-
-            ToolRunner.run(new Configuration(), new AccumuloRdfCountTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    /**
-     * cloudbase props
-     */
-
-    @Override
-    public int run(String[] strings) throws Exception {
-        conf.set(MRUtils.JOB_NAME_PROP, "Gather Evaluation Statistics");
-
-        //initialize
-        init();
-
-        Job job = new Job(conf);
-        job.setJarByClass(AccumuloRdfCountTool.class);
-        setupInputFormat(job);
-
-        AccumuloInputFormat.setRanges(job, Lists.newArrayList(new Range(new Text(new byte[]{}), new Text(new byte[]{Byte.MAX_VALUE}))));
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(LongWritable.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Mutation.class);
-
-        // set mapper and reducer classes
-        job.setMapperClass(CountPiecesMapper.class);
-        job.setCombinerClass(CountPiecesCombiner.class);
-        job.setReducerClass(CountPiecesReducer.class);
-
-        String outputTable = tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX;
-        setupOutputFormat(job, outputTable);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return 0;
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    public static class CountPiecesMapper extends Mapper<Key, Value, Text, LongWritable> {
-
-        public static final byte[] EMPTY_BYTES = new byte[0];
-        private RdfCloudTripleStoreConstants.TABLE_LAYOUT tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP;
-
-        ValueFactoryImpl vf = new ValueFactoryImpl();
-
-        private Text keyOut = new Text();
-        private LongWritable valOut = new LongWritable(1);
-        private RyaTripleContext ryaContext;
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            tableLayout = RdfCloudTripleStoreConstants.TABLE_LAYOUT.valueOf(
-                    conf.get(MRUtils.TABLE_LAYOUT_PROP, RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP.toString()));
-            ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-        }
-
-        @Override
-        protected void map(Key key, Value value, Context context) throws IOException, InterruptedException {
-            try {
-                RyaStatement statement = ryaContext.deserializeTriple(tableLayout, new TripleRow(key.getRow().getBytes(), key.getColumnFamily().getBytes(), key.getColumnQualifier().getBytes()));
-                //count each piece subject, pred, object
-
-                String subj = statement.getSubject().getData();
-                String pred = statement.getPredicate().getData();
-//                byte[] objBytes = tripleFormat.getValueFormat().serialize(statement.getObject());
-                RyaURI scontext = statement.getContext();
-                boolean includesContext = scontext != null;
-                String scontext_str = (includesContext) ? scontext.getData() : null;
-
-                ByteArrayDataOutput output = ByteStreams.newDataOutput();
-                output.writeUTF(subj);
-                output.writeUTF(RdfCloudTripleStoreConstants.SUBJECT_CF);
-                output.writeBoolean(includesContext);
-                if (includesContext)
-                    output.writeUTF(scontext_str);
-                keyOut.set(output.toByteArray());
-                context.write(keyOut, valOut);
-
-                output = ByteStreams.newDataOutput();
-                output.writeUTF(pred);
-                output.writeUTF(RdfCloudTripleStoreConstants.PRED_CF);
-                output.writeBoolean(includesContext);
-                if (includesContext)
-                    output.writeUTF(scontext_str);
-                keyOut.set(output.toByteArray());
-                context.write(keyOut, valOut);
-            } catch (TripleRowResolverException e) {
-                throw new IOException(e);
-            }
-        }
-    }
-
-    public static class CountPiecesCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
-
-        private LongWritable valOut = new LongWritable();
-
-        // TODO: can still add up to be large I guess
-        // any count lower than this does not need to be saved
-        public static final int TOO_LOW = 2;
-
-        @Override
-        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-            long count = 0;
-            for (LongWritable lw : values) {
-                count += lw.get();
-            }
-
-            if (count <= TOO_LOW)
-                return;
-
-            valOut.set(count);
-            context.write(key, valOut);
-        }
-
-    }
-
-    public static class CountPiecesReducer extends Reducer<Text, LongWritable, Text, Mutation> {
-
-        Text row = new Text();
-        Text cat_txt = new Text();
-        Value v_out = new Value();
-        ValueFactory vf = new ValueFactoryImpl();
-
-        // any count lower than this does not need to be saved
-        public static final int TOO_LOW = 10;
-        private String tablePrefix;
-        protected Text table;
-        private ColumnVisibility cv = AccumuloRdfConstants.EMPTY_CV;
-
-        @Override
-        protected void setup(Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-            table = new Text(tablePrefix + RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX);
-            final String cv_s = context.getConfiguration().get(MRUtils.AC_CV_PROP);
-            if (cv_s != null)
-                cv = new ColumnVisibility(cv_s);
-        }
-
-        @Override
-        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-            long count = 0;
-            for (LongWritable lw : values) {
-                count += lw.get();
-            }
-
-            if (count <= TOO_LOW)
-                return;
-
-            ByteArrayDataInput badi = ByteStreams.newDataInput(key.getBytes());
-            String v = badi.readUTF();
-            cat_txt.set(badi.readUTF());
-
-            Text columnQualifier = RdfCloudTripleStoreConstants.EMPTY_TEXT;
-            boolean includesContext = badi.readBoolean();
-            if (includesContext) {
-                columnQualifier = new Text(badi.readUTF());
-            }
-
-            row.set(v);
-            Mutation m = new Mutation(row);
-            v_out.set((count + "").getBytes());
-            m.put(cat_txt, columnQualifier, cv, v_out);
-            context.write(table, m);
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
deleted file mode 100644
index c3ddcfd..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/BulkNtripsInputTool.java
+++ /dev/null
@@ -1,369 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static mvm.rya.accumulo.AccumuloRdfUtils.extractValue;
-import static mvm.rya.accumulo.AccumuloRdfUtils.from;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolver;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
-import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
-import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.util.TextUtil;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.ParserConfig;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParseException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.nquads.NQuadsParser;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Takes large N-Triples files and uses MapReduce and Accumulo
- * bulk-ingest techniques to load them into the tables in our partition format.
- * <p/>
- * Input: N-Triples file
- * Map:
- * - key : shard row - Text
- * - value : stmt in doc triple format - Text
- * Partitioner: RangePartitioner
- * Reduce:
- * - key : all the entries for each triple - Accumulo Key
- * Class BulkNtripsInputTool
- * Date: Sep 13, 2011
- * Time: 10:00:17 AM
- */
-public class BulkNtripsInputTool extends Configured implements Tool {
-
-    public static final String WORKDIR_PROP = "bulk.n3.workdir";
-
-    private String userName = "root";
-    private String pwd = "root";
-    private String instance = "instance";
-    private String zk = "zoo";
-    private String ttl = null;
-    private String workDirBase = "/temp/bulkcb/work";
-    private String format = RDFFormat.NQUADS.getName();
-
-    @Override
-    public int run(final String[] args) throws Exception {
-        final Configuration conf = getConf();
-        try {
-            //conf
-            zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-            ttl = conf.get(MRUtils.AC_TTL_PROP, ttl);
-            instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-            userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-            pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-            workDirBase = conf.get(WORKDIR_PROP, workDirBase);
-            format = conf.get(MRUtils.FORMAT_PROP, format);
-            conf.set(MRUtils.FORMAT_PROP, format);
-            final String inputDir = args[0];
-
-            ZooKeeperInstance zooKeeperInstance = new ZooKeeperInstance(instance, zk);
-            Connector connector = zooKeeperInstance.getConnector(userName, new PasswordToken(pwd));
-            TableOperations tableOperations = connector.tableOperations();
-
-            if (conf.get(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS) != null) {
-                throw new IllegalArgumentException("Cannot use Bulk N Trips tool with Additional Indexers");
-            }
-
-            String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (tablePrefix != null)
-                RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-            String[] tables = {tablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
-                    tablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX};
-            Collection<Job> jobs = new ArrayList<Job>();
-            for (final String tableName : tables) {
-                PrintStream out = null;
-                try {
-                    String workDir = workDirBase + "/" + tableName;
-                    System.out.println("Loading data into table[" + tableName 
+ "]");
-
-                    Job job = new Job(new Configuration(conf), "Bulk Ingest 
load data to Generic RDF Table[" + tableName + "]");
-                    job.setJarByClass(this.getClass());
-                    //setting long job
-                    Configuration jobConf = job.getConfiguration();
-                    
jobConf.setBoolean("mapred.map.tasks.speculative.execution", false);
-                    
jobConf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-                    jobConf.set("io.sort.mb", jobConf.get("io.sort.mb", 
"256"));
-                    jobConf.setBoolean("mapred.compress.map.output", true);
-//                    jobConf.set("mapred.map.output.compression.codec", 
"org.apache.hadoop.io.compress.GzipCodec"); //TODO: I would like LZO compression
-
-                    job.setInputFormatClass(TextInputFormat.class);
-
-                    job.setMapperClass(ParseNtripsMapper.class);
-                    job.setMapOutputKeyClass(Key.class);
-                    job.setMapOutputValueClass(Value.class);
-
-                    job.setCombinerClass(OutStmtMutationsReducer.class);
-                    job.setReducerClass(OutStmtMutationsReducer.class);
-                    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
-                   // AccumuloFileOutputFormat.setZooKeeperInstance(jobConf, instance, zk);
-
-                    jobConf.set(ParseNtripsMapper.TABLE_PROPERTY, tableName);
-
-                    TextInputFormat.setInputPaths(job, new Path(inputDir));
-
-                    FileSystem fs = FileSystem.get(conf);
-                    Path workPath = new Path(workDir);
-                    if (fs.exists(workPath))
-                        fs.delete(workPath, true);
-
-                    //make failures dir
-                    Path failures = new Path(workDir, "failures");
-                    fs.delete(failures, true);
-                    fs.mkdirs(new Path(workDir, "failures"));
-
-                    AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));
-
-                    out = new PrintStream(new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))));
-
-                    if (!tableOperations.exists(tableName))
-                        tableOperations.create(tableName);
-                    Collection<Text> splits = 
tableOperations.getSplits(tableName, Integer.MAX_VALUE);
-                    for (Text split : splits)
-                        out.println(new 
String(Base64.encodeBase64(TextUtil.getBytes(split))));
-
-                    job.setNumReduceTasks(splits.size() + 1);
-                    out.close();
-
-                    job.setPartitionerClass(KeyRangePartitioner.class);
-                    RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
-
-                    jobConf.set(WORKDIR_PROP, workDir);
-
-                    job.submit();
-                    jobs.add(job);
-
-                } catch (Exception re) {
-                    throw new RuntimeException(re);
-                } finally {
-                    if (out != null)
-                        out.close();
-                }
-            }
-
-            for (Job job : jobs) {
-                while (!job.isComplete()) {
-                    Thread.sleep(1000);
-                }
-            }
-
-            for(String tableName : tables) {
-                String workDir = workDirBase + "/" + tableName;
-                String filesDir = workDir + "/files";
-                String failuresDir = workDir + "/failures";
-                
-                FileSystem fs = FileSystem.get(conf);
-                
-                //make sure that the "accumulo" user can read/write/execute these directories
-                fs.setPermission(new Path(filesDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-                fs.setPermission(new Path(failuresDir), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-                
-                tableOperations.importDirectory(
-                        tableName,
-                        filesDir,
-                        failuresDir,
-                        false);
-                
-            }
-
-        } catch (Exception e ){
-            throw new RuntimeException(e);
-        }
-
-        return 0;
-    }
-
-    public static void main(String[] args) throws Exception {
-       ToolRunner.run(new Configuration(), new BulkNtripsInputTool(), args);
-    }
-
-    /**
-     * input: ntrips format triple
-     * <p/>
-     * output: key: shard row from generator
-     * value: stmt in serialized format (document format)
-     */
-    public static class ParseNtripsMapper extends Mapper<LongWritable, Text, Key, Value> {
-        public static final String TABLE_PROPERTY = "parsentripsmapper.table";
-
-        private RDFParser parser;
-        private String rdfFormat;
-        private String namedGraph;
-        private RyaTripleContext ryaContext;
-        private TripleRowResolver rowResolver;
-
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            final String table = conf.get(TABLE_PROPERTY);
-            Preconditions.checkNotNull(table, "Set the " + TABLE_PROPERTY + " property on the map reduce job");
-            this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-            rowResolver = ryaContext.getTripleResolver();
-
-            final String cv_s = conf.get(MRUtils.AC_CV_PROP);
-            final byte[] cv = cv_s == null ? null : cv_s.getBytes();
-            rdfFormat = conf.get(MRUtils.FORMAT_PROP);
-            checkNotNull(rdfFormat, "Rdf format cannot be null");
-
-            namedGraph = conf.get(MRUtils.NAMED_GRAPH_PROP);
-
-            parser = new NQuadsParser();
-            parser.setParserConfig(new ParserConfig(true, true, true, RDFParser.DatatypeHandling.VERIFY));
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement rs = RdfToRyaConversions.convertStatement(statement);
-                        if(rs.getColumnVisibility() == null) {
-                            rs.setColumnVisibility(cv);
-                        }
-
-                        // Inject the specified context into the statement.
-                        if(namedGraph != null){
-                            rs.setContext(new RyaURI(namedGraph));
-                        } else if (statement.getContext() != null) {
-                            rs.setContext(new RyaURI(statement.getContext().toString()));
-                        }
-
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, TripleRow> serialize = rowResolver.serialize(rs);
-
-                        if (table.contains(RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO);
-                            context.write(from(tripleRow), extractValue(tripleRow));
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_PO_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.PO);
-                            context.write(from(tripleRow), extractValue(tripleRow));
-                        } else if (table.contains(RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX)) {
-                            TripleRow tripleRow = serialize.get(TABLE_LAYOUT.OSP);
-                            context.write(from(tripleRow), extractValue(tripleRow));
-                        } else {
-                            throw new IllegalArgumentException("Unrecognized table[" + table + "]");
-                        }
-
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        @Override
-        public void map(LongWritable key, Text value, Context output)
-                throws IOException, InterruptedException {
-            String rdf = value.toString();
-            try {
-                parser.parse(new StringReader(rdf), "");
-            } catch (RDFParseException e) {
-                System.out.println("Line[" + rdf + "] cannot be formatted with 
format[" + rdfFormat + "]. Exception[" + e.getMessage() + "]");
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new IOException("Exception occurred parsing triple[" + 
rdf + "]");
-            }
-        }
-    }
-
-    public static class OutStmtMutationsReducer extends Reducer<Key, Value, Key, Value> {
-
-        public void reduce(Key key, Iterable<Value> values, Context output)
-                throws IOException, InterruptedException {
-            output.write(key, AccumuloRdfConstants.EMPTY_VALUE);
-        }
-    }
-}

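For context, the bulk-ingest path this deleted tool used reduces to three steps: write reduce output as RFiles with AccumuloFileOutputFormat, partition keys by the destination table's existing split points so each file maps onto one tablet, and hand the finished directory to the tablet servers. A condensed sketch against the Accumulo 1.x API, with job setup, split-file generation, and error handling elided (the helper name is hypothetical):

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
    import org.apache.accumulo.core.client.mapreduce.lib.partition.RangePartitioner;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    // Hypothetical helper condensing the bulk-ingest sequence from the deleted tool.
    public final class BulkIngestSketch {
        static void bulkIngest(Job job, Connector connector, String table, String workDir)
                throws Exception {
            // 1. Write reduce output as RFiles rather than live mutations.
            job.setOutputFormatClass(AccumuloFileOutputFormat.class);
            AccumuloFileOutputFormat.setOutputPath(job, new Path(workDir + "/files"));

            // 2. Align reduce partitions with the table's split points
            //    (splits.txt holds the base64-encoded split rows).
            job.setPartitionerClass(KeyRangePartitioner.class);
            RangePartitioner.setSplitFile(job, workDir + "/splits.txt");

            // 3. Once the job succeeds, bulk-import the files into the table.
            if (job.waitForCompletion(true)) {
                connector.tableOperations().importDirectory(
                        table, workDir + "/files", workDir + "/failures", false);
            }
        }
    }
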
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
deleted file mode 100644
index 5a872a0..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputByLineTool.java
+++ /dev/null
@@ -1,251 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Map;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.RyaTableMutationsFactory;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-
-/**
- * Does bulk import of RDF files, parsing the input line by line.
- * Class RdfFileInputByLineTool
- * Date: May 16, 2011
- * Time: 3:12:16 PM
- */
-public class RdfFileInputByLineTool implements Tool {
-
-    private Configuration conf = new Configuration();
-
-    private String userName = "root";
-    private String pwd = "password";
-    private String instance = "instance";
-    private String zk = "zoo";
-    private String tablePrefix = null;
-    private RDFFormat format = RDFFormat.NTRIPLES;
-
-    public Configuration getConf() {
-        return conf;
-    }
-
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static void main(String[] args) {
-        try {
-            ToolRunner.run(new Configuration(), new RdfFileInputByLineTool(), args);
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
-    public long runJob(String[] args) throws IOException, ClassNotFoundException, InterruptedException, AccumuloSecurityException {
-        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
-        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
-        conf.set("io.sort.mb", "256");
-        conf.setLong("mapred.task.timeout", 600000000);
-
-        zk = conf.get(MRUtils.AC_ZK_PROP, zk);
-        instance = conf.get(MRUtils.AC_INSTANCE_PROP, instance);
-        userName = conf.get(MRUtils.AC_USERNAME_PROP, userName);
-        pwd = conf.get(MRUtils.AC_PWD_PROP, pwd);
-        format = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
-
-        String tablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, RdfCloudTripleStoreConstants.TBL_PRFX_DEF);
-
-        Job job = new Job(conf);
-        job.setJarByClass(RdfFileInputByLineTool.class);
-
-        // set up text file input
-        job.setInputFormatClass(TextInputFormat.class);
-        FileInputFormat.addInputPath(job, new Path(args[0]));
-
-        // set input output of the particular job
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(Mutation.class);
-
-        job.setOutputFormatClass(AccumuloOutputFormat.class);
-        AccumuloOutputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd.getBytes()));
-        AccumuloOutputFormat.setCreateTables(job, true);
-        AccumuloOutputFormat.setDefaultTableName(job, tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-        AccumuloOutputFormat.setZooKeeperInstance(job, instance, zk);
-
-        // set mapper and reducer classes
-        job.setMapperClass(TextToMutationMapper.class);
-        job.setNumReduceTasks(0);
-
-        // Submit the job
-        Date startTime = new Date();
-        System.out.println("Job started: " + startTime);
-        int exitCode = job.waitForCompletion(true) ? 0 : 1;
-
-        if (exitCode == 0) {
-            Date end_time = new Date();
-            System.out.println("Job ended: " + end_time);
-            System.out.println("The job took "
-                    + (end_time.getTime() - startTime.getTime()) / 1000
-                    + " seconds.");
-            return job
-                    .getCounters()
-                    .findCounter("org.apache.hadoop.mapred.Task$Counter",
-                            "REDUCE_OUTPUT_RECORDS").getValue();
-        } else {
-            System.out.println("Job Failed!!!");
-        }
-
-        return -1;
-    }
-
-    @Override
-    public int run(String[] args) throws Exception {
-        return (int) runJob(args);
-    }
-
-    public static class TextToMutationMapper extends Mapper<LongWritable, Text, Text, Mutation> {
-        protected RDFParser parser;
-        private String prefix;
-        private RDFFormat rdfFormat;
-        protected Text spo_table;
-        private Text po_table;
-        private Text osp_table;
-        private byte[] cv = AccumuloRdfConstants.EMPTY_CV.getExpression();
-
-        public TextToMutationMapper() {
-        }
-
-        @Override
-        protected void setup(final Context context) throws IOException, InterruptedException {
-            super.setup(context);
-            Configuration conf = context.getConfiguration();
-            prefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
-            if (prefix != null) {
-                RdfCloudTripleStoreConstants.prefixTables(prefix);
-            }
-
-            spo_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
-            po_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
-            osp_table = new Text(prefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
-
-            final String cv_s = conf.get(MRUtils.AC_CV_PROP);
-            if (cv_s != null)
-                cv = cv_s.getBytes();
-
-            rdfFormat = RDFFormat.valueOf(conf.get(MRUtils.FORMAT_PROP, RDFFormat.NTRIPLES.toString()));
-            parser = Rio.createParser(rdfFormat);
-            RyaTripleContext tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-            final RyaTableMutationsFactory mut = new RyaTableMutationsFactory(tripleContext);
-
-            parser.setRDFHandler(new RDFHandler() {
-
-                @Override
-                public void startRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void endRDF() throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleNamespace(String s, String s1) throws RDFHandlerException {
-
-                }
-
-                @Override
-                public void handleStatement(Statement statement) throws RDFHandlerException {
-                    try {
-                        RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
-                        if(ryaStatement.getColumnVisibility() == null) {
-                            ryaStatement.setColumnVisibility(cv);
-                        }
-                        Map<RdfCloudTripleStoreConstants.TABLE_LAYOUT, Collection<Mutation>> mutationMap =
-                                mut.serialize(ryaStatement);
-                        Collection<Mutation> spo = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.SPO);
-                        Collection<Mutation> po = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.PO);
-                        Collection<Mutation> osp = mutationMap.get(RdfCloudTripleStoreConstants.TABLE_LAYOUT.OSP);
-
-                        for (Mutation m : spo) {
-                            context.write(spo_table, m);
-                        }
-                        for (Mutation m : po) {
-                            context.write(po_table, m);
-                        }
-                        for (Mutation m : osp) {
-                            context.write(osp_table, m);
-                        }
-                    } catch (Exception e) {
-                        throw new RDFHandlerException(e);
-                    }
-                }
-
-                @Override
-                public void handleComment(String s) throws RDFHandlerException {
-
-                }
-            });
-        }
-
-        @Override
-        protected void map(LongWritable key, Text value, final Context context) throws IOException, InterruptedException {
-            try {
-                parser.parse(new StringReader(value.toString()), "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
-
-    }
-}
-

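This deleted tool shows the live-write alternative to bulk ingest: a map-only job whose output key names the destination table and whose value is the Mutation to apply, so a single pass over the input can feed the SPO, PO, and OSP tables through AccumuloOutputFormat. A minimal sketch of that wiring (class and parameter names are illustrative):

    import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    // Illustrative wiring for the live-write pattern in the deleted tool.
    public final class LiveWriteSketch {
        static void configure(Job job, String user, String pass, String defaultTable)
                throws Exception {
            job.setMapOutputKeyClass(Text.class);       // key = destination table name
            job.setMapOutputValueClass(Mutation.class); // value = row mutation
            job.setOutputFormatClass(AccumuloOutputFormat.class);
            AccumuloOutputFormat.setConnectorInfo(job, user, new PasswordToken(pass));
            AccumuloOutputFormat.setCreateTables(job, true); // create tables on demand
            AccumuloOutputFormat.setDefaultTableName(job, defaultTable);
            job.setNumReduceTasks(0); // map-only: mutations stream straight to Accumulo
        }
    }
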
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/42895eac/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
deleted file mode 100644
index 3d2fd78..0000000
--- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java
+++ /dev/null
@@ -1,146 +0,0 @@
-package mvm.rya.accumulo.mr.fileinput;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-
-
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.openrdf.model.Statement;
-import org.openrdf.rio.RDFFormat;
-import org.openrdf.rio.RDFHandler;
-import org.openrdf.rio.RDFHandlerException;
-import org.openrdf.rio.RDFParser;
-import org.openrdf.rio.Rio;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.mr.RyaStatementWritable;
-import mvm.rya.accumulo.mr.utils.MRUtils;
-import mvm.rya.api.resolver.RdfToRyaConversions;
-import mvm.rya.api.resolver.RyaTripleContext;
-
-/**
- * Reads multiple RDF-formatted files and converts their contents to statements.
- * Class RdfFileInputFormat
- * Date: May 16, 2011
- * Time: 2:11:24 PM
- */
-public class RdfFileInputFormat extends FileInputFormat<LongWritable, RyaStatementWritable> {
-
-    @Override
-    public RecordReader<LongWritable, RyaStatementWritable> 
createRecordReader(InputSplit inputSplit,
-                                                                               
TaskAttemptContext taskAttemptContext)
-            throws IOException, InterruptedException {
-        return new RdfFileRecordReader();
-    }
-
-    private class RdfFileRecordReader extends RecordReader<LongWritable, 
RyaStatementWritable> implements RDFHandler {
-
-        boolean closed = false;
-        long count = 0;
-        BlockingQueue<RyaStatementWritable> queue = new 
LinkedBlockingQueue<RyaStatementWritable>();
-        int total = 0;
-               private RyaTripleContext tripleContext;
-        
-
-        @Override
-        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-            FileSplit fileSplit = (FileSplit) inputSplit;
-            Configuration conf = taskAttemptContext.getConfiguration();
-            String rdfForm_s = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
-            RDFFormat rdfFormat = RDFFormat.valueOf(rdfForm_s);
-            tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf));
-
-            Path file = fileSplit.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            FSDataInputStream fileIn = fs.open(fileSplit.getPath());
-
-            RDFParser rdfParser = Rio.createParser(rdfFormat);
-            rdfParser.setRDFHandler(this);
-            try {
-                rdfParser.parse(fileIn, "");
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-            fileIn.close();
-            total = queue.size();
-            //TODO: Make this threaded so that you don't hold too many statements before sending them
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-            return queue.size() > 0;
-        }
-
-        @Override
-        public LongWritable getCurrentKey() throws IOException, InterruptedException {
-            return new LongWritable(count++);
-        }
-
-        @Override
-        public RyaStatementWritable getCurrentValue() throws IOException, InterruptedException {
-            return queue.poll();
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return ((float) (total - queue.size())) / ((float) total);
-        }
-
-        @Override
-        public void close() throws IOException {
-            closed = true;
-        }
-
-        @Override
-        public void startRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void endRDF() throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleNamespace(String s, String s1) throws RDFHandlerException {
-        }
-
-        @Override
-        public void handleStatement(Statement statement) throws RDFHandlerException {
-            queue.add(new RyaStatementWritable(RdfToRyaConversions.convertStatement(statement), tripleContext));
-        }
-
-        @Override
-        public void handleComment(String s) throws RDFHandlerException {
-        }
-    }
-
-}

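The deleted record reader parses an entire split into an in-memory queue during initialize(), so nothing reaches the mapper until the whole file has been parsed; its TODO flags exactly this. The usual remedy, which that TODO points toward, is to run the parser on its own thread against a bounded queue so production and consumption overlap. A rough sketch of that structure (all names hypothetical; end-of-stream signaling via a sentinel is omitted for brevity):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Hypothetical sketch: a parser thread fills a bounded queue while the
    // record reader drains it, so at most CAPACITY statements are buffered.
    public final class BoundedStatementQueue<T> {
        private static final int CAPACITY = 1024;
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);

        // Called from the RDF handler on the parser thread; blocks when the
        // queue is full instead of growing without bound.
        public void put(T statement) throws InterruptedException {
            queue.put(statement);
        }

        // Called from the record reader's nextKeyValue(); blocks until the
        // parser produces the next statement.
        public T take() throws InterruptedException {
            return queue.take();
        }
    }
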
