Bugfixes and updates for Spark support

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3f27536a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3f27536a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3f27536a

Branch: refs/heads/master
Commit: 3f27536a38af4983f697ecd0540e9a82185b3262
Parents: 77ff31e
Author: Jesse Hatfield <[email protected]>
Authored: Thu Sep 8 16:40:55 2016 -0400
Committer: pujav65 <[email protected]>
Committed: Tue Dec 20 10:47:20 2016 -0500

----------------------------------------------------------------------
 .../rya/indexing/accumulo/ConfigUtils.java      |   1 -
 .../accumulo/entity/EntityCentricIndex.java     |  23 +-
 mapreduce/pom.xml                               |  20 +-
 .../rya/accumulo/mr/GraphXEdgeInputFormat.java  | 209 ------------------
 .../mvm/rya/accumulo/mr/GraphXInputFormat.java  | 132 ------------
 .../mvm/rya/accumulo/mr/RyaTypeWritable.java    |  74 -------
 .../rya/accumulo/mr/GraphXEdgeInputFormat.java  | 216 +++++++++++++++++++
 .../rya/accumulo/mr/GraphXInputFormat.java      | 147 +++++++++++++
 .../apache/rya/accumulo/mr/RyaTypeWritable.java | 123 +++++++++++
 .../accumulo/mr/GraphXEdgeInputFormatTest.java  | 134 ------------
 .../rya/accumulo/mr/GraphXInputFormatTest.java  | 144 -------------
 .../accumulo/mr/GraphXEdgeInputFormatTest.java  | 134 ++++++++++++
 .../rya/accumulo/mr/GraphXInputFormatTest.java  | 142 ++++++++++++
 pom.xml                                         |   1 +
 spark/pom.xml                                   |   6 +-
 .../accumulo/spark/GraphXGraphGenerator.java    | 188 ----------------
 .../accumulo/spark/GraphXGraphGenerator.java    | 183 ++++++++++++++++
 17 files changed, 978 insertions(+), 899 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index e9e6c31..61a1003 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -365,7 +365,6 @@ public class ConfigUtils {
 
     public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) {
 
-       System.out.println("Testuing");
         final List<String> indexList = Lists.newArrayList();
         final List<String> optimizers = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
index d58b1f1..0676e3d 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
@@ -24,6 +24,7 @@ import static org.apache.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE;
 import static org.apache.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES;
 import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES;
 import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT;
+import static org.apache.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTES;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -281,24 +282,26 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
         final byte[] columnFamily = Arrays.copyOf(data, split);
         final byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
         split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
-        final String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
-        final byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes,  split + DELIM_BYTES.length, edgeBytes.length - 2);
-        final byte[] typeBytes = Arrays.copyOfRange(edgeBytes,  edgeBytes.length - 2, edgeBytes.length);
+        String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
+        byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes,  split + DELIM_BYTES.length, edgeBytes.length);
+        split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
+        byte[] otherNodeData = Arrays.copyOf(otherNodeBytes,  split);
+        byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes,  split, otherNodeBytes.length);
         byte[] objectBytes;
         RyaURI subject;
         final RyaURI predicate = new RyaURI(new String(predicateBytes));
         RyaType object;
         RyaURI context = null;
-        // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker}
-        //            or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker}
+        // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype}
+        //            or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype}
         switch (otherNodeVar) {
             case "subject":
-                subject = new RyaURI(new String(otherNodeBytes));
+                subject = new RyaURI(new String(otherNodeData));
                 objectBytes = Bytes.concat(entityBytes, typeBytes);
                 break;
             case "object":
                 subject = new RyaURI(new String(entityBytes));
-                objectBytes = Bytes.concat(otherNodeBytes, typeBytes);
+                objectBytes = Bytes.concat(otherNodeData, typeBytes);
                 break;
             default:
                 throw new IOException("Failed to deserialize entity-centric index row. "
@@ -311,7 +314,7 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
         return new RyaStatement(subject, predicate, object, context,
                 null, columnVisibility, valueBytes, timestamp);
     }
-    
+
     /**
      * Return the RyaType of the Entity Centric Index row.
      * @param key Row key, contains statement data
@@ -332,7 +335,9 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer {
         byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length);
         split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
         String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
-        byte[] typeBytes = Arrays.copyOfRange(edgeBytes,  edgeBytes.length - 2, edgeBytes.length);
+        byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes,  split + DELIM_BYTES.length, edgeBytes.length);
+        split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES);
+        byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes,  split, otherNodeBytes.length);
         byte[] objectBytes;
         RyaURI subject;
         RyaType object;

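The change above replaces the old fixed-width assumption (datatype information in the last two bytes of the row) with a delimiter-based split on TYPE_DELIM_BYTES, so datatype markers of any length round-trip correctly. A minimal, self-contained sketch of that parsing step (the delimiter value and field contents here are hypothetical stand-ins for Rya's real constants):

    import java.util.Arrays;

    public class TypeDelimSplitSketch {
        // Hypothetical stand-in for RdfCloudTripleStoreConstants.TYPE_DELIM_BYTES.
        static final byte TYPE_DELIM = 0x01;

        static int indexOf(byte[] haystack, byte needle) {
            for (int i = 0; i < haystack.length; i++) {
                if (haystack[i] == needle) {
                    return i;
                }
            }
            return -1;
        }

        public static void main(String[] args) {
            // otherNodeBytes = {node value, delimiter, datatype marker}
            byte[] otherNodeBytes = {'4', '2', TYPE_DELIM, 'i', 'n', 't'};
            int split = indexOf(otherNodeBytes, TYPE_DELIM);
            // Value bytes precede the delimiter; delimiter plus datatype marker follow.
            byte[] data = Arrays.copyOf(otherNodeBytes, split);
            byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes, split, otherNodeBytes.length);
            System.out.println(new String(data) + " | " + typeBytes.length + " type bytes");
        }
    }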
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/mapreduce/pom.xml b/mapreduce/pom.xml
index d222845..c4f94f6 100644
--- a/mapreduce/pom.xml
+++ b/mapreduce/pom.xml
@@ -31,11 +31,11 @@ under the License.
     <name>Apache Rya MapReduce Tools</name>
 
     <dependencies>
-       <dependency>
-                   <groupId>org.apache.spark</groupId>
-                   <artifactId>spark-graphx_2.11</artifactId>
-                   <version>1.6.2</version>
-               </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.11</artifactId>
+            <version>1.6.2</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.api</artifactId>
@@ -116,6 +116,16 @@ under the License.
                         <executions>
                             <execution>
                                 <configuration>
+                                    <filters>
+                                        <filter>
+                                            <artifact>*:*</artifact>
+                                            <excludes>
+                                                <exclude>META-INF/*.SF</exclude>
+                                                <exclude>META-INF/*.DSA</exclude>
+                                                <exclude>META-INF/*.RSA</exclude>
+                                            </excludes>
+                                        </filter>
+                                    </filters>
                                     <transformers>
                                         <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                     </transformers>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
deleted file mode 100644
index 79d6e82..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
+++ /dev/null
@@ -1,209 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.io.IOException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.List;
-import java.util.Map.Entry;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.resolver.RyaTripleContext;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.spark.graphx.Edge;
-
-/**
- * Subclass of {@link AbstractInputFormat} for reading
- * {@link RyaStatementWritable}s directly from a running Rya instance.
- */
-@SuppressWarnings("rawtypes")
-public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
-       /**
-        * Instantiates a RecordReader for this InputFormat and a given task and
-        * input split.
-        * 
-        * @param split
-        *            Defines the portion of the input this RecordReader is
-        *            responsible for.
-        * @param context
-        *            The context of the task.
-        * @return A RecordReader that can be used to fetch RyaStatementWritables.
-        */
-       @Override
-       public RecordReader<Object, Edge> createRecordReader(InputSplit split,
-                       TaskAttemptContext context) {
-               return new RyaStatementRecordReader();
-       }
-
-       /**
-        * Sets the table layout to use.
-        * 
-        * @param conf
-        *            Configuration to set the layout in.
-        * @param layout
-        *            Statements will be read from the Rya table associated with
-        *            this layout.
-        */
-       public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
-               conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
-       }
-
-       /**
-        * Retrieves RyaStatementWritable objects from Accumulo tables.
-        */
-       public class RyaStatementRecordReader extends
-                       AbstractRecordReader<Object, Edge> {
-               private RyaTripleContext ryaContext;
-               private TABLE_LAYOUT tableLayout;
-
-               protected void setupIterators(TaskAttemptContext context,
-                               Scanner scanner, String tableName, RangeInputSplit split) {
-               }
-
-               /**
-                * Initializes the RecordReader.
-                * 
-                * @param inSplit
-                *            Defines the portion of data to read.
-                * @param attempt
-                *            Context for this task attempt.
-                * @throws IOException
-                *             if thrown by the superclass's initialize method.
-                */
-               @Override
-               public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
-                               throws IOException {
-                       super.initialize(inSplit, attempt);
-                       this.tableLayout = MRUtils.getTableLayout(
-                                       attempt.getConfiguration(), TABLE_LAYOUT.SPO);
-                       // TODO verify that this is correct
-                       this.ryaContext = RyaTripleContext
-                                       .getInstance(new AccumuloRdfConfiguration(attempt
-                                                       .getConfiguration()));
-               }
-
-               /**
-                * Load the next statement by converting the next Accumulo row to a
-                * statement, and make the new (key,value) pair available for retrieval.
-                * 
-                * @return true if another (key,value) pair was fetched and is ready to
-                *         be retrieved, false if there was none.
-                * @throws IOException
-                *             if a row was loaded but could not be converted to a
-                *             statement.
-                */
-               @Override
-               public boolean nextKeyValue() throws IOException {
-                       if (!scannerIterator.hasNext())
-                               return false;
-                       Entry<Key, Value> entry = scannerIterator.next();
-                       ++numKeysRead;
-                       currentKey = entry.getKey();
-                       try {
-                               currentK = currentKey.getRow();
-                               RyaTypeWritable rtw = null;
-                               RyaStatement stmt = this.ryaContext.deserializeTriple(
-                                               this.tableLayout, new TripleRow(entry.getKey().getRow()
-                                                               .getBytes(), entry.getKey().getColumnFamily()
-                                                               .getBytes(), entry.getKey()
-                                                               .getColumnQualifier().getBytes(), entry
-                                                               .getKey().getTimestamp(), entry.getKey()
-                                                               .getColumnVisibility().getBytes(), entry
-                                                               .getValue().get()));
-
-                               String subjURI = stmt.getSubject().getDataType().toString();
-                               String objURI = stmt.getObject().getDataType().toString();
-
-                               // SHA-256 the string value and then generate a hashcode from
-                               // the digested string, the collision ratio is less than 0.0001%
-                               // using custom hash function should significantly reduce the
-                               // collision ratio
-                               MessageDigest messageDigest = MessageDigest
-                                               .getInstance("SHA-256");
-
-                               messageDigest.update(subjURI.getBytes());
-                               String encryptedString = new String(messageDigest.digest());
-                               long subHash = hash(encryptedString);
-
-                               messageDigest.update(objURI.getBytes());
-                               encryptedString = new String(messageDigest.digest());
-                               long objHash = hash(encryptedString);
-
-                               Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
-                                               subHash, objHash, rtw);
-                               currentV = writable;
-                       } catch (TripleRowResolverException | NoSuchAlgorithmException e) {
-                               throw new IOException(e);
-                       }
-                       return true;
-               }
-
-               protected List<IteratorSetting> contextIterators(
-                               TaskAttemptContext context, String tableName) {
-                       return getIterators(context);
-               }
-
-               @Override
-               protected void setupIterators(TaskAttemptContext context,
-                               Scanner scanner, String tableName,
-                               org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
-                       List<IteratorSetting> iterators = null;
-
-                       if (null == split) {
-                               iterators = contextIterators(context, tableName);
-                       } else {
-                               iterators = split.getIterators();
-                               if (null == iterators) {
-                                       iterators = contextIterators(context, tableName);
-                               }
-                       }
-
-                       for (IteratorSetting iterator : iterators)
-                               scanner.addScanIterator(iterator);
-               }
-
-       }
-
-       public static long hash(String string) {
-               long h = 1125899906842597L; // prime
-               int len = string.length();
-
-               for (int i = 0; i < len; i++) {
-                       h = 31 * h + string.charAt(i);
-               }
-               return h;
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
deleted file mode 100644
index 6ec5c74..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.SortedMap;
-
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.resolver.RyaTypeResolverException;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.IteratorSetting;
-import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Value;
-import org.apache.accumulo.core.iterators.user.WholeRowIterator;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
-public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> {
-
-       private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23;
-
-       /**
-        * Instantiates a RecordReader for this InputFormat and a given task and
-        * input split.
-        * 
-        * @param split
-        *            Defines the portion of the input this RecordReader is
-        *            responsible for.
-        * @param context
-        *            The context of the task.
-        * @return A RecordReader that can be used to fetch RyaStatementWritables.
-        */
-       @Override
-       public RecordReader<Object, RyaTypeWritable> createRecordReader(
-                       InputSplit split, TaskAttemptContext context) {
-               return new RyaStatementRecordReader();
-       }
-
-       
-       
-       /**
-        * Retrieves RyaStatementWritable objects from Accumulo tables.
-        */
-       public class RyaStatementRecordReader extends
-                       AbstractRecordReader<Object, RyaTypeWritable> {
-               protected void setupIterators(TaskAttemptContext context,
-                               Scanner scanner, String tableName,
-                               @SuppressWarnings("deprecation") RangeInputSplit split) {
-                       IteratorSetting iteratorSetting = new IteratorSetting(
-                                       WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class);
-                       scanner.addScanIterator(iteratorSetting);
-               }
-
-               /**
-                * Initializes the RecordReader.
-                * 
-                * @param inSplit
-                *            Defines the portion of data to read.
-                * @param attempt
-                *            Context for this task attempt.
-                * @throws IOException
-                *             if thrown by the superclass's initialize method.
-                */
-               @Override
-               public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
-                               throws IOException {
-                       super.initialize(inSplit, attempt);
-               }
-
-               /**
-                * Load the next statement by converting the next Accumulo row to a
-                * statement, and make the new (key,value) pair available for retrieval.
-                * 
-                * @return true if another (key,value) pair was fetched and is ready to
-                *         be retrieved, false if there was none.
-                * @throws IOException
-                *             if a row was loaded but could not be converted to a
-                *             statement.
-                */
-               @Override
-               public boolean nextKeyValue() throws IOException {
-                       if (!scannerIterator.hasNext())
-                               return false;
-                       Entry<Key, Value> entry = scannerIterator.next();
-                       ++numKeysRead;
-                       currentKey = entry.getKey();
-
-                       try {
-                               currentK = currentKey.getRow();
-                               SortedMap<Key, Value> wholeRow = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
-                               Key key = wholeRow.firstKey();
-                               Value value = wholeRow.get(key);
-                               RyaType type = EntityCentricIndex.getRyaType(key, value);
-                               RyaTypeWritable writable = new RyaTypeWritable();
-                               writable.setRyaType(type);
-                               currentV = writable;
-                       } catch (RyaTypeResolverException e) {
-                               throw new IOException();
-                       }
-                       return true;
-               }
-
-               protected List<IteratorSetting> contextIterators(
-                               TaskAttemptContext context, String tableName) {
-                       return getIterators(context);
-               }
-
-               @Override
-               protected void setupIterators(TaskAttemptContext context,
-                               Scanner scanner, String tableName,
-                               org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
-
-                       List<IteratorSetting> iterators = null;
-
-                       if (null == split) {
-                               iterators = contextIterators(context, tableName);
-                       } else {
-                               iterators = split.getIterators();
-                               if (null == iterators) {
-                                       iterators = contextIterators(context, tableName);
-                               }
-                       }
-
-                       for (IteratorSetting iterator : iterators)
-                               scanner.addScanIterator(iterator);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
deleted file mode 100644
index ddc0948..0000000
--- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package mvm.rya.accumulo.mr;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.resolver.triple.TripleRow;
-import mvm.rya.api.resolver.triple.TripleRowResolverException;
-
-import org.apache.hadoop.io.WritableComparable;
-import org.openrdf.model.URI;
-import org.openrdf.model.impl.ValueFactoryImpl;
-
-public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{
-
-       private RyaType ryatype;
-       
-       /**
-     * Read part of a statement from an input stream.
-     * @param dataInput Stream for reading serialized statements.
-     * @return The next individual field, as a byte array.
-     * @throws IOException if reading from the stream fails.
-     */
-    protected byte[] read(DataInput dataInput) throws IOException {
-        if (dataInput.readBoolean()) {
-            int len = dataInput.readInt();
-            byte[] bytes = new byte[len];
-            dataInput.readFully(bytes);
-            return bytes;
-        }else {
-            return null;
-        }
-    }
-       
-       @Override
-       public void readFields(DataInput dataInput) throws IOException {
-               ValueFactoryImpl vfi = new ValueFactoryImpl();
-               String data = dataInput.readLine();
-               String dataTypeString = dataInput.readLine();
-               URI dataType = vfi.createURI(dataTypeString);
-               ryatype.setData(data);
-               ryatype.setDataType(dataType);
-       }
-
-       @Override
-       public void write(DataOutput dataOutput) throws IOException {
-               dataOutput.writeChars(ryatype.getData());
-               dataOutput.writeChars(ryatype.getDataType().toString());
-       }
-       
-       /**
-     * Gets the contained RyaStatement.
-     * @return The statement represented by this RyaStatementWritable.
-     */
-    public RyaType getRyaType() {
-        return ryatype;
-    }
-    /**
-     * Sets the contained RyaStatement.
-     * @param   ryaStatement    The statement to be represented by this
-     *                          RyaStatementWritable.
-     */
-    public void setRyaType(RyaType ryatype) {
-        this.ryatype = ryatype;
-    }
-
-       @Override
-       public int compareTo(RyaTypeWritable o) {
-               return ryatype.compareTo(o.ryatype);
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
new file mode 100644
index 0000000..489fd34
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java
@@ -0,0 +1,216 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTripleContext;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.graphx.Edge;
+
+/**
+ * Subclass of {@link AbstractInputFormat} for reading
+ * {@link RyaStatementWritable}s directly from a running Rya instance.
+ */
+@SuppressWarnings("rawtypes")
+public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
+       /**
+        * Instantiates a RecordReader for this InputFormat and a given task and
+        * input split.
+        *
+        * @param split
+        *            Defines the portion of the input this RecordReader is
+        *            responsible for.
+        * @param context
+        *            The context of the task.
+        * @return A RecordReader that can be used to fetch RyaStatementWritables.
+        */
+       @Override
+       public RecordReader<Object, Edge> createRecordReader(InputSplit split,
+                       TaskAttemptContext context) {
+               return new RyaStatementRecordReader();
+       }
+
+       /**
+        * Sets the table layout to use.
+        *
+        * @param conf
+        *            Configuration to set the layout in.
+        * @param layout
+        *            Statements will be read from the Rya table associated with
+        *            this layout.
+        */
+       public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
+               conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name());
+       }
+
+       /**
+        * Retrieves RyaStatementWritable objects from Accumulo tables.
+        */
+       public class RyaStatementRecordReader extends
+                       AbstractRecordReader<Object, Edge> {
+               private RyaTripleContext ryaContext;
+               private TABLE_LAYOUT tableLayout;
+
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName, RangeInputSplit split) {
+               }
+
+               /**
+                * Initializes the RecordReader.
+                *
+                * @param inSplit
+                *            Defines the portion of data to read.
+                * @param attempt
+                *            Context for this task attempt.
+                * @throws IOException
+                *             if thrown by the superclass's initialize method.
+                */
+               @Override
+               public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+                               throws IOException {
+                       super.initialize(inSplit, attempt);
+                       this.tableLayout = MRUtils.getTableLayout(
+                                       attempt.getConfiguration(), TABLE_LAYOUT.SPO);
+                       // TODO verify that this is correct
+                       this.ryaContext = RyaTripleContext
+                                       .getInstance(new AccumuloRdfConfiguration(attempt
+                                                       .getConfiguration()));
+               }
+
+               /**
+                * Load the next statement by converting the next Accumulo row to a
+                * statement, and make the new (key,value) pair available for retrieval.
+                *
+                * @return true if another (key,value) pair was fetched and is ready to
+                *         be retrieved, false if there was none.
+                * @throws IOException
+                *             if a row was loaded but could not be converted to a
+                *             statement.
+                */
+               @Override
+               public boolean nextKeyValue() throws IOException {
+                       if (!scannerIterator.hasNext())
+                               return false;
+                       Entry<Key, Value> entry = scannerIterator.next();
+                       ++numKeysRead;
+                       currentKey = entry.getKey();
+                       try {
+                               currentK = currentKey.getRow();
+                               RyaTypeWritable rtw = new RyaTypeWritable();
+                               RyaStatement stmt = this.ryaContext.deserializeTriple(
+                                               this.tableLayout, new TripleRow(entry.getKey().getRow()
+                                                               .getBytes(), entry.getKey().getColumnFamily()
+                                                               .getBytes(), entry.getKey()
+                                                               .getColumnQualifier().getBytes(), entry
+                                                               .getKey().getTimestamp(), entry.getKey()
+                                                               .getColumnVisibility().getBytes(), entry
+                                                               .getValue().get()));
+
+                               long subHash = getVertexId(stmt.getSubject());
+                               long objHash = getVertexId(stmt.getObject());
+                               rtw.setRyaType(stmt.getPredicate());
+
+                               Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>(
+                                               subHash, objHash, rtw);
+                               currentV = writable;
+                       } catch (TripleRowResolverException e) {
+                               throw new IOException(e);
+                       }
+                       return true;
+               }
+
+               protected List<IteratorSetting> contextIterators(
+                               TaskAttemptContext context, String tableName) {
+                       return getIterators(context);
+               }
+
+               @Override
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName,
+                               org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+                       List<IteratorSetting> iterators = null;
+
+                       if (null == split) {
+                               iterators = contextIterators(context, tableName);
+                       } else {
+                               iterators = split.getIterators();
+                               if (null == iterators) {
+                                       iterators = contextIterators(context, tableName);
+                               }
+                       }
+
+                       for (IteratorSetting iterator : iterators)
+                               scanner.addScanIterator(iterator);
+               }
+
+       }
+
+       public static long getVertexId(RyaType resource) throws IOException {
+               String uri = "";
+               if (resource != null) {
+                       uri = resource.getData().toString();
+               }
+               try {
+                       // SHA-256 the string value and then generate a hashcode from
+                       // the digested string, the collision ratio is less than 0.0001%
+                       // using custom hash function should significantly reduce the
+                       // collision ratio
+                       MessageDigest messageDigest = MessageDigest
+                                       .getInstance("SHA-256");
+                       messageDigest.update(uri.getBytes());
+                       String encryptedString = new String(messageDigest.digest());
+                       return hash(encryptedString);
+               }
+               catch (NoSuchAlgorithmException e) {
+                       throw new IOException(e);
+               }
+       }
+
+       public static long hash(String string) {
+               long h = 1125899906842597L; // prime
+               int len = string.length();
+
+               for (int i = 0; i < len; i++) {
+                       h = 31 * h + string.charAt(i);
+               }
+               return h;
+       }
+}

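For context, a hedged sketch of how the new GraphXEdgeInputFormat can be wired into Spark. The instance name, credentials, and table are placeholders mirroring the tests below (a real cluster would configure a ZooKeeper instance rather than a mock one):

    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.rya.accumulo.mr.GraphXEdgeInputFormat;
    import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.graphx.Edge;

    public class EdgeRddSketch {
        public static void main(String[] args) throws Exception {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("rya-edge-sketch").setMaster("local"));
            Job job = Job.getInstance();
            // Placeholder connection details, as in GraphXEdgeInputFormatTest below.
            GraphXEdgeInputFormat.setMockInstance(job, "mock_instance");
            GraphXEdgeInputFormat.setConnectorInfo(job, "root", new PasswordToken(""));
            GraphXEdgeInputFormat.setInputTableName(job, "rya_spo");
            GraphXEdgeInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
            // Keys are Accumulo row identifiers; values are GraphX edges whose
            // vertex ids come from GraphXEdgeInputFormat.getVertexId().
            JavaPairRDD<Object, Edge> edges = sc.newAPIHadoopRDD(
                    job.getConfiguration(), GraphXEdgeInputFormat.class,
                    Object.class, Edge.class);
            System.out.println("edge count: " + edges.count());
            sc.stop();
        }
    }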
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
new file mode 100644
index 0000000..77f4e63
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java
@@ -0,0 +1,147 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RyaTypeResolverException;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> {
+
+       private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23;
+
+       /**
+        * Instantiates a RecordReader for this InputFormat and a given task and
+        * input split.
+        *
+        * @param split
+        *            Defines the portion of the input this RecordReader is
+        *            responsible for.
+        * @param context
+        *            The context of the task.
+        * @return A RecordReader that can be used to fetch RyaStatementWritables.
+        */
+       @Override
+       public RecordReader<Object, RyaTypeWritable> createRecordReader(
+                       InputSplit split, TaskAttemptContext context) {
+               return new RyaStatementRecordReader();
+       }
+
+
+
+       /**
+        * Retrieves RyaStatementWritable objects from Accumulo tables.
+        */
+       public class RyaStatementRecordReader extends
+                       AbstractRecordReader<Object, RyaTypeWritable> {
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName,
+                               @SuppressWarnings("deprecation") RangeInputSplit split) {
+                       IteratorSetting iteratorSetting = new IteratorSetting(
+                                       WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class);
+                       scanner.addScanIterator(iteratorSetting);
+               }
+
+               /**
+                * Initializes the RecordReader.
+                *
+                * @param inSplit
+                *            Defines the portion of data to read.
+                * @param attempt
+                *            Context for this task attempt.
+                * @throws IOException
+                *             if thrown by the superclass's initialize method.
+                */
+               @Override
+               public void initialize(InputSplit inSplit, TaskAttemptContext attempt)
+                               throws IOException {
+                       super.initialize(inSplit, attempt);
+               }
+
+               /**
+                * Load the next statement by converting the next Accumulo row to a
+                * statement, and make the new (key,value) pair available for retrieval.
+                *
+                * @return true if another (key,value) pair was fetched and is ready to
+                *         be retrieved, false if there was none.
+                * @throws IOException
+                *             if a row was loaded but could not be converted to a
+                *             statement.
+                */
+               @Override
+               public boolean nextKeyValue() throws IOException {
+                       if (!scannerIterator.hasNext())
+                               return false;
+                       Entry<Key, Value> entry = scannerIterator.next();
+                       ++numKeysRead;
+                       currentKey = entry.getKey();
+
+                       try {
+                               RyaType type = EntityCentricIndex.getRyaType(currentKey, entry.getValue());
+                               RyaTypeWritable writable = new RyaTypeWritable();
+                               writable.setRyaType(type);
+                               currentK = GraphXEdgeInputFormat.getVertexId(type);
+                               currentV = writable;
+                       } catch (RyaTypeResolverException e) {
+                               throw new IOException();
+                       }
+                       return true;
+               }
+
+               protected List<IteratorSetting> contextIterators(
+                               TaskAttemptContext context, String tableName) {
+                       return getIterators(context);
+               }
+
+               @Override
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName,
+                               org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+
+                       List<IteratorSetting> iterators = null;
+
+                       if (null == split) {
+                               iterators = contextIterators(context, tableName);
+                       } else {
+                               iterators = split.getIterators();
+                               if (null == iterators) {
+                                       iterators = contextIterators(context, tableName);
+                               }
+                       }
+
+                       for (IteratorSetting iterator : iterators)
+                               scanner.addScanIterator(iterator);
+               }
+       }
+}

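Similarly, a hedged sketch of pulling the vertex side out of the entity-centric index with the new GraphXInputFormat (connection details and the table name are again placeholders; the GraphXGraphGenerator changed later in this commit is what actually pairs the two RDDs into a GraphX graph):

    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.rya.accumulo.mr.GraphXInputFormat;
    import org.apache.rya.accumulo.mr.RyaTypeWritable;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class VertexRddSketch {
        public static void main(String[] args) throws Exception {
            JavaSparkContext sc = new JavaSparkContext(
                    new SparkConf().setAppName("rya-vertex-sketch").setMaster("local"));
            Job job = Job.getInstance();
            // Placeholder connection details, as in GraphXInputFormatTest below.
            GraphXInputFormat.setMockInstance(job, "mock_instance");
            GraphXInputFormat.setConnectorInfo(job, "root", new PasswordToken(""));
            GraphXInputFormat.setInputTableName(job, "rya_eci");
            // Keys are the same hashed vertex ids used on the edge side, so the
            // two RDDs line up when assembled into a graph.
            JavaPairRDD<Object, RyaTypeWritable> vertices = sc.newAPIHadoopRDD(
                    job.getConfiguration(), GraphXInputFormat.class,
                    Object.class, RyaTypeWritable.class);
            System.out.println("vertex count: " + vertices.count());
            sc.stop();
        }
    }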
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
new file mode 100644
index 0000000..ec47d82
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java
@@ -0,0 +1,123 @@
+package org.apache.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.rya.api.domain.RyaType;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{
+
+    private RyaType ryatype;
+
+    /**
+     * Read part of a statement from an input stream.
+     * @param dataInput Stream for reading serialized statements.
+     * @return The next individual field, as a byte array.
+     * @throws IOException if reading from the stream fails.
+     */
+    protected byte[] read(DataInput dataInput) throws IOException {
+        if (dataInput.readBoolean()) {
+            int len = dataInput.readInt();
+            byte[] bytes = new byte[len];
+            dataInput.readFully(bytes);
+            return bytes;
+        }else {
+            return null;
+        }
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        ValueFactoryImpl vfi = new ValueFactoryImpl();
+        String data = dataInput.readLine();
+        String dataTypeString = dataInput.readLine();
+        URI dataType = vfi.createURI(dataTypeString);
+        ryatype.setData(data);
+        ryatype.setDataType(dataType);
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        dataOutput.writeChars(ryatype.getData());
+        dataOutput.writeChars(ryatype.getDataType().toString());
+    }
+
+    /**
+     * Gets the contained RyaStatement.
+     * @return The statement represented by this RyaStatementWritable.
+     */
+    public RyaType getRyaType() {
+        return ryatype;
+    }
+    /**
+     * Sets the contained RyaStatement.
+     * @param   ryaStatement    The statement to be represented by this
+     *                          RyaStatementWritable.
+     */
+    public void setRyaType(RyaType ryatype) {
+        this.ryatype = ryatype;
+    }
+
+    @Override
+    public int compareTo(RyaTypeWritable o) {
+        return ryatype.compareTo(o.ryatype);
+    }
+
+    /**
+     * Tests for equality using the equals method of the enclosed RyaType.
+     * @param   o   Object to compare with
+     * @return  true if both objects are RyaTypeWritables containing equivalent
+     *          RyaTypes.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (o == null || !(o instanceof RyaTypeWritable)) {
+            return false;
+        }
+        RyaType rtThis = ryatype;
+        RyaType rtOther = ((RyaTypeWritable) o).ryatype;
+        if (rtThis == null) {
+            return rtOther == null;
+        }
+        else {
+            return rtThis.equals(rtOther);
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        if (ryatype == null) {
+            return 0;
+        }
+        else {
+            return ryatype.hashCode();
+        }
+    }
+}

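The equals()/hashCode() pair added above keeps the class consistent with its compareTo ordering and makes the writable usable in hash-based collections. A small sketch (the single-argument RyaType constructor is assumed from the Rya API; treat the exact signature as illustrative):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.rya.accumulo.mr.RyaTypeWritable;
    import org.apache.rya.api.domain.RyaType;

    public class WritableEqualitySketch {
        public static void main(String[] args) {
            RyaTypeWritable a = new RyaTypeWritable();
            RyaTypeWritable b = new RyaTypeWritable();
            // Assumed data-only constructor; defaults the datatype.
            a.setRyaType(new RyaType("hello"));
            b.setRyaType(new RyaType("hello"));
            Set<RyaTypeWritable> set = new HashSet<>();
            set.add(a);
            set.add(b);
            // Writables wrapping equal RyaTypes now collapse to one entry.
            System.out.println(set.size()); // 1
        }
    }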
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
deleted file mode 100644
index 445499d..0000000
--- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.ArrayList;
-import java.util.List;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaURI;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.spark.graphx.Edge;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GraphXEdgeInputFormatTest {
-
-    static String username = "root", table = "rya_spo";
-    static PasswordToken password = new PasswordToken("");
-
-    static Instance instance;
-    static AccumuloRyaDAO apiImpl;
-
-    @Before
-    public void init() throws Exception {
-        instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance");
-        Connector connector = instance.getConnector(username, password);
-        connector.tableOperations().create(table);
-
-        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix("rya_");
-        conf.setDisplayQueryPlan(false);
-
-        apiImpl = new AccumuloRyaDAO();
-        apiImpl.setConf(conf);
-        apiImpl.setConnector(connector);
-        apiImpl.init();
-    }
-
-    @After
-    public void after() throws Exception {
-        apiImpl.dropAndDestroy();
-    }
-
-    @SuppressWarnings("rawtypes")
-       @Test
-    public void testInputFormat() throws Exception {
-       RyaStatement input = RyaStatement.builder()
-            .setSubject(new RyaURI("http://www.google.com"))
-            .setPredicate(new RyaURI("http://some_other_uri"))
-            .setObject(new RyaURI("http://www.yahoo.com"))
-            .setColumnVisibility(new byte[0])
-            .setValue(new byte[0])
-            .build();
-
-        apiImpl.add(input);
-
-        Job jobConf = Job.getInstance();
-
-        GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName());
-        GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
-        GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
-        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
-        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
-
-        GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
-        GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
-        GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
-
-        GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
-
-        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
-
-        List<InputSplit> splits = inputFormat.getSplits(context);
-
-        Assert.assertEquals(1, splits.size());
-
-        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
-
-        RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
-
-        RecordReader ryaStatementRecordReader = (RecordReader) reader;
-        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
-
-        List<Edge> results = new ArrayList<Edge>();
-        while(ryaStatementRecordReader.nextKeyValue()) {
-            Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue();
-            long srcId = writable.srcId();
-            long destId = writable.dstId();
-                       RyaTypeWritable rtw = null;
-            Object text = ryaStatementRecordReader.getCurrentKey();
-            Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw);
-            results.add(edge);
-
-            System.out.println(text);
-        }
-
-        System.out.println(results.size());
-        System.out.println(results);
-        Assert.assertTrue(results.size() == 2);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java 
b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
deleted file mode 100644
index a31b27f..0000000
--- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-package mvm.rya.accumulo.mr;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.util.ArrayList;
-import java.util.List;
-
-import mvm.rya.accumulo.AccumuloRdfConfiguration;
-import mvm.rya.accumulo.AccumuloRyaDAO;
-import mvm.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
-import mvm.rya.api.domain.RyaStatement;
-import mvm.rya.api.domain.RyaType;
-import mvm.rya.api.domain.RyaURI;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.mock.MockInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GraphXInputFormatTest {
-
-       private String username = "root", table = "rya_eci";
-    private PasswordToken password = new PasswordToken("");
-
-    private Instance instance;
-    private AccumuloRyaDAO apiImpl;
-
-    @Before
-    public void init() throws Exception {
-        instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance");
-        Connector connector = instance.getConnector(username, password);
-        connector.tableOperations().create(table);
-
-        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
-        conf.setTablePrefix("rya_");
-        conf.setDisplayQueryPlan(false);
-        conf.setBoolean("sc.use_entity", true);
-
-        apiImpl = new AccumuloRyaDAO();
-        apiImpl.setConf(conf);
-        apiImpl.setConnector(connector);
-        apiImpl.init();
-    }
-
-    @After
-    public void after() throws Exception {
-        apiImpl.dropAndDestroy();
-    }
-
-    @Test
-    public void testInputFormat() throws Exception {
-       RyaStatement input = RyaStatement.builder()
-            .setSubject(new RyaURI("http://www.google.com"))
-            .setPredicate(new RyaURI("http://some_other_uri"))
-            .setObject(new RyaURI("http://www.yahoo.com"))
-            .setColumnVisibility(new byte[0])
-            .setValue(new byte[0])
-            .build();
-
-        apiImpl.add(input);
-
-        Job jobConf = Job.getInstance();
-
-        GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
-        GraphXInputFormat.setConnectorInfo(jobConf, username, password);
-        GraphXInputFormat.setInputTableName(jobConf, table);
-        GraphXInputFormat.setInputTableName(jobConf, table);
-
-        GraphXInputFormat.setScanIsolation(jobConf, false);
-        GraphXInputFormat.setLocalIterators(jobConf, false);
-        GraphXInputFormat.setOfflineTableScan(jobConf, false);
-
-        GraphXInputFormat inputFormat = new GraphXInputFormat();
-
-        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
-
-        List<InputSplit> splits = inputFormat.getSplits(context);
-
-        Assert.assertEquals(1, splits.size());
-
-        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
-
-        RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
-
-        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
-        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
-
-        List<RyaType> results = new ArrayList<RyaType>();
-        System.out.println("before while");
-        while(ryaStatementRecordReader.nextKeyValue()) {
-               System.out.println("in while");
-            RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
-            RyaType value = writable.getRyaType();
-            Object text = ryaStatementRecordReader.getCurrentKey();
-            RyaType type = new RyaType();
-            type.setData(value.getData());
-            type.setDataType(value.getDataType());
-            results.add(type);
-            
-            System.out.println(value.getData());
-            System.out.println(value.getDataType());
-            System.out.println(results);
-            System.out.println(type);
-            System.out.println(text);
-            System.out.println(value);
-        }
-        System.out.println("after while");
-
-        System.out.println(results.size());
-        System.out.println(results);
-//        Assert.assertTrue(results.size() == 2);
-//        Assert.assertTrue(results.contains(input));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
 
b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
new file mode 100644
index 0000000..6686c8f
--- /dev/null
+++ 
b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
@@ -0,0 +1,134 @@
+package org.apache.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.graphx.Edge;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GraphXEdgeInputFormatTest {
+
+    static String username = "root", table = "rya_spo";
+    static PasswordToken password = new PasswordToken("");
+
+    static Instance instance;
+    static AccumuloRyaDAO apiImpl;
+
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void testInputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setColumnVisibility(new byte[0])
+            .setValue(new byte[0])
+            .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
+        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
+        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
+
+        GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
+        GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
+        GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        RecordReader ryaStatementRecordReader = (RecordReader) reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<Edge> results = new ArrayList<Edge>();
+        while(ryaStatementRecordReader.nextKeyValue()) {
+            Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue();
+            long srcId = writable.srcId();
+            long destId = writable.dstId();
+            RyaTypeWritable rtw = null;
+            Object text = ryaStatementRecordReader.getCurrentKey();
+            Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw);
+            results.add(edge);
+
+            System.out.println(text);
+        }
+
+        System.out.println(results.size());
+        System.out.println(results);
+        Assert.assertTrue(results.size() == 2);
+    }
+}
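
For reference, a driver consuming GraphXEdgeInputFormat directly would look roughly like
the sketch below. This is a minimal fragment under the same assumptions as the test above
(mock instance, root user, empty password, populated rya_spo table), reusing the test's
imports plus org.apache.spark.api.java.JavaSparkContext and JavaPairRDD:

    // Sketch only: read Rya SPO rows back as GraphX edges from a Spark driver.
    Job job = Job.getInstance();
    GraphXEdgeInputFormat.setMockInstance(job, "mock_instance");   // placeholder name
    GraphXEdgeInputFormat.setConnectorInfo(job, "root", new PasswordToken(""));
    GraphXEdgeInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
    GraphXEdgeInputFormat.setInputTableName(job, "rya_spo");

    JavaSparkContext sc = new JavaSparkContext("local", "rya-edges");
    JavaPairRDD<Object, Edge> edges = sc.newAPIHadoopRDD(
            job.getConfiguration(), GraphXEdgeInputFormat.class,
            Object.class, Edge.class);
    // The test above expects two edge rows per stored statement.
    System.out.println(edges.count());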

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java 
b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
new file mode 100644
index 0000000..b2a663c
--- /dev/null
+++ 
b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java
@@ -0,0 +1,142 @@
+package org.apache.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GraphXInputFormatTest {
+
+    private String username = "root", table = "rya_eci";
+    private PasswordToken password = new PasswordToken("");
+
+    private Instance instance;
+    private AccumuloRyaDAO apiImpl;
+
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+        conf.setBoolean("sc.use_entity", true);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @Test
+    public void testInputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com"))
+            .setPredicate(new RyaURI("http://some_other_uri"))
+            .setObject(new RyaURI("http://www.yahoo.com"))
+            .setColumnVisibility(new byte[0])
+            .setValue(new byte[0])
+            .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXInputFormat.setInputTableName(jobConf, table);
+        GraphXInputFormat.setInputTableName(jobConf, table);
+
+        GraphXInputFormat.setScanIsolation(jobConf, false);
+        GraphXInputFormat.setLocalIterators(jobConf, false);
+        GraphXInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXInputFormat inputFormat = new GraphXInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<RyaType> results = new ArrayList<RyaType>();
+        System.out.println("before while");
+        while(ryaStatementRecordReader.nextKeyValue()) {
+            System.out.println("in while");
+            RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
+            RyaType value = writable.getRyaType();
+            Object text = ryaStatementRecordReader.getCurrentKey();
+            RyaType type = new RyaType();
+            type.setData(value.getData());
+            type.setDataType(value.getDataType());
+            results.add(type);
+
+            System.out.println(value.getData());
+            System.out.println(value.getDataType());
+            System.out.println(results);
+            System.out.println(type);
+            System.out.println(text);
+            System.out.println(value);
+        }
+        System.out.println("after while");
+
+        System.out.println(results.size());
+        System.out.println(results);
+//        Assert.assertTrue(results.size() == 2);
+//        Assert.assertTrue(results.contains(input));
+    }
+}
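
The vertex side pairs with the entity-centric index and yields RyaTypeWritable values. A
rough sketch, under the same assumptions as this test (entity indexing enabled, table
rya_eci) and reusing its imports, showing how a driver might project out the wrapped
RyaType data; the final map step is illustrative, not part of the input format:

    // Sketch only: read vertex values from the entity-centric index.
    Job job = Job.getInstance();
    GraphXInputFormat.setMockInstance(job, "mock_instance");   // placeholder name
    GraphXInputFormat.setConnectorInfo(job, "root", new PasswordToken(""));
    GraphXInputFormat.setInputTableName(job, "rya_eci");

    JavaSparkContext sc = new JavaSparkContext("local", "rya-vertices");
    JavaPairRDD<Object, RyaTypeWritable> vertices = sc.newAPIHadoopRDD(
            job.getConfiguration(), GraphXInputFormat.class,
            Object.class, RyaTypeWritable.class);
    // RyaTypeWritable wraps a RyaType (data plus datatype), as exercised above.
    List<String> data = vertices.values().map(w -> w.getRyaType().getData()).collect();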

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5ab29aa..43cf6f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@ under the License.
         <module>osgi</module>
         <module>pig</module>
         <module>sail</module>
+        <module>spark</module>
         <module>web</module>
     </modules>
     <properties>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 02bdb37..04c8bb5 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -24,11 +24,11 @@ under the License.
     <parent>
         <groupId>org.apache.rya</groupId>
         <artifactId>rya-project</artifactId>
-        <version>3.2.10-SNAPSHOT</version>
+        <version>3.2.10-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>rya.spark</artifactId>
-    <name>Apache Rya MapReduce Tools</name>
+    <name>Apache Rya Spark Support</name>
 
     <dependencies>
            <dependency>
@@ -39,7 +39,7 @@ under the License.
            <dependency>
                    <groupId>org.apache.spark</groupId>
                    <artifactId>spark-core_2.11</artifactId>
-                   <version>1.2.2</version>
+                   <version>1.6.2</version>
                </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java 
b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
deleted file mode 100644
index f4b7860..0000000
--- a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package mvm.rya.accumulo.spark;
-
-import java.io.IOException;
-
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.GraphXEdgeInputFormat;
-import mvm.rya.accumulo.mr.GraphXInputFormat;
-import mvm.rya.accumulo.mr.MRUtils;
-import mvm.rya.accumulo.mr.RyaInputFormat;
-import mvm.rya.accumulo.mr.RyaTypeWritable;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.graphx.Edge;
-import org.apache.spark.graphx.Graph;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import com.google.common.base.Preconditions;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class GraphXGraphGenerator {
-       
-       public String zk;
-       public String instance;
-       public String userName;
-       public String pwd;
-       public boolean mock;
-       public String tablePrefix;
-       public Authorizations authorizations;
-       
-       public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-               // Load configuration parameters
-        zk = MRUtils.getACZK(conf);
-        instance = MRUtils.getACInstance(conf);
-        userName = MRUtils.getACUserName(conf);
-        pwd = MRUtils.getACPwd(conf);
-        mock = MRUtils.getACMock(conf, false);
-        tablePrefix = MRUtils.getTablePrefix(conf);
-        // Set authorizations if specified
-        String authString = conf.get(MRUtils.AC_AUTH_PROP);
-               if (authString != null && !authString.isEmpty()) {
-            authorizations = new Authorizations(authString.split(","));
-            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
-        }
-        else {
-            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-        }
-        // Set table prefix to the default if not set
-        if (tablePrefix == null) {
-            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-            MRUtils.setTablePrefix(conf, tablePrefix);
-        }
-        // Check for required configuration parameters
-        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
-        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
-        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
-        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
-        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        // If connecting to real accumulo, set additional parameters and require zookeepers
-        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
-        // Ensure consistency between alternative configuration properties
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
-        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
-        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
-               Job job = Job.getInstance(conf, sc.appName());
-               
-               ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-               
-               //maybe ask conf for correct suffix?
-               GraphXInputFormat.setInputTableName(job, EntityCentricIndex.CONF_TABLE_SUFFIX);
-               GraphXInputFormat.setConnectorInfo(job, userName, pwd);
-               GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
-               GraphXInputFormat.setScanAuthorizations(job, authorizations);
-               
-               return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
-       }
-       
-       public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-               // Load configuration parameters
-        zk = MRUtils.getACZK(conf);
-        instance = MRUtils.getACInstance(conf);
-        userName = MRUtils.getACUserName(conf);
-        pwd = MRUtils.getACPwd(conf);
-        mock = MRUtils.getACMock(conf, false);
-        tablePrefix = MRUtils.getTablePrefix(conf);
-        // Set authorizations if specified
-        String authString = conf.get(MRUtils.AC_AUTH_PROP);
-               if (authString != null && !authString.isEmpty()) {
-            authorizations = new Authorizations(authString.split(","));
-            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
-        }
-        else {
-            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-        }
-        // Set table prefix to the default if not set
-        if (tablePrefix == null) {
-            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-            MRUtils.setTablePrefix(conf, tablePrefix);
-        }
-        // Check for required configuration parameters
-        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
-        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
-        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
-        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
-        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        // If connecting to real accumulo, set additional parameters and require zookeepers
-        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
-        // Ensure consistency between alternative configuration properties
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
-        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
-        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
-               Job job = Job.getInstance(conf, sc.appName());
-               
-               ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-               
-               RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
-               RyaInputFormat.setConnectorInfo(job, userName, pwd);
-               RyaInputFormat.setZooKeeperInstance(job, clientConfig);
-               RyaInputFormat.setScanAuthorizations(job, authorizations);
-               return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
-       }
-       
-       public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-               StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
-               StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
-               ClassTag<RyaTypeWritable> RTWTag = null;
-               RyaTypeWritable rtw = null;
-               RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
-               
-               RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
-               JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
-               JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2);
-               
-               RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
-               
-               return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag);
-       }
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-       
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java 
b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
new file mode 100644
index 0000000..b1889b8
--- /dev/null
+++ 
b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
@@ -0,0 +1,183 @@
+package org.apache.rya.accumulo.spark;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.rya.accumulo.AccumuloRdfConstants;
+import org.apache.rya.accumulo.mr.GraphXEdgeInputFormat;
+import org.apache.rya.accumulo.mr.GraphXInputFormat;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.graphx.Graph;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.storage.StorageLevel;
+
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import com.google.common.base.Preconditions;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class GraphXGraphGenerator {
+
+    public String zk;
+    public String instance;
+    public String userName;
+    public String pwd;
+    public boolean mock;
+    public String tablePrefix;
+    public Authorizations authorizations;
+
+    public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+        // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+
+        Job job = Job.getInstance(conf, sc.appName());
+
+        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+
+        GraphXInputFormat.setInputTableName(job, EntityCentricIndex.getTableName(conf));
+        GraphXInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
+        GraphXInputFormat.setScanAuthorizations(job, authorizations);
+
+        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
+    }
+
+    public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+        // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+
+        Job job = Job.getInstance(conf, sc.appName());
+
+        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+
+        RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
+        RyaInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        RyaInputFormat.setZooKeeperInstance(job, clientConfig);
+        RyaInputFormat.setScanAuthorizations(job, authorizations);
+        String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(TABLE_LAYOUT.SPO, tablePrefix);
+        InputFormatBase.setInputTableName(job, tableName);
+        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
+    }
+
+    public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
+        StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
+        StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
+        ClassTag<RyaTypeWritable> RTWTag = ClassTag$.MODULE$.apply(RyaTypeWritable.class);
+        RyaTypeWritable rtw = null;
+        RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
+
+        RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
+        JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
+        JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2);
+
+        RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
+
+        return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag);
+    }
+}
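
Putting the pieces together, a Spark job might drive the generator as in the following
sketch. This is illustrative only: the instance name, zookeepers, and credentials are
placeholders, and it assumes MRUtils exposes an AC_ZK_PROP constant alongside the
property constants the Preconditions messages above already reference:

    // Sketch only: build a GraphX view of a Rya instance from a Spark driver.
    SparkConf sparkConf = new SparkConf().setAppName("rya-graphx").setMaster("local");
    SparkContext sc = new SparkContext(sparkConf);

    Configuration conf = new Configuration();
    conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");   // placeholder instance
    conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181");        // placeholder zookeepers
    conf.set(MRUtils.AC_USERNAME_PROP, "root");
    conf.set(MRUtils.AC_PWD_PROP, "secret");          // placeholder password
    conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");

    GraphXGraphGenerator generator = new GraphXGraphGenerator();
    Graph<RyaTypeWritable, RyaTypeWritable> graph = generator.createGraph(sc, conf);
    // vertices() and edges() come from the GraphX Graph API.
    System.out.println(graph.vertices().count() + " vertices, " + graph.edges().count() + " edges");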
