Repository: incubator-rya
Updated Branches:
  refs/heads/master 7e25bdaa0 -> 3b9fb100c


initial spark support


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/77ff31e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/77ff31e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/77ff31e2

Branch: refs/heads/master
Commit: 77ff31e2beecd4fe12ea633e7e7bc52c39e3ef0d
Parents: 7e25bda
Author: Evan Good <[email protected]>
Authored: Thu Aug 18 16:15:45 2016 +0100
Committer: pujav65 <[email protected]>
Committed: Tue Dec 20 10:45:53 2016 -0500

----------------------------------------------------------------------
 .../rya/indexing/accumulo/ConfigUtils.java      |   1 +
 .../accumulo/entity/EntityCentricIndex.java     |  42 ++++
 mapreduce/pom.xml                               |   5 +
 .../rya/accumulo/mr/GraphXEdgeInputFormat.java  | 209 +++++++++++++++++++
 .../mvm/rya/accumulo/mr/GraphXInputFormat.java  | 132 ++++++++++++
 .../mvm/rya/accumulo/mr/RyaTypeWritable.java    |  74 +++++++
 .../accumulo/mr/GraphXEdgeInputFormatTest.java  | 134 ++++++++++++
 .../rya/accumulo/mr/GraphXInputFormatTest.java  | 144 +++++++++++++
 .../rya/accumulo/mr/RyaInputFormatTest.java     |  27 +--
 spark/pom.xml                                   | 139 ++++++++++++
 .../accumulo/spark/GraphXGraphGenerator.java    | 188 +++++++++++++++++
 11 files changed, 1077 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
index 61a1003..e9e6c31 100644
--- 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java
@@ -365,6 +365,7 @@ public class ConfigUtils {
 
     public static void setIndexers(final RdfCloudTripleStoreConfiguration 
conf) {
 
+       System.out.println("Testuing");
         final List<String> indexList = Lists.newArrayList();
         final List<String> optimizers = Lists.newArrayList();
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
----------------------------------------------------------------------
diff --git 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
index 2a2bde3..d58b1f1 100644
--- 
a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
+++ 
b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java
@@ -311,6 +311,48 @@ public class EntityCentricIndex extends 
AbstractAccumuloIndexer {
         return new RyaStatement(subject, predicate, object, context,
                 null, columnVisibility, valueBytes, timestamp);
     }
+    
+    /**
+     * Return the RyaType of the Entity Centric Index row.
+     * @param key Row key, contains statement data
+     * @param value Row value
+     * @return The statement represented by the row
+     * @throws IOException if edge direction can't be extracted as expected.
+     * @throws RyaTypeResolverException if a type error occurs deserializing 
the statement's object.
+     */
+    public static RyaType getRyaType(Key key, Value value) throws 
RyaTypeResolverException, IOException {
+        assert key != null;
+        assert value != null;
+        byte[] entityBytes = key.getRowData().toArray();
+        byte[] data = key.getColumnQualifierData().toArray();
+
+        // main entity is either the subject or object
+        // data contains: column family , var name of other node , data of 
other node + datatype of object
+        int split = Bytes.indexOf(data, DELIM_BYTES);
+        byte[] edgeBytes = Arrays.copyOfRange(data, split + 
DELIM_BYTES.length, data.length);
+        split = Bytes.indexOf(edgeBytes, DELIM_BYTES);
+        String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split));
+        byte[] typeBytes = Arrays.copyOfRange(edgeBytes,  edgeBytes.length - 
2, edgeBytes.length);
+        byte[] objectBytes;
+        RyaURI subject;
+        RyaType object;
+        RyaType type = null;
+        switch (otherNodeVar) {
+            case "subject":
+                objectBytes = Bytes.concat(entityBytes, typeBytes);
+                object = RyaContext.getInstance().deserialize(objectBytes); 
//return this
+                type = object;
+                break;
+            case "object":
+                subject = new RyaURI(new String(entityBytes));//return this
+                type = subject;
+                break;
+            default:
+                throw new IOException("Failed to deserialize entity-centric 
index row. "
+                        + "Expected 'subject' or 'object', encountered: '" + 
otherNodeVar + "'");
+        }
+        return type;
+    }
 
     @Override
     public void init() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/mapreduce/pom.xml b/mapreduce/pom.xml
index cfe09d7..d222845 100644
--- a/mapreduce/pom.xml
+++ b/mapreduce/pom.xml
@@ -31,6 +31,11 @@ under the License.
     <name>Apache Rya MapReduce Tools</name>
 
     <dependencies>
+       <dependency>
+                   <groupId>org.apache.spark</groupId>
+                   <artifactId>spark-graphx_2.11</artifactId>
+                   <version>1.6.2</version>
+               </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.api</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java 
b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
new file mode 100644
index 0000000..79d6e82
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java
@@ -0,0 +1,209 @@
+package mvm.rya.accumulo.mr;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.List;
+import java.util.Map.Entry;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.graphx.Edge;
+
+/**
+ * Subclass of {@link AbstractInputFormat} for reading
+ * {@link RyaStatementWritable}s directly from a running Rya instance.
+ */
+@SuppressWarnings("rawtypes")
+public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> {
+       /**
+        * Instantiates a RecordReader for this InputFormat and a given task and
+        * input split.
+        * 
+        * @param split
+        *            Defines the portion of the input this RecordReader is
+        *            responsible for.
+        * @param context
+        *            The context of the task.
+        * @return A RecordReader that can be used to fetch 
RyaStatementWritables.
+        */
+       @Override
+       public RecordReader<Object, Edge> createRecordReader(InputSplit split,
+                       TaskAttemptContext context) {
+               return new RyaStatementRecordReader();
+       }
+
+       /**
+        * Sets the table layout to use.
+        * 
+        * @param conf
+        *            Configuration to set the layout in.
+        * @param layout
+        *            Statements will be read from the Rya table associated with
+        *            this layout.
+        */
+       public static void setTableLayout(Job conf, TABLE_LAYOUT layout) {
+               conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, 
layout.name());
+       }
+
+       /**
+        * Retrieves RyaStatementWritable objects from Accumulo tables.
+        */
+       public class RyaStatementRecordReader extends
+                       AbstractRecordReader<Object, Edge> {
+               private RyaTripleContext ryaContext;
+               private TABLE_LAYOUT tableLayout;
+
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName, 
RangeInputSplit split) {
+               }
+
+               /**
+                * Initializes the RecordReader.
+                * 
+                * @param inSplit
+                *            Defines the portion of data to read.
+                * @param attempt
+                *            Context for this task attempt.
+                * @throws IOException
+                *             if thrown by the superclass's initialize method.
+                */
+               @Override
+               public void initialize(InputSplit inSplit, TaskAttemptContext 
attempt)
+                               throws IOException {
+                       super.initialize(inSplit, attempt);
+                       this.tableLayout = MRUtils.getTableLayout(
+                                       attempt.getConfiguration(), 
TABLE_LAYOUT.SPO);
+                       // TODO verify that this is correct
+                       this.ryaContext = RyaTripleContext
+                                       .getInstance(new 
AccumuloRdfConfiguration(attempt
+                                                       .getConfiguration()));
+               }
+
+               /**
+                * Load the next statement by converting the next Accumulo row 
to a
+                * statement, and make the new (key,value) pair available for 
retrieval.
+                * 
+                * @return true if another (key,value) pair was fetched and is 
ready to
+                *         be retrieved, false if there was none.
+                * @throws IOException
+                *             if a row was loaded but could not be converted 
to a
+                *             statement.
+                */
+               @Override
+               public boolean nextKeyValue() throws IOException {
+                       if (!scannerIterator.hasNext())
+                               return false;
+                       Entry<Key, Value> entry = scannerIterator.next();
+                       ++numKeysRead;
+                       currentKey = entry.getKey();
+                       try {
+                               currentK = currentKey.getRow();
+                               RyaTypeWritable rtw = null;
+                               RyaStatement stmt = 
this.ryaContext.deserializeTriple(
+                                               this.tableLayout, new 
TripleRow(entry.getKey().getRow()
+                                                               .getBytes(), 
entry.getKey().getColumnFamily()
+                                                               .getBytes(), 
entry.getKey()
+                                                               
.getColumnQualifier().getBytes(), entry
+                                                               
.getKey().getTimestamp(), entry.getKey()
+                                                               
.getColumnVisibility().getBytes(), entry
+                                                               
.getValue().get()));
+
+                               String subjURI = 
stmt.getSubject().getDataType().toString();
+                               String objURI = 
stmt.getObject().getDataType().toString();
+
+                               // SHA-256 the string value and then generate a 
hashcode from
+                               // the digested string, the collision ratio is 
less than 0.0001%
+                               // using custom hash function should 
significantly reduce the
+                               // collision ratio
+                               MessageDigest messageDigest = MessageDigest
+                                               .getInstance("SHA-256");
+
+                               messageDigest.update(subjURI.getBytes());
+                               String encryptedString = new 
String(messageDigest.digest());
+                               long subHash = hash(encryptedString);
+
+                               messageDigest.update(objURI.getBytes());
+                               encryptedString = new 
String(messageDigest.digest());
+                               long objHash = hash(encryptedString);
+
+                               Edge<RyaTypeWritable> writable = new 
Edge<RyaTypeWritable>(
+                                               subHash, objHash, rtw);
+                               currentV = writable;
+                       } catch (TripleRowResolverException | 
NoSuchAlgorithmException e) {
+                               throw new IOException(e);
+                       }
+                       return true;
+               }
+
+               protected List<IteratorSetting> contextIterators(
+                               TaskAttemptContext context, String tableName) {
+                       return getIterators(context);
+               }
+
+               @Override
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName,
+                               
org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+                       List<IteratorSetting> iterators = null;
+
+                       if (null == split) {
+                               iterators = contextIterators(context, 
tableName);
+                       } else {
+                               iterators = split.getIterators();
+                               if (null == iterators) {
+                                       iterators = contextIterators(context, 
tableName);
+                               }
+                       }
+
+                       for (IteratorSetting iterator : iterators)
+                               scanner.addScanIterator(iterator);
+               }
+
+       }
+
+       public static long hash(String string) {
+               long h = 1125899906842597L; // prime
+               int len = string.length();
+
+               for (int i = 0; i < len; i++) {
+                       h = 31 * h + string.charAt(i);
+               }
+               return h;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java 
b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
new file mode 100644
index 0000000..6ec5c74
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java
@@ -0,0 +1,132 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.resolver.RyaTypeResolverException;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.WholeRowIterator;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+public class GraphXInputFormat extends InputFormatBase<Object, 
RyaTypeWritable> {
+
+       private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23;
+
+       /**
+        * Instantiates a RecordReader for this InputFormat and a given task and
+        * input split.
+        * 
+        * @param split
+        *            Defines the portion of the input this RecordReader is
+        *            responsible for.
+        * @param context
+        *            The context of the task.
+        * @return A RecordReader that can be used to fetch 
RyaStatementWritables.
+        */
+       @Override
+       public RecordReader<Object, RyaTypeWritable> createRecordReader(
+                       InputSplit split, TaskAttemptContext context) {
+               return new RyaStatementRecordReader();
+       }
+
+       
+       
+       /**
+        * Retrieves RyaStatementWritable objects from Accumulo tables.
+        */
+       public class RyaStatementRecordReader extends
+                       AbstractRecordReader<Object, RyaTypeWritable> {
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName,
+                               @SuppressWarnings("deprecation") 
RangeInputSplit split) {
+                       IteratorSetting iteratorSetting = new IteratorSetting(
+                                       WHOLE_ROW_ITERATOR_PRIORITY, 
WholeRowIterator.class);
+                       scanner.addScanIterator(iteratorSetting);
+               }
+
+               /**
+                * Initializes the RecordReader.
+                * 
+                * @param inSplit
+                *            Defines the portion of data to read.
+                * @param attempt
+                *            Context for this task attempt.
+                * @throws IOException
+                *             if thrown by the superclass's initialize method.
+                */
+               @Override
+               public void initialize(InputSplit inSplit, TaskAttemptContext 
attempt)
+                               throws IOException {
+                       super.initialize(inSplit, attempt);
+               }
+
+               /**
+                * Load the next statement by converting the next Accumulo row 
to a
+                * statement, and make the new (key,value) pair available for 
retrieval.
+                * 
+                * @return true if another (key,value) pair was fetched and is 
ready to
+                *         be retrieved, false if there was none.
+                * @throws IOException
+                *             if a row was loaded but could not be converted 
to a
+                *             statement.
+                */
+               @Override
+               public boolean nextKeyValue() throws IOException {
+                       if (!scannerIterator.hasNext())
+                               return false;
+                       Entry<Key, Value> entry = scannerIterator.next();
+                       ++numKeysRead;
+                       currentKey = entry.getKey();
+
+                       try {
+                               currentK = currentKey.getRow();
+                               SortedMap<Key, Value> wholeRow = 
WholeRowIterator.decodeRow(entry.getKey(), entry.getValue());
+                               Key key = wholeRow.firstKey();
+                               Value value = wholeRow.get(key);
+                               RyaType type = 
EntityCentricIndex.getRyaType(key, value);
+                               RyaTypeWritable writable = new 
RyaTypeWritable();
+                               writable.setRyaType(type);
+                               currentV = writable;
+                       } catch (RyaTypeResolverException e) {
+                               throw new IOException();
+                       }
+                       return true;
+               }
+
+               protected List<IteratorSetting> contextIterators(
+                               TaskAttemptContext context, String tableName) {
+                       return getIterators(context);
+               }
+
+               @Override
+               protected void setupIterators(TaskAttemptContext context,
+                               Scanner scanner, String tableName,
+                               
org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) {
+
+                       List<IteratorSetting> iterators = null;
+
+                       if (null == split) {
+                               iterators = contextIterators(context, 
tableName);
+                       } else {
+                               iterators = split.getIterators();
+                               if (null == iterators) {
+                                       iterators = contextIterators(context, 
tableName);
+                               }
+                       }
+
+                       for (IteratorSetting iterator : iterators)
+                               scanner.addScanIterator(iterator);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java 
b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
new file mode 100644
index 0000000..ddc0948
--- /dev/null
+++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java
@@ -0,0 +1,74 @@
+package mvm.rya.accumulo.mr;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.openrdf.model.URI;
+import org.openrdf.model.impl.ValueFactoryImpl;
+
+public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{
+
+       private RyaType ryatype;
+       
+       /**
+     * Read part of a statement from an input stream.
+     * @param dataInput Stream for reading serialized statements.
+     * @return The next individual field, as a byte array.
+     * @throws IOException if reading from the stream fails.
+     */
+    protected byte[] read(DataInput dataInput) throws IOException {
+        if (dataInput.readBoolean()) {
+            int len = dataInput.readInt();
+            byte[] bytes = new byte[len];
+            dataInput.readFully(bytes);
+            return bytes;
+        }else {
+            return null;
+        }
+    }
+       
+       @Override
+       public void readFields(DataInput dataInput) throws IOException {
+               ValueFactoryImpl vfi = new ValueFactoryImpl();
+               String data = dataInput.readLine();
+               String dataTypeString = dataInput.readLine();
+               URI dataType = vfi.createURI(dataTypeString);
+               ryatype.setData(data);
+               ryatype.setDataType(dataType);
+       }
+
+       @Override
+       public void write(DataOutput dataOutput) throws IOException {
+               dataOutput.writeChars(ryatype.getData());
+               dataOutput.writeChars(ryatype.getDataType().toString());
+       }
+       
+       /**
+     * Gets the contained RyaStatement.
+     * @return The statement represented by this RyaStatementWritable.
+     */
+    public RyaType getRyaType() {
+        return ryatype;
+    }
+    /**
+     * Sets the contained RyaStatement.
+     * @param   ryaStatement    The statement to be represented by this
+     *                          RyaStatementWritable.
+     */
+    public void setRyaType(RyaType ryatype) {
+        this.ryatype = ryatype;
+    }
+
+       @Override
+       public int compareTo(RyaTypeWritable o) {
+               return ryatype.compareTo(o.ryatype);
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java 
b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
new file mode 100644
index 0000000..445499d
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java
@@ -0,0 +1,134 @@
+package mvm.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.spark.graphx.Edge;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GraphXEdgeInputFormatTest {
+
+    static String username = "root", table = "rya_spo";
+    static PasswordToken password = new PasswordToken("");
+
+    static Instance instance;
+    static AccumuloRyaDAO apiImpl;
+
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() 
+ ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @SuppressWarnings("rawtypes")
+       @Test
+    public void testInputFormat() throws Exception {
+       RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com";))
+            .setPredicate(new RyaURI("http://some_other_uri";))
+            .setObject(new RyaURI("http://www.yahoo.com";))
+            .setColumnVisibility(new byte[0])
+            .setValue(new byte[0])
+            .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXEdgeInputFormat.setMockInstance(jobConf, 
instance.getInstanceName());
+        GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO);
+        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
+        GraphXEdgeInputFormat.setInputTableName(jobConf, table);
+
+        GraphXEdgeInputFormat.setScanIsolation(jobConf, false);
+        GraphXEdgeInputFormat.setLocalIterators(jobConf, false);
+        GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), 
jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new 
TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new 
TaskID(), 1));
+
+        RecordReader reader = inputFormat.createRecordReader(splits.get(0), 
taskAttemptContext);
+
+        RecordReader ryaStatementRecordReader = (RecordReader) reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<Edge> results = new ArrayList<Edge>();
+        while(ryaStatementRecordReader.nextKeyValue()) {
+            Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue();
+            long srcId = writable.srcId();
+            long destId = writable.dstId();
+                       RyaTypeWritable rtw = null;
+            Object text = ryaStatementRecordReader.getCurrentKey();
+            Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, 
destId, rtw);
+            results.add(edge);
+
+            System.out.println(text);
+        }
+
+        System.out.println(results.size());
+        System.out.println(results);
+        Assert.assertTrue(results.size() == 2);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java 
b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
new file mode 100644
index 0000000..a31b27f
--- /dev/null
+++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java
@@ -0,0 +1,144 @@
+package mvm.rya.accumulo.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.domain.RyaType;
+import mvm.rya.api.domain.RyaURI;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GraphXInputFormatTest {
+
+       private String username = "root", table = "rya_eci";
+    private PasswordToken password = new PasswordToken("");
+
+    private Instance instance;
+    private AccumuloRyaDAO apiImpl;
+
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXInputFormatTest.class.getName() + 
".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+        conf.setBoolean("sc.use_entity", true);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @Test
+    public void testInputFormat() throws Exception {
+       RyaStatement input = RyaStatement.builder()
+            .setSubject(new RyaURI("http://www.google.com";))
+            .setPredicate(new RyaURI("http://some_other_uri";))
+            .setObject(new RyaURI("http://www.yahoo.com";))
+            .setColumnVisibility(new byte[0])
+            .setValue(new byte[0])
+            .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXInputFormat.setInputTableName(jobConf, table);
+        GraphXInputFormat.setInputTableName(jobConf, table);
+
+        GraphXInputFormat.setScanIsolation(jobConf, false);
+        GraphXInputFormat.setLocalIterators(jobConf, false);
+        GraphXInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXInputFormat inputFormat = new GraphXInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), 
jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new 
TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new 
TaskID(), 1));
+
+        RecordReader<Object, RyaTypeWritable> reader = 
inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        RyaStatementRecordReader ryaStatementRecordReader = 
(RyaStatementRecordReader)reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<RyaType> results = new ArrayList<RyaType>();
+        System.out.println("before while");
+        while(ryaStatementRecordReader.nextKeyValue()) {
+               System.out.println("in while");
+            RyaTypeWritable writable = 
ryaStatementRecordReader.getCurrentValue();
+            RyaType value = writable.getRyaType();
+            Object text = ryaStatementRecordReader.getCurrentKey();
+            RyaType type = new RyaType();
+            type.setData(value.getData());
+            type.setDataType(value.getDataType());
+            results.add(type);
+            
+            System.out.println(value.getData());
+            System.out.println(value.getDataType());
+            System.out.println(results);
+            System.out.println(type);
+            System.out.println(text);
+            System.out.println(value);
+        }
+        System.out.println("after while");
+
+        System.out.println(results.size());
+        System.out.println(results);
+//        Assert.assertTrue(results.size() == 2);
+//        Assert.assertTrue(results.contains(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java 
b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java
index 1c75629..f571682 100644
--- a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java
+++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java
@@ -17,22 +17,15 @@ package org.apache.rya.accumulo.mr;
  * specific language governing permissions and limitations
  * under the License.
  */
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.AccumuloRyaDAO;
-import org.apache.rya.accumulo.RyaTableMutationsFactory;
-import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
-import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.resolver.RyaTripleContext;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.accumulo.core.data.Mutation;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -42,20 +35,18 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaURI;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
 public class RyaInputFormatTest {
 
     static String username = "root", table = "rya_spo";

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..02bdb37
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya-project</artifactId>
+        <version>3.2.10-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.spark</artifactId>
+    <name>Apache Rya MapReduce Tools</name>
+
+    <dependencies>
+           <dependency>
+                   <groupId>org.apache.spark</groupId>
+                   <artifactId>spark-graphx_2.11</artifactId>
+                   <version>1.6.2</version>
+               </dependency>
+           <dependency>
+                   <groupId>org.apache.spark</groupId>
+                   <artifactId>spark-core_2.11</artifactId>
+                   <version>1.2.2</version>
+               </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>accumulo.rya</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.mapreduce</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-ntriples</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-nquads</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-rio-trig</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <version>1.1.0</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.rat</groupId>
+                    <artifactId>apache-rat-plugin</artifactId>
+                    <configuration>
+                        <excludes>
+                            <!-- RDF data Files -->
+                            <exclude>**/*.ntriples</exclude>
+                            <exclude>**/*.trig</exclude>
+                        </excludes>
+                    </configuration>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+
+    <profiles>
+        <profile>
+            <id>mr</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <groupId>org.apache.maven.plugins</groupId>
+                        <artifactId>maven-shade-plugin</artifactId>
+                        <executions>
+                            <execution>
+                                <configuration>
+                                    <transformers>
+                                        <transformer 
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
 />
+                                    </transformers>
+                                </configuration>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+            </build>
+        </profile>
+    </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java 
b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
new file mode 100644
index 0000000..f4b7860
--- /dev/null
+++ b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
@@ -0,0 +1,188 @@
+package mvm.rya.accumulo.spark;
+
+import java.io.IOException;
+
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.mr.GraphXEdgeInputFormat;
+import mvm.rya.accumulo.mr.GraphXInputFormat;
+import mvm.rya.accumulo.mr.MRUtils;
+import mvm.rya.accumulo.mr.RyaInputFormat;
+import mvm.rya.accumulo.mr.RyaTypeWritable;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.graphx.Graph;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.storage.StorageLevel;
+
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+
+import com.google.common.base.Preconditions;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class GraphXGraphGenerator {
+       
+       public String zk;
+       public String instance;
+       public String userName;
+       public String pwd;
+       public boolean mock;
+       public String tablePrefix;
+       public Authorizations authorizations;
+       
+       public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext 
sc, Configuration conf) throws IOException, AccumuloSecurityException{
+               // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+               if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for 
consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + 
MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + 
MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + 
MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + 
MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and 
require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for 
consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, 
tablePrefix);
+
+               Job job = Job.getInstance(conf, sc.appName());
+               
+               ClientConfiguration clientConfig = new 
ClientConfiguration().with(ClientProperty.INSTANCE_NAME, 
instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+               
+               //maybe ask conf for correct suffix?
+               GraphXInputFormat.setInputTableName(job, 
EntityCentricIndex.CONF_TABLE_SUFFIX);
+               GraphXInputFormat.setConnectorInfo(job, userName, pwd);
+               GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
+               GraphXInputFormat.setScanAuthorizations(job, authorizations);
+               
+               return sc.newAPIHadoopRDD(job.getConfiguration(), 
GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
+       }
+       
+       public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, 
Configuration conf) throws IOException, AccumuloSecurityException{
+               // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+               if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for 
consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + 
MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + 
MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + 
MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + 
MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and 
require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for 
consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, 
tablePrefix);
+
+               Job job = Job.getInstance(conf, sc.appName());
+               
+               ClientConfiguration clientConfig = new 
ClientConfiguration().with(ClientProperty.INSTANCE_NAME, 
instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+               
+               RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
+               RyaInputFormat.setConnectorInfo(job, userName, pwd);
+               RyaInputFormat.setZooKeeperInstance(job, clientConfig);
+               RyaInputFormat.setScanAuthorizations(job, authorizations);
+               return sc.newAPIHadoopRDD(job.getConfiguration(), 
GraphXEdgeInputFormat.class, Object.class, Edge.class);
+       }
+       
+       public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext 
sc, Configuration conf) throws IOException, AccumuloSecurityException{
+               StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
+               StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
+               ClassTag<RyaTypeWritable> RTWTag = null;
+               RyaTypeWritable rtw = null;
+               RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = 
getVertexRDD(sc, conf);
+               
+               RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
+               JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
+               JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> 
tuple._2);
+               
+               RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
+               
+               return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, 
storageLvl2, RTWTag, RTWTag);
+       }
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+       
+}
\ No newline at end of file

Reply via email to