Repository: incubator-rya Updated Branches: refs/heads/master 7e25bdaa0 -> 3b9fb100c
initial spark support Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/77ff31e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/77ff31e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/77ff31e2 Branch: refs/heads/master Commit: 77ff31e2beecd4fe12ea633e7e7bc52c39e3ef0d Parents: 7e25bda Author: Evan Good <[email protected]> Authored: Thu Aug 18 16:15:45 2016 +0100 Committer: pujav65 <[email protected]> Committed: Tue Dec 20 10:45:53 2016 -0500 ---------------------------------------------------------------------- .../rya/indexing/accumulo/ConfigUtils.java | 1 + .../accumulo/entity/EntityCentricIndex.java | 42 ++++ mapreduce/pom.xml | 5 + .../rya/accumulo/mr/GraphXEdgeInputFormat.java | 209 +++++++++++++++++++ .../mvm/rya/accumulo/mr/GraphXInputFormat.java | 132 ++++++++++++ .../mvm/rya/accumulo/mr/RyaTypeWritable.java | 74 +++++++ .../accumulo/mr/GraphXEdgeInputFormatTest.java | 134 ++++++++++++ .../rya/accumulo/mr/GraphXInputFormatTest.java | 144 +++++++++++++ .../rya/accumulo/mr/RyaInputFormatTest.java | 27 +-- spark/pom.xml | 139 ++++++++++++ .../accumulo/spark/GraphXGraphGenerator.java | 188 +++++++++++++++++ 11 files changed, 1077 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java index 61a1003..e9e6c31 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java @@ -365,6 +365,7 @@ public class ConfigUtils { public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { + System.out.println("Testing"); final List<String> indexList = Lists.newArrayList(); final List<String> optimizers = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java index 2a2bde3..d58b1f1 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java @@ -311,6 +311,48 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { return new RyaStatement(subject, predicate, object, context, null, columnVisibility, valueBytes, timestamp); } + + /** + * Returns the RyaType of the Entity Centric Index row. + * @param key Row key, contains statement data + * @param value Row value + * @return The RyaType represented by the row + * @throws IOException if edge direction can't be extracted as expected. + * @throws RyaTypeResolverException if a type error occurs deserializing the statement's object.
+ */ + public static RyaType getRyaType(Key key, Value value) throws RyaTypeResolverException, IOException { + assert key != null; + assert value != null; + byte[] entityBytes = key.getRowData().toArray(); + byte[] data = key.getColumnQualifierData().toArray(); + + // main entity is either the subject or object + // data contains: column family, var name of other node, data of other node + datatype of object + int split = Bytes.indexOf(data, DELIM_BYTES); + byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length); + split = Bytes.indexOf(edgeBytes, DELIM_BYTES); + String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split)); + byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length); + byte[] objectBytes; + RyaURI subject; + RyaType object; + RyaType type = null; + switch (otherNodeVar) { + case "subject": + // the row's entity is the statement's object; return it + objectBytes = Bytes.concat(entityBytes, typeBytes); + object = RyaContext.getInstance().deserialize(objectBytes); + type = object; + break; + case "object": + // the row's entity is the statement's subject; return it + subject = new RyaURI(new String(entityBytes)); + type = subject; + break; + default: + throw new IOException("Failed to deserialize entity-centric index row. " + "Expected 'subject' or 'object', encountered: '" + otherNodeVar + "'"); + } + return type; + } @Override public void init() { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/mapreduce/pom.xml b/mapreduce/pom.xml index cfe09d7..d222845 100644 --- a/mapreduce/pom.xml +++ b/mapreduce/pom.xml @@ -31,6 +31,11 @@ under the License. <name>Apache Rya MapReduce Tools</name> <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-graphx_2.11</artifactId> + <version>1.6.2</version> + </dependency> <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.api</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java new file mode 100644 index 0000000..79d6e82 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java @@ -0,0 +1,209 @@ +package mvm.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Map.Entry; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; +import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.spark.graphx.Edge; + +/** + * Subclass of {@link AbstractInputFormat} for reading GraphX + * {@link Edge}s directly from a running Rya instance. + */ +@SuppressWarnings("rawtypes") +public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> { + /** + * Instantiates a RecordReader for this InputFormat and a given task and + * input split. + * + * @param split + * Defines the portion of the input this RecordReader is + * responsible for. + * @param context + * The context of the task. + * @return A RecordReader that can be used to fetch Edges. + */ + @Override + public RecordReader<Object, Edge> createRecordReader(InputSplit split, + TaskAttemptContext context) { + return new RyaStatementRecordReader(); + } + + /** + * Sets the table layout to use. + * + * @param job + * Job whose configuration the layout is set in. + * @param layout + * Statements will be read from the Rya table associated with + * this layout. + */ + public static void setTableLayout(Job job, TABLE_LAYOUT layout) { + job.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name()); + } + + /** + * Retrieves Edge objects from Accumulo tables. + */ + public class RyaStatementRecordReader extends + AbstractRecordReader<Object, Edge> { + private RyaTripleContext ryaContext; + private TABLE_LAYOUT tableLayout; + + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, RangeInputSplit split) { + } + + /** + * Initializes the RecordReader. + * + * @param inSplit + * Defines the portion of data to read. + * @param attempt + * Context for this task attempt. + * @throws IOException + * if thrown by the superclass's initialize method. + */ + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) + throws IOException { + super.initialize(inSplit, attempt); + this.tableLayout = MRUtils.getTableLayout( + attempt.getConfiguration(), TABLE_LAYOUT.SPO); + // TODO verify that this is correct + this.ryaContext = RyaTripleContext + .getInstance(new AccumuloRdfConfiguration(attempt + .getConfiguration())); + } + + /** + * Load the next statement by converting the next Accumulo row to a + * statement, and make the new (key,value) pair available for retrieval. + * + * @return true if another (key,value) pair was fetched and is ready to + * be retrieved, false if there was none.
+ * @throws IOException + * if a row was loaded but could not be converted to a + * statement. + */ + @Override + public boolean nextKeyValue() throws IOException { + if (!scannerIterator.hasNext()) + return false; + Entry<Key, Value> entry = scannerIterator.next(); + ++numKeysRead; + currentKey = entry.getKey(); + try { + currentK = currentKey.getRow(); + // The edge attribute is currently unused, so the property stays null. + RyaTypeWritable rtw = null; + RyaStatement stmt = this.ryaContext.deserializeTriple( + this.tableLayout, new TripleRow(entry.getKey().getRow() + .getBytes(), entry.getKey().getColumnFamily() + .getBytes(), entry.getKey() + .getColumnQualifier().getBytes(), entry + .getKey().getTimestamp(), entry.getKey() + .getColumnVisibility().getBytes(), entry + .getValue().get())); + + // Hash the node values: getData() holds each node's string, + // whereas getDataType() is the same constant for every RyaURI. + String subjURI = stmt.getSubject().getData(); + String objURI = stmt.getObject().getData(); + + // SHA-256 the string value, then fold the digested string into a + // long with hash() below; digesting first keeps the collision + // probability negligible. + MessageDigest messageDigest = MessageDigest + .getInstance("SHA-256"); + + messageDigest.update(subjURI.getBytes()); + String digest = new String(messageDigest.digest()); + long subHash = hash(digest); + + messageDigest.update(objURI.getBytes()); + digest = new String(messageDigest.digest()); + long objHash = hash(digest); + + Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>( + subHash, objHash, rtw); + currentV = writable; + } catch (TripleRowResolverException | NoSuchAlgorithmException e) { + throw new IOException(e); + } + return true; + } + + protected List<IteratorSetting> contextIterators( + TaskAttemptContext context, String tableName) { + return getIterators(context); + } + + @Override + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, + org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { + List<IteratorSetting> iterators = null; + + if (null == split) { + iterators = contextIterators(context, tableName); + } else { + iterators = split.getIterators(); + if (null == iterators) { + iterators = contextIterators(context, tableName); + } + } + + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); + } + + } + + public static long hash(String string) { + long h = 1125899906842597L; // prime + int len = string.length(); + + for (int i = 0; i < len; i++) { + h = 31 * h + string.charAt(i); + } + return h; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java new file mode 100644 index 0000000..6ec5c74 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java @@ -0,0 +1,132 @@ +package mvm.rya.accumulo.mr; + +import java.io.IOException; +import java.util.List; +import java.util.Map.Entry; +import java.util.SortedMap; + +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.resolver.RyaTypeResolverException; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; +import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> { + + private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23; + + /** + * Instantiates a RecordReader for this InputFormat and a given task and + * input split. + * + * @param split + * Defines the portion of the input this RecordReader is + * responsible for. + * @param context + * The context of the task. + * @return A RecordReader that can be used to fetch RyaTypeWritables. + */ + @Override + public RecordReader<Object, RyaTypeWritable> createRecordReader( + InputSplit split, TaskAttemptContext context) { + return new RyaStatementRecordReader(); + } + + /** + * Retrieves RyaTypeWritable objects from Accumulo tables. + */ + public class RyaStatementRecordReader extends + AbstractRecordReader<Object, RyaTypeWritable> { + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, + @SuppressWarnings("deprecation") RangeInputSplit split) { + IteratorSetting iteratorSetting = new IteratorSetting( + WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class); + scanner.addScanIterator(iteratorSetting); + } + + /** + * Initializes the RecordReader. + * + * @param inSplit + * Defines the portion of data to read. + * @param attempt + * Context for this task attempt. + * @throws IOException + * if thrown by the superclass's initialize method. + */ + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) + throws IOException { + super.initialize(inSplit, attempt); + } + + /** + * Load the next statement by converting the next Accumulo row to a + * statement, and make the new (key,value) pair available for retrieval. + * + * @return true if another (key,value) pair was fetched and is ready to + * be retrieved, false if there was none. + * @throws IOException + * if a row was loaded but could not be converted to a + * statement.
+ */ + @Override + public boolean nextKeyValue() throws IOException { + if (!scannerIterator.hasNext()) + return false; + Entry<Key, Value> entry = scannerIterator.next(); + ++numKeysRead; + currentKey = entry.getKey(); + + try { + currentK = currentKey.getRow(); + SortedMap<Key, Value> wholeRow = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()); + Key key = wholeRow.firstKey(); + Value value = wholeRow.get(key); + RyaType type = EntityCentricIndex.getRyaType(key, value); + RyaTypeWritable writable = new RyaTypeWritable(); + writable.setRyaType(type); + currentV = writable; + } catch (RyaTypeResolverException e) { + throw new IOException(e); + } + return true; + } + + protected List<IteratorSetting> contextIterators( + TaskAttemptContext context, String tableName) { + return getIterators(context); + } + + @Override + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, + org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { + + List<IteratorSetting> iterators = null; + + if (null == split) { + iterators = contextIterators(context, tableName); + } else { + iterators = split.getIterators(); + if (null == iterators) { + iterators = contextIterators(context, tableName); + } + } + + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java new file mode 100644 index 0000000..ddc0948 --- /dev/null +++ b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java @@ -0,0 +1,74 @@ +package mvm.rya.accumulo.mr; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.hadoop.io.WritableComparable; +import org.openrdf.model.URI; +import org.openrdf.model.impl.ValueFactoryImpl; + +public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{ + + private RyaType ryatype; + + /** + * Read part of a statement from an input stream. + * @param dataInput Stream for reading serialized statements. + * @return The next individual field, as a byte array. + * @throws IOException if reading from the stream fails. + */ + protected byte[] read(DataInput dataInput) throws IOException { + if (dataInput.readBoolean()) { + int len = dataInput.readInt(); + byte[] bytes = new byte[len]; + dataInput.readFully(bytes); + return bytes; + } else { + return null; + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + ValueFactoryImpl vfi = new ValueFactoryImpl(); + // readUTF mirrors the writeUTF calls in write(), so the value and + // datatype strings round-trip intact. + String data = dataInput.readUTF(); + String dataTypeString = dataInput.readUTF(); + URI dataType = vfi.createURI(dataTypeString); + ryatype = new RyaType(); + ryatype.setData(data); + ryatype.setDataType(dataType); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeUTF(ryatype.getData()); + dataOutput.writeUTF(ryatype.getDataType().toString()); + } + + /** + * Gets the contained RyaType. + * @return The RyaType represented by this RyaTypeWritable.
+ */ + public RyaType getRyaType() { + return ryatype; + } + /** + * Sets the contained RyaType. + * @param ryatype The RyaType to be represented by this + * RyaTypeWritable. + */ + public void setRyaType(RyaType ryatype) { + this.ryatype = ryatype; + } + + @Override + public int compareTo(RyaTypeWritable o) { + return ryatype.compareTo(o.ryatype); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java new file mode 100644 index 0000000..445499d --- /dev/null +++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java @@ -0,0 +1,134 @@ +package mvm.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.ArrayList; +import java.util.List; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.spark.graphx.Edge; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class GraphXEdgeInputFormatTest { + + static String username = "root", table = "rya_spo"; + static PasswordToken password = new PasswordToken(""); + + static Instance instance; + static AccumuloRyaDAO apiImpl; + + @Before + public void init() throws Exception { + instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance"); + Connector connector = instance.getConnector(username, password); + connector.tableOperations().create(table); + + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("rya_"); + conf.setDisplayQueryPlan(false); + + apiImpl = new AccumuloRyaDAO(); + apiImpl.setConf(conf); +
apiImpl.setConnector(connector); + apiImpl.init(); + } + + @After + public void after() throws Exception { + apiImpl.dropAndDestroy(); + } + + @SuppressWarnings("rawtypes") + @Test + public void testInputFormat() throws Exception { + RyaStatement input = RyaStatement.builder() + .setSubject(new RyaURI("http://www.google.com")) + .setPredicate(new RyaURI("http://some_other_uri")) + .setObject(new RyaURI("http://www.yahoo.com")) + .setColumnVisibility(new byte[0]) + .setValue(new byte[0]) + .build(); + + apiImpl.add(input); + + Job jobConf = Job.getInstance(); + + GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName()); + GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password); + GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO); + GraphXEdgeInputFormat.setInputTableName(jobConf, table); + + GraphXEdgeInputFormat.setScanIsolation(jobConf, false); + GraphXEdgeInputFormat.setLocalIterators(jobConf, false); + GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false); + + GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat(); + + JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID()); + + List<InputSplit> splits = inputFormat.getSplits(context); + + Assert.assertEquals(1, splits.size()); + + TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1)); + + RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext); + + RecordReader ryaStatementRecordReader = (RecordReader) reader; + ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext); + + List<Edge> results = new ArrayList<Edge>(); + while(ryaStatementRecordReader.nextKeyValue()) { + Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue(); + long srcId = writable.srcId(); + long destId = writable.dstId(); + RyaTypeWritable rtw = null; + Object text = ryaStatementRecordReader.getCurrentKey(); + Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw); + results.add(edge); + + System.out.println(text); + } + + System.out.println(results.size()); + System.out.println(results); + Assert.assertEquals(2, results.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java new file mode 100644 index 0000000..a31b27f --- /dev/null +++ b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java @@ -0,0 +1,144 @@ +package mvm.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.ArrayList; +import java.util.List; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaType; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class GraphXInputFormatTest { + + private String username = "root", table = "rya_eci"; + private PasswordToken password = new PasswordToken(""); + + private Instance instance; + private AccumuloRyaDAO apiImpl; + + @Before + public void init() throws Exception { + instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance"); + Connector connector = instance.getConnector(username, password); + connector.tableOperations().create(table); + + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("rya_"); + conf.setDisplayQueryPlan(false); + conf.setBoolean("sc.use_entity", true); + + apiImpl = new AccumuloRyaDAO(); + apiImpl.setConf(conf); + apiImpl.setConnector(connector); + apiImpl.init(); + } + + @After + public void after() throws Exception { + apiImpl.dropAndDestroy(); + } + + @Test + public void testInputFormat() throws Exception { + RyaStatement input = RyaStatement.builder() + .setSubject(new RyaURI("http://www.google.com")) + .setPredicate(new RyaURI("http://some_other_uri")) + .setObject(new RyaURI("http://www.yahoo.com")) + .setColumnVisibility(new byte[0]) + .setValue(new byte[0]) + .build(); + + apiImpl.add(input); + + Job jobConf = Job.getInstance(); + + GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName()); + GraphXInputFormat.setConnectorInfo(jobConf, username, password); + GraphXInputFormat.setInputTableName(jobConf, table); + + GraphXInputFormat.setScanIsolation(jobConf, false); + GraphXInputFormat.setLocalIterators(jobConf, false); + GraphXInputFormat.setOfflineTableScan(jobConf, false); + + GraphXInputFormat inputFormat = new GraphXInputFormat(); + + JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID()); + + List<InputSplit> splits = inputFormat.getSplits(context); + + Assert.assertEquals(1, splits.size()); + + TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1)); + + RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext); + + RyaStatementRecordReader
ryaStatementRecordReader = (RyaStatementRecordReader)reader; + ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext); + + List<RyaType> results = new ArrayList<RyaType>(); + while(ryaStatementRecordReader.nextKeyValue()) { + RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue(); + RyaType value = writable.getRyaType(); + Object text = ryaStatementRecordReader.getCurrentKey(); + RyaType type = new RyaType(); + type.setData(value.getData()); + type.setDataType(value.getDataType()); + results.add(type); + + System.out.println(type); + System.out.println(text); + } + + System.out.println(results.size()); + System.out.println(results); +// Assert.assertTrue(results.size() == 2); +// Assert.assertTrue(results.contains(input)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java index 1c75629..f571682 100644 --- a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java +++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/RyaInputFormatTest.java @@ -17,22 +17,15 @@ package org.apache.rya.accumulo.mr; * specific language governing permissions and limitations * under the License. */ -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.AccumuloRyaDAO; -import org.apache.rya.accumulo.RyaTableMutationsFactory; -import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader; -import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.api.resolver.RyaTripleContext; +import java.util.ArrayList; +import java.util.List; + import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; @@ -42,20 +35,18 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.hadoop.mrunit.mapreduce.MapDriver; -import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; import org.junit.After; import org.junit.Assert; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; - public class RyaInputFormatTest { static String username = "root", table = "rya_spo"; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/spark/pom.xml ---------------------------------------------------------------------- diff --git a/spark/pom.xml b/spark/pom.xml new file mode 100644 index 0000000..02bdb37 --- /dev/null +++ b/spark/pom.xml @@ -0,0 +1,139 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya-project</artifactId> + <version>3.2.10-SNAPSHOT</version> + </parent> + + <artifactId>rya.spark</artifactId> + <name>Apache Rya Spark Support</name> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-graphx_2.11</artifactId> + <version>1.6.2</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_2.11</artifactId> + <version>1.6.2</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>accumulo.rya</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.mapreduce</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + </dependency> + + <dependency> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </dependency> + + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-ntriples</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-nquads</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-rio-trig</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.mrunit</groupId> + <artifactId>mrunit</artifactId> + <classifier>hadoop2</classifier> + <version>1.1.0</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <pluginManagement> + <plugins> + <plugin> +
<groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <!-- RDF data Files --> + <exclude>**/*.ntriples</exclude> + <exclude>**/*.trig</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </pluginManagement> + </build> + + <profiles> + <profile> + <id>mr</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <execution> + <configuration> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/77ff31e2/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java new file mode 100644 index 0000000..f4b7860 --- /dev/null +++ b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java @@ -0,0 +1,188 @@ +package mvm.rya.accumulo.spark; + +import java.io.IOException; + +import mvm.rya.accumulo.AccumuloRdfConstants; +import mvm.rya.accumulo.mr.GraphXEdgeInputFormat; +import mvm.rya.accumulo.mr.GraphXInputFormat; +import mvm.rya.accumulo.mr.MRUtils; +import mvm.rya.accumulo.mr.RyaInputFormat; +import mvm.rya.accumulo.mr.RyaTypeWritable; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.graphx.Edge; +import org.apache.spark.graphx.Graph; +import org.apache.spark.rdd.RDD; +import org.apache.spark.storage.StorageLevel; + +import scala.Tuple2; +import scala.reflect.ClassTag; + +import com.google.common.base.Preconditions; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class GraphXGraphGenerator { + + public String zk; + public String instance; + public String userName; + public String pwd; + public boolean mock; + public String tablePrefix; + public Authorizations authorizations; + + public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{ + // Load configuration parameters + zk = MRUtils.getACZK(conf); + instance = MRUtils.getACInstance(conf); + userName = MRUtils.getACUserName(conf); + pwd = MRUtils.getACPwd(conf); + mock = MRUtils.getACMock(conf, false); + tablePrefix = MRUtils.getTablePrefix(conf); + // Set authorizations if specified + String authString = conf.get(MRUtils.AC_AUTH_PROP); + if (authString != null && !authString.isEmpty()) { + authorizations = new Authorizations(authString.split(",")); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency + 
} + else { + authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; + } + // Set table prefix to the default if not set + if (tablePrefix == null) { + tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; + MRUtils.setTablePrefix(conf, tablePrefix); + } + // Check for required configuration parameters + Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set."); + Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set."); + Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set."); + Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set."); + RdfCloudTripleStoreConstants.prefixTables(tablePrefix); + // If connecting to real accumulo, set additional parameters and require zookeepers + if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency + // Ensure consistency between alternative configuration properties + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); + conf.set(ConfigUtils.CLOUDBASE_USER, userName); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix); + + Job job = Job.getInstance(conf, sc.appName()); + + ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk); + + //maybe ask conf for correct suffix? + GraphXInputFormat.setInputTableName(job, EntityCentricIndex.CONF_TABLE_SUFFIX); + GraphXInputFormat.setConnectorInfo(job, userName, pwd); + GraphXInputFormat.setZooKeeperInstance(job, clientConfig); + GraphXInputFormat.setScanAuthorizations(job, authorizations); + + return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class); + } + + public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{ + // Load configuration parameters + zk = MRUtils.getACZK(conf); + instance = MRUtils.getACInstance(conf); + userName = MRUtils.getACUserName(conf); + pwd = MRUtils.getACPwd(conf); + mock = MRUtils.getACMock(conf, false); + tablePrefix = MRUtils.getTablePrefix(conf); + // Set authorizations if specified + String authString = conf.get(MRUtils.AC_AUTH_PROP); + if (authString != null && !authString.isEmpty()) { + authorizations = new Authorizations(authString.split(",")); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency + } + else { + authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; + } + // Set table prefix to the default if not set + if (tablePrefix == null) { + tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF; + MRUtils.setTablePrefix(conf, tablePrefix); + } + // Check for required configuration parameters + Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set."); + Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set."); + Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set."); + Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set."); + RdfCloudTripleStoreConstants.prefixTables(tablePrefix); + // If connecting to real accumulo, set additional parameters and require zookeepers + if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for 
consistency + // Ensure consistency between alternative configuration properties + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance); + conf.set(ConfigUtils.CLOUDBASE_USER, userName); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd); + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix); + + Job job = Job.getInstance(conf, sc.appName()); + + ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk); + + RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO); + RyaInputFormat.setConnectorInfo(job, userName, pwd); + RyaInputFormat.setZooKeeperInstance(job, clientConfig); + RyaInputFormat.setScanAuthorizations(job, authorizations); + return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class); + } + + public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{ + StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY(); + StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY(); + // Graph.apply needs a concrete ClassTag; a null tag fails at runtime. + ClassTag<RyaTypeWritable> RTWTag = scala.reflect.ClassTag$.MODULE$.apply(RyaTypeWritable.class); + // Default vertex attribute, deliberately left null. + RyaTypeWritable rtw = null; + RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf); + + RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf); + JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD(); + JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2); + + RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd); + + return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag); + } +} \ No newline at end of file
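
For reference, a minimal driver sketch (not part of this commit) showing how the new GraphXGraphGenerator might be wired up from a Spark application. The class names and the MRUtils property constants referenced in the diff above are real; MRUtils.AC_ZK_PROP and the connection values ("accumulo", "zoo1:2181", the credentials) are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.graphx.Graph;

import mvm.rya.accumulo.mr.MRUtils;
import mvm.rya.accumulo.mr.RyaTypeWritable;
import mvm.rya.accumulo.spark.GraphXGraphGenerator;

public class GraphXGeneratorDriver {
    public static void main(String[] args) throws Exception {
        SparkContext sc = new SparkContext(
                new SparkConf().setAppName("rya-graphx").setMaster("local[2]"));

        // Point the generator at a Rya instance; these values are placeholders.
        Configuration conf = new Configuration();
        conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");
        conf.set(MRUtils.AC_ZK_PROP, "zoo1:2181"); // assumed constant name
        conf.set(MRUtils.AC_USERNAME_PROP, "root");
        conf.set(MRUtils.AC_PWD_PROP, "secret");
        conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");

        // Vertices are read from the entity-centric index table and edges
        // from the SPO table, so both must exist under the configured prefix.
        Graph<RyaTypeWritable, RyaTypeWritable> graph =
                new GraphXGraphGenerator().createGraph(sc, conf);
        System.out.println("edge count: " + graph.edges().count());
        sc.stop();
    }
}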

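A note on vertex identity: GraphXEdgeInputFormat derives a long vertex ID for each statement endpoint by SHA-256-digesting the node's string and then folding the digest into a long with the hash() function shown above, so a given node always maps to the same GraphX vertex ID. A standalone sketch of that derivation, using only classes from this commit (the URI is an arbitrary example):

import java.security.MessageDigest;

import mvm.rya.accumulo.mr.GraphXEdgeInputFormat;

public class VertexIdSketch {
    public static void main(String[] args) throws Exception {
        String uri = "http://www.google.com";
        // SHA-256 the node string, as nextKeyValue() does...
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        md.update(uri.getBytes());
        // ...then fold the digest into a long with the polynomial hash().
        long vertexId = GraphXEdgeInputFormat.hash(new String(md.digest()));
        // Deterministic: rerunning with the same URI yields the same ID.
        System.out.println(uri + " -> " + vertexId);
    }
}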