Bug fixes and updates for Spark support
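
For reference, a minimal sketch of how the relocated GraphXEdgeInputFormat can be consumed from Spark. The class names, the TABLE_LAYOUT constant, and the static configuration methods match this commit; the Spark context setup, credentials, instance name, and table name are illustrative assumptions only, not part of the change:

    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.rya.accumulo.mr.GraphXEdgeInputFormat;
    import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.graphx.Edge;

    public class GraphXEdgeDemo {
        @SuppressWarnings("rawtypes")
        public static void main(String[] args) throws Exception {
            // Configure the Hadoop job backing the RDD; the setters come from
            // Accumulo's InputFormatBase plus the setTableLayout method that
            // this commit provides.
            Job job = Job.getInstance();
            GraphXEdgeInputFormat.setConnectorInfo(job, "root", new PasswordToken(""));
            GraphXEdgeInputFormat.setMockInstance(job, "demo_instance"); // or setZooKeeperInstance(...)
            GraphXEdgeInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
            GraphXEdgeInputFormat.setInputTableName(job, "rya_spo");

            // Each record is (row key, Edge); srcId/dstId are hashed from the
            // statement's subject/object, and the edge attribute carries the
            // predicate as a RyaTypeWritable.
            JavaSparkContext sc = new JavaSparkContext("local", "rya-graphx-demo");
            JavaPairRDD<Object, Edge> edges = sc.newAPIHadoopRDD(
                    job.getConfiguration(), GraphXEdgeInputFormat.class,
                    Object.class, Edge.class);
            System.out.println(edges.count());
        }
    }
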
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3f27536a Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3f27536a Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3f27536a Branch: refs/heads/master Commit: 3f27536a38af4983f697ecd0540e9a82185b3262 Parents: 77ff31e Author: Jesse Hatfield <[email protected]> Authored: Thu Sep 8 16:40:55 2016 -0400 Committer: pujav65 <[email protected]> Committed: Tue Dec 20 10:47:20 2016 -0500 ---------------------------------------------------------------------- .../rya/indexing/accumulo/ConfigUtils.java | 1 - .../accumulo/entity/EntityCentricIndex.java | 23 +- mapreduce/pom.xml | 20 +- .../rya/accumulo/mr/GraphXEdgeInputFormat.java | 209 ------------------ .../mvm/rya/accumulo/mr/GraphXInputFormat.java | 132 ------------ .../mvm/rya/accumulo/mr/RyaTypeWritable.java | 74 ------- .../rya/accumulo/mr/GraphXEdgeInputFormat.java | 216 +++++++++++++++++++ .../rya/accumulo/mr/GraphXInputFormat.java | 147 +++++++++++++ .../apache/rya/accumulo/mr/RyaTypeWritable.java | 123 +++++++++++ .../accumulo/mr/GraphXEdgeInputFormatTest.java | 134 ------------ .../rya/accumulo/mr/GraphXInputFormatTest.java | 144 ------------- .../accumulo/mr/GraphXEdgeInputFormatTest.java | 134 ++++++++++++ .../rya/accumulo/mr/GraphXInputFormatTest.java | 142 ++++++++++++ pom.xml | 1 + spark/pom.xml | 6 +- .../accumulo/spark/GraphXGraphGenerator.java | 188 ---------------- .../accumulo/spark/GraphXGraphGenerator.java | 183 ++++++++++++++++ 17 files changed, 978 insertions(+), 899 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java index e9e6c31..61a1003 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java @@ -365,7 +365,6 @@ public class ConfigUtils { public static void setIndexers(final RdfCloudTripleStoreConfiguration conf) { - System.out.println("Testuing"); final List<String> indexList = Lists.newArrayList(); final List<String> optimizers = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java index d58b1f1..0676e3d 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/entity/EntityCentricIndex.java @@ -24,6 +24,7 @@ import static org.apache.rya.accumulo.AccumuloRdfConstants.EMPTY_VALUE; import static org.apache.rya.api.RdfCloudTripleStoreConstants.DELIM_BYTES; import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_BYTES; import static org.apache.rya.api.RdfCloudTripleStoreConstants.EMPTY_TEXT; +import 
static org.apache.rya.api.RdfCloudTripleStoreConstants.TYPE_DELIM_BYTES; import java.io.IOException; import java.util.Arrays; @@ -281,24 +282,26 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { final byte[] columnFamily = Arrays.copyOf(data, split); final byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length); split = Bytes.indexOf(edgeBytes, DELIM_BYTES); - final String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split)); - final byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length - 2); - final byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length); + String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split)); + byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length); + split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES); + byte[] otherNodeData = Arrays.copyOf(otherNodeBytes, split); + byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes, split, otherNodeBytes.length); byte[] objectBytes; RyaURI subject; final RyaURI predicate = new RyaURI(new String(predicateBytes)); RyaType object; RyaURI context = null; - // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype_marker} - // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype_marker} + // Expect either: entity=subject.data, otherNodeVar="object", otherNodeBytes={object.data, object.datatype} + // or: entity=object.data, otherNodeVar="subject", otherNodeBytes={subject.data, object.datatype} switch (otherNodeVar) { case "subject": - subject = new RyaURI(new String(otherNodeBytes)); + subject = new RyaURI(new String(otherNodeData)); objectBytes = Bytes.concat(entityBytes, typeBytes); break; case "object": subject = new RyaURI(new String(entityBytes)); - objectBytes = Bytes.concat(otherNodeBytes, typeBytes); + objectBytes = Bytes.concat(otherNodeData, typeBytes); break; default: throw new IOException("Failed to deserialize entity-centric index row. " @@ -311,7 +314,7 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { return new RyaStatement(subject, predicate, object, context, null, columnVisibility, valueBytes, timestamp); } - + /** * Return the RyaType of the Entity Centric Index row. * @param key Row key, contains statement data @@ -332,7 +335,9 @@ public class EntityCentricIndex extends AbstractAccumuloIndexer { byte[] edgeBytes = Arrays.copyOfRange(data, split + DELIM_BYTES.length, data.length); split = Bytes.indexOf(edgeBytes, DELIM_BYTES); String otherNodeVar = new String(Arrays.copyOf(edgeBytes, split)); - byte[] typeBytes = Arrays.copyOfRange(edgeBytes, edgeBytes.length - 2, edgeBytes.length); + byte[] otherNodeBytes = Arrays.copyOfRange(edgeBytes, split + DELIM_BYTES.length, edgeBytes.length); + split = Bytes.indexOf(otherNodeBytes, TYPE_DELIM_BYTES); + byte[] typeBytes = Arrays.copyOfRange(otherNodeBytes, split, otherNodeBytes.length); byte[] objectBytes; RyaURI subject; RyaType object; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/pom.xml ---------------------------------------------------------------------- diff --git a/mapreduce/pom.xml b/mapreduce/pom.xml index d222845..c4f94f6 100644 --- a/mapreduce/pom.xml +++ b/mapreduce/pom.xml @@ -31,11 +31,11 @@ under the License. 
<name>Apache Rya MapReduce Tools</name> <dependencies> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-graphx_2.11</artifactId> - <version>1.6.2</version> - </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-graphx_2.11</artifactId> + <version>1.6.2</version> + </dependency> <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.api</artifactId> @@ -116,6 +116,16 @@ under the License. <executions> <execution> <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" /> </transformers> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java deleted file mode 100644 index 79d6e82..0000000 --- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormat.java +++ /dev/null @@ -1,209 +0,0 @@ -package mvm.rya.accumulo.mr; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.io.IOException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.List; -import java.util.Map.Entry; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.resolver.RyaTripleContext; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase; -import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.spark.graphx.Edge; - -/** - * Subclass of {@link AbstractInputFormat} for reading - * {@link RyaStatementWritable}s directly from a running Rya instance. 
- */ -@SuppressWarnings("rawtypes") -public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> { - /** - * Instantiates a RecordReader for this InputFormat and a given task and - * input split. - * - * @param split - * Defines the portion of the input this RecordReader is - * responsible for. - * @param context - * The context of the task. - * @return A RecordReader that can be used to fetch RyaStatementWritables. - */ - @Override - public RecordReader<Object, Edge> createRecordReader(InputSplit split, - TaskAttemptContext context) { - return new RyaStatementRecordReader(); - } - - /** - * Sets the table layout to use. - * - * @param conf - * Configuration to set the layout in. - * @param layout - * Statements will be read from the Rya table associated with - * this layout. - */ - public static void setTableLayout(Job conf, TABLE_LAYOUT layout) { - conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name()); - } - - /** - * Retrieves RyaStatementWritable objects from Accumulo tables. - */ - public class RyaStatementRecordReader extends - AbstractRecordReader<Object, Edge> { - private RyaTripleContext ryaContext; - private TABLE_LAYOUT tableLayout; - - protected void setupIterators(TaskAttemptContext context, - Scanner scanner, String tableName, RangeInputSplit split) { - } - - /** - * Initializes the RecordReader. - * - * @param inSplit - * Defines the portion of data to read. - * @param attempt - * Context for this task attempt. - * @throws IOException - * if thrown by the superclass's initialize method. - */ - @Override - public void initialize(InputSplit inSplit, TaskAttemptContext attempt) - throws IOException { - super.initialize(inSplit, attempt); - this.tableLayout = MRUtils.getTableLayout( - attempt.getConfiguration(), TABLE_LAYOUT.SPO); - // TODO verify that this is correct - this.ryaContext = RyaTripleContext - .getInstance(new AccumuloRdfConfiguration(attempt - .getConfiguration())); - } - - /** - * Load the next statement by converting the next Accumulo row to a - * statement, and make the new (key,value) pair available for retrieval. - * - * @return true if another (key,value) pair was fetched and is ready to - * be retrieved, false if there was none. - * @throws IOException - * if a row was loaded but could not be converted to a - * statement. 
- */ - @Override - public boolean nextKeyValue() throws IOException { - if (!scannerIterator.hasNext()) - return false; - Entry<Key, Value> entry = scannerIterator.next(); - ++numKeysRead; - currentKey = entry.getKey(); - try { - currentK = currentKey.getRow(); - RyaTypeWritable rtw = null; - RyaStatement stmt = this.ryaContext.deserializeTriple( - this.tableLayout, new TripleRow(entry.getKey().getRow() - .getBytes(), entry.getKey().getColumnFamily() - .getBytes(), entry.getKey() - .getColumnQualifier().getBytes(), entry - .getKey().getTimestamp(), entry.getKey() - .getColumnVisibility().getBytes(), entry - .getValue().get())); - - String subjURI = stmt.getSubject().getDataType().toString(); - String objURI = stmt.getObject().getDataType().toString(); - - // SHA-256 the string value and then generate a hashcode from - // the digested string, the collision ratio is less than 0.0001% - // using custom hash function should significantly reduce the - // collision ratio - MessageDigest messageDigest = MessageDigest - .getInstance("SHA-256"); - - messageDigest.update(subjURI.getBytes()); - String encryptedString = new String(messageDigest.digest()); - long subHash = hash(encryptedString); - - messageDigest.update(objURI.getBytes()); - encryptedString = new String(messageDigest.digest()); - long objHash = hash(encryptedString); - - Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>( - subHash, objHash, rtw); - currentV = writable; - } catch (TripleRowResolverException | NoSuchAlgorithmException e) { - throw new IOException(e); - } - return true; - } - - protected List<IteratorSetting> contextIterators( - TaskAttemptContext context, String tableName) { - return getIterators(context); - } - - @Override - protected void setupIterators(TaskAttemptContext context, - Scanner scanner, String tableName, - org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { - List<IteratorSetting> iterators = null; - - if (null == split) { - iterators = contextIterators(context, tableName); - } else { - iterators = split.getIterators(); - if (null == iterators) { - iterators = contextIterators(context, tableName); - } - } - - for (IteratorSetting iterator : iterators) - scanner.addScanIterator(iterator); - } - - } - - public static long hash(String string) { - long h = 1125899906842597L; // prime - int len = string.length(); - - for (int i = 0; i < len; i++) { - h = 31 * h + string.charAt(i); - } - return h; - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java deleted file mode 100644 index 6ec5c74..0000000 --- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/GraphXInputFormat.java +++ /dev/null @@ -1,132 +0,0 @@ -package mvm.rya.accumulo.mr; - -import java.io.IOException; -import java.util.List; -import java.util.Map.Entry; -import java.util.SortedMap; - -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.resolver.RyaTypeResolverException; -import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.mapreduce.InputFormatBase; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import 
org.apache.accumulo.core.iterators.user.WholeRowIterator; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> { - - private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23; - - /** - * Instantiates a RecordReader for this InputFormat and a given task and - * input split. - * - * @param split - * Defines the portion of the input this RecordReader is - * responsible for. - * @param context - * The context of the task. - * @return A RecordReader that can be used to fetch RyaStatementWritables. - */ - @Override - public RecordReader<Object, RyaTypeWritable> createRecordReader( - InputSplit split, TaskAttemptContext context) { - return new RyaStatementRecordReader(); - } - - - - /** - * Retrieves RyaStatementWritable objects from Accumulo tables. - */ - public class RyaStatementRecordReader extends - AbstractRecordReader<Object, RyaTypeWritable> { - protected void setupIterators(TaskAttemptContext context, - Scanner scanner, String tableName, - @SuppressWarnings("deprecation") RangeInputSplit split) { - IteratorSetting iteratorSetting = new IteratorSetting( - WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class); - scanner.addScanIterator(iteratorSetting); - } - - /** - * Initializes the RecordReader. - * - * @param inSplit - * Defines the portion of data to read. - * @param attempt - * Context for this task attempt. - * @throws IOException - * if thrown by the superclass's initialize method. - */ - @Override - public void initialize(InputSplit inSplit, TaskAttemptContext attempt) - throws IOException { - super.initialize(inSplit, attempt); - } - - /** - * Load the next statement by converting the next Accumulo row to a - * statement, and make the new (key,value) pair available for retrieval. - * - * @return true if another (key,value) pair was fetched and is ready to - * be retrieved, false if there was none. - * @throws IOException - * if a row was loaded but could not be converted to a - * statement. 
- */ - @Override - public boolean nextKeyValue() throws IOException { - if (!scannerIterator.hasNext()) - return false; - Entry<Key, Value> entry = scannerIterator.next(); - ++numKeysRead; - currentKey = entry.getKey(); - - try { - currentK = currentKey.getRow(); - SortedMap<Key, Value> wholeRow = WholeRowIterator.decodeRow(entry.getKey(), entry.getValue()); - Key key = wholeRow.firstKey(); - Value value = wholeRow.get(key); - RyaType type = EntityCentricIndex.getRyaType(key, value); - RyaTypeWritable writable = new RyaTypeWritable(); - writable.setRyaType(type); - currentV = writable; - } catch (RyaTypeResolverException e) { - throw new IOException(); - } - return true; - } - - protected List<IteratorSetting> contextIterators( - TaskAttemptContext context, String tableName) { - return getIterators(context); - } - - @Override - protected void setupIterators(TaskAttemptContext context, - Scanner scanner, String tableName, - org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { - - List<IteratorSetting> iterators = null; - - if (null == split) { - iterators = contextIterators(context, tableName); - } else { - iterators = split.getIterators(); - if (null == iterators) { - iterators = contextIterators(context, tableName); - } - } - - for (IteratorSetting iterator : iterators) - scanner.addScanIterator(iterator); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java deleted file mode 100644 index ddc0948..0000000 --- a/mapreduce/src/main/java/mvm/rya/accumulo/mr/RyaTypeWritable.java +++ /dev/null @@ -1,74 +0,0 @@ -package mvm.rya.accumulo.mr; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.resolver.triple.TripleRow; -import mvm.rya.api.resolver.triple.TripleRowResolverException; - -import org.apache.hadoop.io.WritableComparable; -import org.openrdf.model.URI; -import org.openrdf.model.impl.ValueFactoryImpl; - -public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{ - - private RyaType ryatype; - - /** - * Read part of a statement from an input stream. - * @param dataInput Stream for reading serialized statements. - * @return The next individual field, as a byte array. - * @throws IOException if reading from the stream fails. - */ - protected byte[] read(DataInput dataInput) throws IOException { - if (dataInput.readBoolean()) { - int len = dataInput.readInt(); - byte[] bytes = new byte[len]; - dataInput.readFully(bytes); - return bytes; - }else { - return null; - } - } - - @Override - public void readFields(DataInput dataInput) throws IOException { - ValueFactoryImpl vfi = new ValueFactoryImpl(); - String data = dataInput.readLine(); - String dataTypeString = dataInput.readLine(); - URI dataType = vfi.createURI(dataTypeString); - ryatype.setData(data); - ryatype.setDataType(dataType); - } - - @Override - public void write(DataOutput dataOutput) throws IOException { - dataOutput.writeChars(ryatype.getData()); - dataOutput.writeChars(ryatype.getDataType().toString()); - } - - /** - * Gets the contained RyaStatement. 
- * @return The statement represented by this RyaStatementWritable. - */ - public RyaType getRyaType() { - return ryatype; - } - /** - * Sets the contained RyaStatement. - * @param ryaStatement The statement to be represented by this - * RyaStatementWritable. - */ - public void setRyaType(RyaType ryatype) { - this.ryatype = ryatype; - } - - @Override - public int compareTo(RyaTypeWritable o) { - return ryatype.compareTo(o.ryatype); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java new file mode 100644 index 0000000..489fd34 --- /dev/null +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormat.java @@ -0,0 +1,216 @@ +package org.apache.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RyaTripleContext; +import org.apache.rya.api.resolver.triple.TripleRow; +import org.apache.rya.api.resolver.triple.TripleRowResolverException; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.spark.graphx.Edge; + +/** + * Subclass of {@link AbstractInputFormat} for reading + * {@link RyaStatementWritable}s directly from a running Rya instance. + */ +@SuppressWarnings("rawtypes") +public class GraphXEdgeInputFormat extends InputFormatBase<Object, Edge> { + /** + * Instantiates a RecordReader for this InputFormat and a given task and + * input split. + * + * @param split + * Defines the portion of the input this RecordReader is + * responsible for. + * @param context + * The context of the task. + * @return A RecordReader that can be used to fetch RyaStatementWritables. 
+ */ + @Override + public RecordReader<Object, Edge> createRecordReader(InputSplit split, + TaskAttemptContext context) { + return new RyaStatementRecordReader(); + } + + /** + * Sets the table layout to use. + * + * @param conf + * Configuration to set the layout in. + * @param layout + * Statements will be read from the Rya table associated with + * this layout. + */ + public static void setTableLayout(Job conf, TABLE_LAYOUT layout) { + conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name()); + } + + /** + * Retrieves RyaStatementWritable objects from Accumulo tables. + */ + public class RyaStatementRecordReader extends + AbstractRecordReader<Object, Edge> { + private RyaTripleContext ryaContext; + private TABLE_LAYOUT tableLayout; + + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, RangeInputSplit split) { + } + + /** + * Initializes the RecordReader. + * + * @param inSplit + * Defines the portion of data to read. + * @param attempt + * Context for this task attempt. + * @throws IOException + * if thrown by the superclass's initialize method. + */ + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) + throws IOException { + super.initialize(inSplit, attempt); + this.tableLayout = MRUtils.getTableLayout( + attempt.getConfiguration(), TABLE_LAYOUT.SPO); + // TODO verify that this is correct + this.ryaContext = RyaTripleContext + .getInstance(new AccumuloRdfConfiguration(attempt + .getConfiguration())); + } + + /** + * Load the next statement by converting the next Accumulo row to a + * statement, and make the new (key,value) pair available for retrieval. + * + * @return true if another (key,value) pair was fetched and is ready to + * be retrieved, false if there was none. + * @throws IOException + * if a row was loaded but could not be converted to a + * statement. 
+ */ + @Override + public boolean nextKeyValue() throws IOException { + if (!scannerIterator.hasNext()) + return false; + Entry<Key, Value> entry = scannerIterator.next(); + ++numKeysRead; + currentKey = entry.getKey(); + try { + currentK = currentKey.getRow(); + RyaTypeWritable rtw = new RyaTypeWritable(); + RyaStatement stmt = this.ryaContext.deserializeTriple( + this.tableLayout, new TripleRow(entry.getKey().getRow() + .getBytes(), entry.getKey().getColumnFamily() + .getBytes(), entry.getKey() + .getColumnQualifier().getBytes(), entry + .getKey().getTimestamp(), entry.getKey() + .getColumnVisibility().getBytes(), entry + .getValue().get())); + + long subHash = getVertexId(stmt.getSubject()); + long objHash = getVertexId(stmt.getObject()); + rtw.setRyaType(stmt.getPredicate()); + + Edge<RyaTypeWritable> writable = new Edge<RyaTypeWritable>( + subHash, objHash, rtw); + currentV = writable; + } catch (TripleRowResolverException e) { + throw new IOException(e); + } + return true; + } + + protected List<IteratorSetting> contextIterators( + TaskAttemptContext context, String tableName) { + return getIterators(context); + } + + @Override + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, + org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { + List<IteratorSetting> iterators = null; + + if (null == split) { + iterators = contextIterators(context, tableName); + } else { + iterators = split.getIterators(); + if (null == iterators) { + iterators = contextIterators(context, tableName); + } + } + + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); + } + + } + + public static long getVertexId(RyaType resource) throws IOException { + String uri = ""; + if (resource != null) { + uri = resource.getData().toString(); + } + try { + // SHA-256 the string value and then generate a hashcode from + // the digested string, the collision ratio is less than 0.0001% + // using custom hash function should significantly reduce the + // collision ratio + MessageDigest messageDigest = MessageDigest + .getInstance("SHA-256"); + messageDigest.update(uri.getBytes()); + String encryptedString = new String(messageDigest.digest()); + return hash(encryptedString); + } + catch (NoSuchAlgorithmException e) { + throw new IOException(e); + } + } + + public static long hash(String string) { + long h = 1125899906842597L; // prime + int len = string.length(); + + for (int i = 0; i < len; i++) { + h = 31 * h + string.charAt(i); + } + return h; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java new file mode 100644 index 0000000..77f4e63 --- /dev/null +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/GraphXInputFormat.java @@ -0,0 +1,147 @@ +package org.apache.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RyaTypeResolverException; +import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.user.WholeRowIterator; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +public class GraphXInputFormat extends InputFormatBase<Object, RyaTypeWritable> { + + private static final int WHOLE_ROW_ITERATOR_PRIORITY = 23; + + /** + * Instantiates a RecordReader for this InputFormat and a given task and + * input split. + * + * @param split + * Defines the portion of the input this RecordReader is + * responsible for. + * @param context + * The context of the task. + * @return A RecordReader that can be used to fetch RyaStatementWritables. + */ + @Override + public RecordReader<Object, RyaTypeWritable> createRecordReader( + InputSplit split, TaskAttemptContext context) { + return new RyaStatementRecordReader(); + } + + + + /** + * Retrieves RyaStatementWritable objects from Accumulo tables. + */ + public class RyaStatementRecordReader extends + AbstractRecordReader<Object, RyaTypeWritable> { + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, + @SuppressWarnings("deprecation") RangeInputSplit split) { + IteratorSetting iteratorSetting = new IteratorSetting( + WHOLE_ROW_ITERATOR_PRIORITY, WholeRowIterator.class); + scanner.addScanIterator(iteratorSetting); + } + + /** + * Initializes the RecordReader. + * + * @param inSplit + * Defines the portion of data to read. + * @param attempt + * Context for this task attempt. + * @throws IOException + * if thrown by the superclass's initialize method. + */ + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) + throws IOException { + super.initialize(inSplit, attempt); + } + + /** + * Load the next statement by converting the next Accumulo row to a + * statement, and make the new (key,value) pair available for retrieval. + * + * @return true if another (key,value) pair was fetched and is ready to + * be retrieved, false if there was none. + * @throws IOException + * if a row was loaded but could not be converted to a + * statement. 
+ */ + @Override + public boolean nextKeyValue() throws IOException { + if (!scannerIterator.hasNext()) + return false; + Entry<Key, Value> entry = scannerIterator.next(); + ++numKeysRead; + currentKey = entry.getKey(); + + try { + RyaType type = EntityCentricIndex.getRyaType(currentKey, entry.getValue()); + RyaTypeWritable writable = new RyaTypeWritable(); + writable.setRyaType(type); + currentK = GraphXEdgeInputFormat.getVertexId(type); + currentV = writable; + } catch (RyaTypeResolverException e) { + throw new IOException(); + } + return true; + } + + protected List<IteratorSetting> contextIterators( + TaskAttemptContext context, String tableName) { + return getIterators(context); + } + + @Override + protected void setupIterators(TaskAttemptContext context, + Scanner scanner, String tableName, + org.apache.accumulo.core.client.mapreduce.RangeInputSplit split) { + + List<IteratorSetting> iterators = null; + + if (null == split) { + iterators = contextIterators(context, tableName); + } else { + iterators = split.getIterators(); + if (null == iterators) { + iterators = contextIterators(context, tableName); + } + } + + for (IteratorSetting iterator : iterators) + scanner.addScanIterator(iterator); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java new file mode 100644 index 0000000..ec47d82 --- /dev/null +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaTypeWritable.java @@ -0,0 +1,123 @@ +package org.apache.rya.accumulo.mr; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.rya.api.domain.RyaType; + +import org.apache.hadoop.io.WritableComparable; +import org.openrdf.model.URI; +import org.openrdf.model.impl.ValueFactoryImpl; + +public class RyaTypeWritable implements WritableComparable<RyaTypeWritable>{ + + private RyaType ryatype; + + /** + * Read part of a statement from an input stream. + * @param dataInput Stream for reading serialized statements. + * @return The next individual field, as a byte array. + * @throws IOException if reading from the stream fails. 
+ */ + protected byte[] read(DataInput dataInput) throws IOException { + if (dataInput.readBoolean()) { + int len = dataInput.readInt(); + byte[] bytes = new byte[len]; + dataInput.readFully(bytes); + return bytes; + }else { + return null; + } + } + + @Override + public void readFields(DataInput dataInput) throws IOException { + ValueFactoryImpl vfi = new ValueFactoryImpl(); + String data = dataInput.readLine(); + String dataTypeString = dataInput.readLine(); + URI dataType = vfi.createURI(dataTypeString); + ryatype.setData(data); + ryatype.setDataType(dataType); + } + + @Override + public void write(DataOutput dataOutput) throws IOException { + dataOutput.writeChars(ryatype.getData()); + dataOutput.writeChars(ryatype.getDataType().toString()); + } + + /** + * Gets the contained RyaStatement. + * @return The statement represented by this RyaStatementWritable. + */ + public RyaType getRyaType() { + return ryatype; + } + /** + * Sets the contained RyaStatement. + * @param ryaStatement The statement to be represented by this + * RyaStatementWritable. + */ + public void setRyaType(RyaType ryatype) { + this.ryatype = ryatype; + } + + @Override + public int compareTo(RyaTypeWritable o) { + return ryatype.compareTo(o.ryatype); + } + + /** + * Tests for equality using the equals method of the enclosed RyaType. + * @param o Object to compare with + * @return true if both objects are RyaTypeWritables containing equivalent + * RyaTypes. + */ + @Override + public boolean equals(Object o) { + if (o == this) { + return true; + } + if (o == null || !(o instanceof RyaTypeWritable)) { + return false; + } + RyaType rtThis = ryatype; + RyaType rtOther = ((RyaTypeWritable) o).ryatype; + if (rtThis == null) { + return rtOther == null; + } + else { + return rtThis.equals(rtOther); + } + } + + @Override + public int hashCode() { + if (ryatype == null) { + return 0; + } + else { + return ryatype.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java deleted file mode 100644 index 445499d..0000000 --- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXEdgeInputFormatTest.java +++ /dev/null @@ -1,134 +0,0 @@ -package mvm.rya.accumulo.mr; -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -import java.util.ArrayList; -import java.util.List; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaURI; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.apache.spark.graphx.Edge; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class GraphXEdgeInputFormatTest { - - static String username = "root", table = "rya_spo"; - static PasswordToken password = new PasswordToken(""); - - static Instance instance; - static AccumuloRyaDAO apiImpl; - - @Before - public void init() throws Exception { - instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance"); - Connector connector = instance.getConnector(username, password); - connector.tableOperations().create(table); - - AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix("rya_"); - conf.setDisplayQueryPlan(false); - - apiImpl = new AccumuloRyaDAO(); - apiImpl.setConf(conf); - apiImpl.setConnector(connector); - apiImpl.init(); - } - - @After - public void after() throws Exception { - apiImpl.dropAndDestroy(); - } - - @SuppressWarnings("rawtypes") - @Test - public void testInputFormat() throws Exception { - RyaStatement input = RyaStatement.builder() - .setSubject(new RyaURI("http://www.google.com")) - .setPredicate(new RyaURI("http://some_other_uri")) - .setObject(new RyaURI("http://www.yahoo.com")) - .setColumnVisibility(new byte[0]) - .setValue(new byte[0]) - .build(); - - apiImpl.add(input); - - Job jobConf = Job.getInstance(); - - GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName()); - GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password); - GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO); - GraphXEdgeInputFormat.setInputTableName(jobConf, table); - GraphXEdgeInputFormat.setInputTableName(jobConf, table); - - GraphXEdgeInputFormat.setScanIsolation(jobConf, false); - GraphXEdgeInputFormat.setLocalIterators(jobConf, false); - GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false); - - GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat(); - - JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID()); - - List<InputSplit> splits = inputFormat.getSplits(context); - - Assert.assertEquals(1, splits.size()); - - TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1)); - - RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext); - - RecordReader ryaStatementRecordReader = (RecordReader) reader; - ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext); - - List<Edge> results = new 
ArrayList<Edge>(); - while(ryaStatementRecordReader.nextKeyValue()) { - Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue(); - long srcId = writable.srcId(); - long destId = writable.dstId(); - RyaTypeWritable rtw = null; - Object text = ryaStatementRecordReader.getCurrentKey(); - Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw); - results.add(edge); - - System.out.println(text); - } - - System.out.println(results.size()); - System.out.println(results); - Assert.assertTrue(results.size() == 2); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java b/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java deleted file mode 100644 index a31b27f..0000000 --- a/mapreduce/src/test/java/mvm/rya/accumulo/mr/GraphXInputFormatTest.java +++ /dev/null @@ -1,144 +0,0 @@ -package mvm.rya.accumulo.mr; -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -import java.util.ArrayList; -import java.util.List; - -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.RyaType; -import mvm.rya.api.domain.RyaURI; -import mvm.rya.indexing.accumulo.ConfigUtils; -import mvm.rya.indexing.accumulo.entity.EntityCentricIndex; - -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.TaskAttemptID; -import org.apache.hadoop.mapreduce.TaskID; -import org.apache.hadoop.mapreduce.task.JobContextImpl; -import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class GraphXInputFormatTest { - - private String username = "root", table = "rya_eci"; - private PasswordToken password = new PasswordToken(""); - - private Instance instance; - private AccumuloRyaDAO apiImpl; - - @Before - public void init() throws Exception { - instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance"); - Connector connector = instance.getConnector(username, password); - connector.tableOperations().create(table); - - AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix("rya_"); - conf.setDisplayQueryPlan(false); - conf.setBoolean("sc.use_entity", true); - - apiImpl = new AccumuloRyaDAO(); - apiImpl.setConf(conf); - apiImpl.setConnector(connector); - apiImpl.init(); - } - - @After - public void after() throws Exception { - apiImpl.dropAndDestroy(); - } - - @Test - public void testInputFormat() throws Exception { - RyaStatement input = RyaStatement.builder() - .setSubject(new RyaURI("http://www.google.com")) - .setPredicate(new RyaURI("http://some_other_uri")) - .setObject(new RyaURI("http://www.yahoo.com")) - .setColumnVisibility(new byte[0]) - .setValue(new byte[0]) - .build(); - - apiImpl.add(input); - - Job jobConf = Job.getInstance(); - - GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName()); - GraphXInputFormat.setConnectorInfo(jobConf, username, password); - GraphXInputFormat.setInputTableName(jobConf, table); - GraphXInputFormat.setInputTableName(jobConf, table); - - GraphXInputFormat.setScanIsolation(jobConf, false); - GraphXInputFormat.setLocalIterators(jobConf, false); - GraphXInputFormat.setOfflineTableScan(jobConf, false); - - GraphXInputFormat inputFormat = new GraphXInputFormat(); - - JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID()); - - List<InputSplit> splits = inputFormat.getSplits(context); - - Assert.assertEquals(1, splits.size()); - - TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1)); - - RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext); - - RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader; - 
ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext); - - List<RyaType> results = new ArrayList<RyaType>(); - System.out.println("before while"); - while(ryaStatementRecordReader.nextKeyValue()) { - System.out.println("in while"); - RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue(); - RyaType value = writable.getRyaType(); - Object text = ryaStatementRecordReader.getCurrentKey(); - RyaType type = new RyaType(); - type.setData(value.getData()); - type.setDataType(value.getDataType()); - results.add(type); - - System.out.println(value.getData()); - System.out.println(value.getDataType()); - System.out.println(results); - System.out.println(type); - System.out.println(text); - System.out.println(value); - } - System.out.println("after while"); - - System.out.println(results.size()); - System.out.println(results); -// Assert.assertTrue(results.size() == 2); -// Assert.assertTrue(results.contains(input)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java new file mode 100644 index 0000000..6686c8f --- /dev/null +++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXEdgeInputFormatTest.java @@ -0,0 +1,134 @@ +package org.apache.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +import java.util.ArrayList; +import java.util.List; + +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; + +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.spark.graphx.Edge; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class GraphXEdgeInputFormatTest { + + static String username = "root", table = "rya_spo"; + static PasswordToken password = new PasswordToken(""); + + static Instance instance; + static AccumuloRyaDAO apiImpl; + + @Before + public void init() throws Exception { + instance = new MockInstance(GraphXEdgeInputFormatTest.class.getName() + ".mock_instance"); + Connector connector = instance.getConnector(username, password); + connector.tableOperations().create(table); + + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("rya_"); + conf.setDisplayQueryPlan(false); + + apiImpl = new AccumuloRyaDAO(); + apiImpl.setConf(conf); + apiImpl.setConnector(connector); + apiImpl.init(); + } + + @After + public void after() throws Exception { + apiImpl.dropAndDestroy(); + } + + @SuppressWarnings("rawtypes") + @Test + public void testInputFormat() throws Exception { + RyaStatement input = RyaStatement.builder() + .setSubject(new RyaURI("http://www.google.com")) + .setPredicate(new RyaURI("http://some_other_uri")) + .setObject(new RyaURI("http://www.yahoo.com")) + .setColumnVisibility(new byte[0]) + .setValue(new byte[0]) + .build(); + + apiImpl.add(input); + + Job jobConf = Job.getInstance(); + + GraphXEdgeInputFormat.setMockInstance(jobConf, instance.getInstanceName()); + GraphXEdgeInputFormat.setConnectorInfo(jobConf, username, password); + GraphXEdgeInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO); + GraphXEdgeInputFormat.setInputTableName(jobConf, table); + GraphXEdgeInputFormat.setInputTableName(jobConf, table); + + GraphXEdgeInputFormat.setScanIsolation(jobConf, false); + GraphXEdgeInputFormat.setLocalIterators(jobConf, false); + GraphXEdgeInputFormat.setOfflineTableScan(jobConf, false); + + GraphXEdgeInputFormat inputFormat = new GraphXEdgeInputFormat(); + + JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID()); + + List<InputSplit> splits = inputFormat.getSplits(context); + + Assert.assertEquals(1, splits.size()); + + TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1)); + + RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext); + + RecordReader ryaStatementRecordReader = (RecordReader) reader; + ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext); + + 
List<Edge> results = new ArrayList<Edge>(); + while(ryaStatementRecordReader.nextKeyValue()) { + Edge writable = (Edge) ryaStatementRecordReader.getCurrentValue(); + long srcId = writable.srcId(); + long destId = writable.dstId(); + RyaTypeWritable rtw = null; + Object text = ryaStatementRecordReader.getCurrentKey(); + Edge<RyaTypeWritable> edge = new Edge<RyaTypeWritable>(srcId, destId, rtw); + results.add(edge); + + System.out.println(text); + } + + System.out.println(results.size()); + System.out.println(results); + Assert.assertTrue(results.size() == 2); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java new file mode 100644 index 0000000..b2a663c --- /dev/null +++ b/mapreduce/src/test/java/org/apache/rya/accumulo/mr/GraphXInputFormatTest.java @@ -0,0 +1,142 @@ +package org.apache.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.AccumuloRyaDAO;
+import org.apache.rya.accumulo.mr.GraphXInputFormat.RyaStatementRecordReader;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GraphXInputFormatTest {
+
+    private String username = "root", table = "rya_eci";
+    private PasswordToken password = new PasswordToken("");
+
+    private Instance instance;
+    private AccumuloRyaDAO apiImpl;
+
+    @Before
+    public void init() throws Exception {
+        instance = new MockInstance(GraphXInputFormatTest.class.getName() + ".mock_instance");
+        Connector connector = instance.getConnector(username, password);
+        connector.tableOperations().create(table);
+
+        AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+        conf.setTablePrefix("rya_");
+        conf.setDisplayQueryPlan(false);
+        conf.setBoolean("sc.use_entity", true);
+
+        apiImpl = new AccumuloRyaDAO();
+        apiImpl.setConf(conf);
+        apiImpl.setConnector(connector);
+        apiImpl.init();
+    }
+
+    @After
+    public void after() throws Exception {
+        apiImpl.dropAndDestroy();
+    }
+
+    @Test
+    public void testInputFormat() throws Exception {
+        RyaStatement input = RyaStatement.builder()
+                .setSubject(new RyaURI("http://www.google.com"))
+                .setPredicate(new RyaURI("http://some_other_uri"))
+                .setObject(new RyaURI("http://www.yahoo.com"))
+                .setColumnVisibility(new byte[0])
+                .setValue(new byte[0])
+                .build();
+
+        apiImpl.add(input);
+
+        Job jobConf = Job.getInstance();
+
+        GraphXInputFormat.setMockInstance(jobConf, instance.getInstanceName());
+        GraphXInputFormat.setConnectorInfo(jobConf, username, password);
+        GraphXInputFormat.setInputTableName(jobConf, table);
+
+        GraphXInputFormat.setScanIsolation(jobConf, false);
+        GraphXInputFormat.setLocalIterators(jobConf, false);
+        GraphXInputFormat.setOfflineTableScan(jobConf, false);
+
+        GraphXInputFormat inputFormat = new GraphXInputFormat();
+
+        JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID());
+
+        List<InputSplit> splits = inputFormat.getSplits(context);
+
+        Assert.assertEquals(1, splits.size());
+
+        TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1));
+
+        RecordReader<Object, RyaTypeWritable> reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext);
+
+        RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader) reader;
+        ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext);
+
+        List<RyaType> results = new ArrayList<RyaType>();
+        while (ryaStatementRecordReader.nextKeyValue()) {
+            RyaTypeWritable writable = ryaStatementRecordReader.getCurrentValue();
+            RyaType value = writable.getRyaType();
+            RyaType type = new RyaType();
+            type.setData(value.getData());
+            type.setDataType(value.getDataType());
+            results.add(type);
+        }
+
+        // TODO: the expected contents of the entity-centric index still need to be
+        // verified before these assertions can be enabled.
+//        Assert.assertTrue(results.size() == 2);
+//        Assert.assertTrue(results.contains(input));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5ab29aa..43cf6f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -64,6 +64,7 @@ under the License.
         <module>osgi</module>
         <module>pig</module>
         <module>sail</module>
+        <module>spark</module>
         <module>web</module>
     </modules>
     <properties>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 02bdb37..04c8bb5 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -24,11 +24,11 @@ under the License.
     <parent>
         <groupId>org.apache.rya</groupId>
         <artifactId>rya-project</artifactId>
-        <version>3.2.10-SNAPSHOT</version>
+        <version>3.2.10-incubating-SNAPSHOT</version>
     </parent>
 
     <artifactId>rya.spark</artifactId>
-    <name>Apache Rya MapReduce Tools</name>
+    <name>Apache Rya Spark Support</name>
 
     <dependencies>
         <dependency>
@@ -39,7 +39,7 @@ under the License.
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.11</artifactId>
-            <version>1.2.2</version>
+            <version>1.6.2</version>
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java b/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
deleted file mode 100644
index f4b7860..0000000
--- a/spark/src/main/java/mvm/rya/accumulo/spark/GraphXGraphGenerator.java
+++ /dev/null
@@ -1,188 +0,0 @@
-package mvm.rya.accumulo.spark;
-
-import java.io.IOException;
-
-import mvm.rya.accumulo.AccumuloRdfConstants;
-import mvm.rya.accumulo.mr.GraphXEdgeInputFormat;
-import mvm.rya.accumulo.mr.GraphXInputFormat;
-import mvm.rya.accumulo.mr.MRUtils;
-import mvm.rya.accumulo.mr.RyaInputFormat;
-import mvm.rya.accumulo.mr.RyaTypeWritable;
-import mvm.rya.api.RdfCloudTripleStoreConfiguration;
-import mvm.rya.api.RdfCloudTripleStoreConstants;
-import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
-import mvm.rya.indexing.accumulo.ConfigUtils;
-import mvm.rya.indexing.accumulo.entity.EntityCentricIndex;
-
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.ClientConfiguration;
-import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.graphx.Edge;
-import org.apache.spark.graphx.Graph;
-import org.apache.spark.rdd.RDD;
-import org.apache.spark.storage.StorageLevel;
-
-import scala.Tuple2;
-import scala.reflect.ClassTag;
-
-import com.google.common.base.Preconditions;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class GraphXGraphGenerator {
-
-    public String zk;
-    public String instance;
-    public String userName;
-    public String pwd;
-    public boolean mock;
-    public String tablePrefix;
-    public Authorizations authorizations;
-
-    public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-        // Load configuration parameters
-        zk = MRUtils.getACZK(conf);
-        instance = MRUtils.getACInstance(conf);
-        userName = MRUtils.getACUserName(conf);
-        pwd = MRUtils.getACPwd(conf);
-        mock = MRUtils.getACMock(conf, false);
-        tablePrefix = MRUtils.getTablePrefix(conf);
-        // Set authorizations if specified
-        String authString = conf.get(MRUtils.AC_AUTH_PROP);
-        if (authString != null && !authString.isEmpty()) {
-            authorizations = new Authorizations(authString.split(","));
-            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
-        }
-        else {
-            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-        }
-        // Set table prefix to the default if not set
-        if (tablePrefix == null) {
-            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-            MRUtils.setTablePrefix(conf, tablePrefix);
-        }
-        // Check for required configuration parameters
-        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
-        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
-        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
-        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
-        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        // If connecting to real accumulo, set additional parameters and require zookeepers
-        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
-        // Ensure consistency between alternative configuration properties
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
-        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
-        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
-        Job job = Job.getInstance(conf, sc.appName());
-
-        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-
-        //maybe ask conf for correct suffix?
-        GraphXInputFormat.setInputTableName(job, EntityCentricIndex.CONF_TABLE_SUFFIX);
-        GraphXInputFormat.setConnectorInfo(job, userName, pwd);
-        GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
-        GraphXInputFormat.setScanAuthorizations(job, authorizations);
-
-        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
-    }
-
-    public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-        // Load configuration parameters
-        zk = MRUtils.getACZK(conf);
-        instance = MRUtils.getACInstance(conf);
-        userName = MRUtils.getACUserName(conf);
-        pwd = MRUtils.getACPwd(conf);
-        mock = MRUtils.getACMock(conf, false);
-        tablePrefix = MRUtils.getTablePrefix(conf);
-        // Set authorizations if specified
-        String authString = conf.get(MRUtils.AC_AUTH_PROP);
-        if (authString != null && !authString.isEmpty()) {
-            authorizations = new Authorizations(authString.split(","));
-            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
-        }
-        else {
-            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
-        }
-        // Set table prefix to the default if not set
-        if (tablePrefix == null) {
-            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
-            MRUtils.setTablePrefix(conf, tablePrefix);
-        }
-        // Check for required configuration parameters
-        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
-        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
-        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
-        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
-        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
-        // If connecting to real accumulo, set additional parameters and require zookeepers
-        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
-        // Ensure consistency between alternative configuration properties
-        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
-        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
-        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
-        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
-        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
-
-        Job job = Job.getInstance(conf, sc.appName());
-
-        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
-
-        RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
-        RyaInputFormat.setConnectorInfo(job, userName, pwd);
-        RyaInputFormat.setZooKeeperInstance(job, clientConfig);
-        RyaInputFormat.setScanAuthorizations(job, authorizations);
-        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
-    }
-
-    public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException{
-        StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
-        StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
-        ClassTag<RyaTypeWritable> RTWTag = null;
-        RyaTypeWritable rtw = null;
-        RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
-
-        RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
-        JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
-        JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2);
-
-        RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
-
-        return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag);
-    }
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3f27536a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
new file mode 100644
index 0000000..b1889b8
--- /dev/null
+++ b/spark/src/main/java/org/apache/rya/accumulo/spark/GraphXGraphGenerator.java
@@ -0,0 +1,183 @@
+package org.apache.rya.accumulo.spark;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+
+import org.apache.rya.accumulo.AccumuloRdfConstants;
+import org.apache.rya.accumulo.mr.GraphXEdgeInputFormat;
+import org.apache.rya.accumulo.mr.GraphXInputFormat;
+import org.apache.rya.accumulo.mr.MRUtils;
+import org.apache.rya.accumulo.mr.RyaInputFormat;
+import org.apache.rya.accumulo.mr.RyaTypeWritable;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConstants;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex;
+
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.ClientConfiguration;
+import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.client.mapreduce.InputFormatBase;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.graphx.Edge;
+import org.apache.spark.graphx.Graph;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.storage.StorageLevel;
+
+import scala.Tuple2;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+import com.google.common.base.Preconditions;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class GraphXGraphGenerator {
+
+    public String zk;
+    public String instance;
+    public String userName;
+    public String pwd;
+    public boolean mock;
+    public String tablePrefix;
+    public Authorizations authorizations;
+
+    public RDD<Tuple2<Object, RyaTypeWritable>> getVertexRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException {
+        // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+
+        Job job = Job.getInstance(conf, sc.appName());
+
+        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+
+        GraphXInputFormat.setInputTableName(job, EntityCentricIndex.getTableName(conf));
+        GraphXInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        GraphXInputFormat.setZooKeeperInstance(job, clientConfig);
+        GraphXInputFormat.setScanAuthorizations(job, authorizations);
+
+        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXInputFormat.class, Object.class, RyaTypeWritable.class);
+    }
+
+    public RDD<Tuple2<Object, Edge>> getEdgeRDD(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException {
+        // Load configuration parameters
+        zk = MRUtils.getACZK(conf);
+        instance = MRUtils.getACInstance(conf);
+        userName = MRUtils.getACUserName(conf);
+        pwd = MRUtils.getACPwd(conf);
+        mock = MRUtils.getACMock(conf, false);
+        tablePrefix = MRUtils.getTablePrefix(conf);
+        // Set authorizations if specified
+        String authString = conf.get(MRUtils.AC_AUTH_PROP);
+        if (authString != null && !authString.isEmpty()) {
+            authorizations = new Authorizations(authString.split(","));
+            conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency
+        }
+        else {
+            authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS;
+        }
+        // Set table prefix to the default if not set
+        if (tablePrefix == null) {
+            tablePrefix = RdfCloudTripleStoreConstants.TBL_PRFX_DEF;
+            MRUtils.setTablePrefix(conf, tablePrefix);
+        }
+        // Check for required configuration parameters
+        Preconditions.checkNotNull(instance, "Accumulo instance name [" + MRUtils.AC_INSTANCE_PROP + "] not set.");
+        Preconditions.checkNotNull(userName, "Accumulo username [" + MRUtils.AC_USERNAME_PROP + "] not set.");
+        Preconditions.checkNotNull(pwd, "Accumulo password [" + MRUtils.AC_PWD_PROP + "] not set.");
+        Preconditions.checkNotNull(tablePrefix, "Table prefix [" + MRUtils.TABLE_PREFIX_PROPERTY + "] not set.");
+        RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        // If connecting to real accumulo, set additional parameters and require zookeepers
+        if (!mock) conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zk); // for consistency
+        // Ensure consistency between alternative configuration properties
+        conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instance);
+        conf.set(ConfigUtils.CLOUDBASE_USER, userName);
+        conf.set(ConfigUtils.CLOUDBASE_PASSWORD, pwd);
+        conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, mock);
+        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, tablePrefix);
+
+        Job job = Job.getInstance(conf, sc.appName());
+
+        ClientConfiguration clientConfig = new ClientConfiguration().with(ClientProperty.INSTANCE_NAME, instance).with(ClientProperty.INSTANCE_ZK_HOST, zk);
+
+        RyaInputFormat.setTableLayout(job, TABLE_LAYOUT.SPO);
+        RyaInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd));
+        RyaInputFormat.setZooKeeperInstance(job, clientConfig);
+        RyaInputFormat.setScanAuthorizations(job, authorizations);
+        String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(TABLE_LAYOUT.SPO, tablePrefix);
+        InputFormatBase.setInputTableName(job, tableName);
+        return sc.newAPIHadoopRDD(job.getConfiguration(), GraphXEdgeInputFormat.class, Object.class, Edge.class);
+    }
+
+    public Graph<RyaTypeWritable, RyaTypeWritable> createGraph(SparkContext sc, Configuration conf) throws IOException, AccumuloSecurityException {
+        StorageLevel storageLvl1 = StorageLevel.MEMORY_ONLY();
+        StorageLevel storageLvl2 = StorageLevel.MEMORY_ONLY();
+        ClassTag<RyaTypeWritable> RTWTag = ClassTag$.MODULE$.apply(RyaTypeWritable.class);
+        // Vertices missing from the vertex RDD receive a null RyaTypeWritable as their default attribute.
+        RyaTypeWritable rtw = null;
+        RDD<Tuple2<Object, RyaTypeWritable>> vertexRDD = getVertexRDD(sc, conf);
+
+        RDD<Tuple2<Object, Edge>> edgeRDD = getEdgeRDD(sc, conf);
+        JavaRDD<Tuple2<Object, Edge>> jrddTuple = edgeRDD.toJavaRDD();
+        JavaRDD<Edge<RyaTypeWritable>> jrdd = jrddTuple.map(tuple -> tuple._2);
+
+        RDD<Edge<RyaTypeWritable>> goodERDD = JavaRDD.toRDD(jrdd);
+
+        return Graph.apply(vertexRDD, goodERDD, rtw, storageLvl1, storageLvl2, RTWTag, RTWTag);
+    }
+}
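
For context, the generator above is driven entirely by the Hadoop Configuration it is handed: a Spark driver populates the MRUtils connection properties, then calls createGraph to obtain a GraphX Graph whose vertex attributes come from the entity-centric index and whose edges come from the SPO table. Below is a minimal driver sketch, not part of this commit; the MRUtils property constants mirror the ones referenced in the Preconditions checks above, while AC_ZK_PROP, the connection values, and the local[*] master are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.rya.accumulo.mr.MRUtils;
import org.apache.rya.accumulo.mr.RyaTypeWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.graphx.Graph;

public class GraphXGraphGeneratorDriver {
    public static void main(String[] args) throws Exception {
        // Local-mode Spark context for demonstration; a real job would be submitted
        // against a cluster master instead of local[*].
        SparkConf sparkConf = new SparkConf().setAppName("RyaGraphXDriver").setMaster("local[*]");
        SparkContext sc = new SparkContext(sparkConf);

        // Connection parameters read back by getVertexRDD()/getEdgeRDD() through MRUtils.
        // The instance, zookeeper, and credential values here are placeholders.
        Configuration conf = new Configuration();
        conf.set(MRUtils.AC_INSTANCE_PROP, "accumulo");
        conf.set(MRUtils.AC_ZK_PROP, "localhost:2181");
        conf.set(MRUtils.AC_USERNAME_PROP, "root");
        conf.set(MRUtils.AC_PWD_PROP, "secret");
        conf.set(MRUtils.TABLE_PREFIX_PROPERTY, "rya_");

        // Build the graph: vertices from the entity-centric index, edges from SPO.
        GraphXGraphGenerator generator = new GraphXGraphGenerator();
        Graph<RyaTypeWritable, RyaTypeWritable> graph = generator.createGraph(sc, conf);

        // Counting forces evaluation of both underlying Hadoop RDDs.
        System.out.println("vertices: " + graph.vertices().count());
        System.out.println("edges: " + graph.edges().count());

        sc.stop();
    }
}

Note that createGraph pins both the vertex and edge RDDs at MEMORY_ONLY; a driver working with graphs larger than available executor memory would want to revisit that storage level.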
