initial giraph support; closes #132
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/45efa55b Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/45efa55b Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/45efa55b Branch: refs/heads/master Commit: 45efa55b01e58343d33522937c964d7fe6acfbe9 Parents: ccb5a67 Author: pujav65 <puja...@gmail.com> Authored: Tue Dec 20 16:35:06 2016 -0500 Committer: pujav65 <puja...@gmail.com> Committed: Mon Jan 23 11:20:35 2017 -0500 ---------------------------------------------------------------------- .../apache/rya/sail/config/RyaSailFactory.java | 2 +- extras/pom.xml | 3 +- extras/rya.giraph/pom.xml | 85 ++++++++++++ .../rya/giraph/format/RyaEdgeInputFormat.java | 61 +++++++++ .../apache/rya/giraph/format/RyaEdgeReader.java | 84 ++++++++++++ .../rya/giraph/format/RyaGiraphUtils.java | 78 +++++++++++ .../rya/giraph/format/RyaVertexInputFormat.java | 75 +++++++++++ .../rya/giraph/format/RyaVertexReader.java | 94 +++++++++++++ .../rya/giraph/format/TestTextOutputFormat.java | 44 +++++++ .../rya/giraph/format/TestVertexFormat.java | 132 +++++++++++++++++++ .../apache/rya/accumulo/mr/RyaInputFormat.java | 14 ++ 11 files changed, 670 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java index 60ab615..8ce6694 100644 --- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java +++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java @@ -133,7 +133,7 @@ public class RyaSailFactory { return dao; } - 
private static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException { + public static AccumuloRyaDAO getAccumuloDAO(final AccumuloRdfConfiguration config) throws AccumuloException, AccumuloSecurityException, RyaDAOException { final Connector connector = ConfigUtils.getConnector(config); final AccumuloRyaDAO dao = new AccumuloRyaDAO(); dao.setConnector(connector); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/pom.xml ---------------------------------------------------------------------- diff --git a/extras/pom.xml b/extras/pom.xml index 2ac129a..6acb51f 100644 --- a/extras/pom.xml +++ b/extras/pom.xml @@ -42,7 +42,8 @@ under the License. <module>rya.pcj.fluo</module> <module>rya.export</module> <module>rya.merger</module> - <module>rya.benchmark</module> + <module>rya.giraph</module> + <module>rya.benchmark</module> </modules> <profiles> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/pom.xml b/extras/rya.giraph/pom.xml new file mode 100644 index 0000000..2615b34 --- /dev/null +++ b/extras/rya.giraph/pom.xml @@ -0,0 +1,85 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.extras</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>rya.giraph</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph-core</artifactId> + <version>1.2.0</version> + </dependency> + <dependency> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph-accumulo</artifactId> + <version>1.2.0</version> + </dependency> +<dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <version>2.5.1</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>2.5.1</version> + <scope>provided</scope> +</dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>accumulo.rya</artifactId> + <exclusions> + <exclusion> + <artifactId> + hadoop-mapreduce-client-jobclient + </artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId> + hadoop-mapreduce-client-core + </artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.mapreduce</artifactId> + 
<exclusions> + <exclusion> + <artifactId>spark-graphx_2.11</artifactId> + <groupId>org.apache.spark</groupId> + </exclusion> + + </exclusions> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java new file mode 100644 index 0000000..761e409 --- /dev/null +++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeInputFormat.java @@ -0,0 +1,61 @@ +package org.apache.rya.giraph.format; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.List; + +import org.apache.giraph.io.EdgeInputFormat; +import org.apache.giraph.io.EdgeReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.rya.accumulo.mr.RyaInputFormat; +import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader; +import org.apache.rya.accumulo.mr.RyaStatementWritable; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.resolver.RyaTripleContext; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + /** Giraph edge input format whose splits and record readers come from a wrapped RyaInputFormat; source ids are Text and edge values are RyaStatementWritable. NOTE(review): rdfTableLayout and tripleContext are never assigned anywhere in this class, so createEdgeReader hands null for both to RyaEdgeReader - confirm whether a setConf override like the one in RyaVertexInputFormat is missing. */ +public class RyaEdgeInputFormat extends EdgeInputFormat<Text, RyaStatementWritable> { + + private RyaInputFormat ryaInputFormat = new RyaInputFormat(); + private TABLE_LAYOUT rdfTableLayout; + private RyaTripleContext tripleContext; + /** Wraps the record reader that ryaInputFormat creates for the split in a new RyaEdgeReader, passing along the table layout, the triple context and the task configuration. The cast assumes createRecordReader returns a RyaStatementRecordReader - TODO confirm. */ + @Override + public EdgeReader<Text, RyaStatementWritable> createEdgeReader(InputSplit split, TaskAttemptContext context) throws IOException { + return new RyaEdgeReader((RyaStatementRecordReader) ryaInputFormat.createRecordReader(split, context), + rdfTableLayout, tripleContext, + context.getConfiguration()); + } + /** Intentionally performs no validation of the configuration. */ + @Override + public void checkInputSpecs(Configuration arg0) { + // nothing to do + + } + /** Returns the splits computed by the wrapped RyaInputFormat for the job context; the arg1 parameter is ignored. */ + @Override + public List<InputSplit> getSplits(JobContext context, int arg1) throws IOException, InterruptedException { + return ryaInputFormat.getSplits(context); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java new file mode 100644 index 0000000..f0f9017 --- /dev/null +++ 
b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaEdgeReader.java @@ -0,0 +1,84 @@ +package org.apache.rya.giraph.format; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; + +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.io.EdgeReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader; +import org.apache.rya.accumulo.mr.RyaStatementWritable; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaTripleContext; + /** Giraph EdgeReader that adapts a RyaStatementRecordReader: each statement read becomes one edge whose source id is the data of the statement subject and whose value is the statement writable. */ +public class RyaEdgeReader extends EdgeReader<Text, RyaStatementWritable>{ + + private RyaStatementRecordReader reader; + private RyaTripleContext tripleContext; + private TABLE_LAYOUT tableLayout; + /** Stores the record reader, table layout and triple context for later use by initialize; the conf argument is not used. */ + public RyaEdgeReader(RyaStatementRecordReader recordReader, + TABLE_LAYOUT rdfTableLayout, RyaTripleContext tripleContext, Configuration conf){ + this.reader = recordReader; + 
this.tableLayout = rdfTableLayout; + this.tripleContext = tripleContext; + } + /** Initializes the wrapped record reader with the split, the task context, the stored triple context and the stored table layout. */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { + reader.initialize(inputSplit, context, tripleContext, tableLayout); + } + /** Closes the wrapped record reader. */ + @Override + public void close() throws IOException { + reader.close(); + } + /** Returns the progress reported by the wrapped record reader. */ + @Override + public float getProgress() throws IOException, InterruptedException { + return reader.getProgress(); + } + /** Advances the wrapped record reader and returns the result of its nextKeyValue call. */ + @Override + public boolean nextEdge() throws IOException, InterruptedException { + return reader.nextKeyValue(); + } + /** Returns the data of the current statement subject wrapped in a new Text. */ + @Override + public Text getCurrentSourceId() throws IOException, InterruptedException { + RyaStatementWritable currentStatement = reader.getCurrentValue(); + return new Text(currentStatement.getRyaStatement().getSubject().getData()); + } + /** Builds an edge whose value is the current statement writable. NOTE(review): the target vertex id is the toString() of the whole statement rather than the statement object - confirm this is intended. */ + @Override + public Edge<Text, RyaStatementWritable> getCurrentEdge() throws IOException, InterruptedException { + RyaStatementWritable currentStatement = reader.getCurrentValue(); + RyaStatement ryaStatement = currentStatement.getRyaStatement(); + Edge<Text, RyaStatementWritable> edge = EdgeFactory.create(new Text(ryaStatement.toString()), + currentStatement); + return edge; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java new file mode 100644 index 0000000..37b44d7 --- /dev/null +++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaGiraphUtils.java @@ -0,0 +1,78 @@ +package org.apache.rya.giraph.format; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; + +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.ClientConfiguration; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.rya.accumulo.AccumuloRdfConstants; +import org.apache.rya.accumulo.mr.MRUtils; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.RdfCloudTripleStoreUtils; +import org.apache.rya.indexing.accumulo.ConfigUtils; + /** Static helper that configures AccumuloInputFormat from the Rya connection settings held in a Hadoop Configuration. */ +public class RyaGiraphUtils { + /** Reads the mock flag, ZooKeeper hosts, instance name, user, password, table prefix, table layout (default SPO) and authorization string from conf, then sets connector info, input table, scan authorizations and either a mock or a ZooKeeper instance on AccumuloInputFormat through a Job built from conf. A non-empty authorization string is also copied to ConfigUtils.CLOUDBASE_AUTHS; otherwise AccumuloRdfConstants.ALL_AUTHORIZATIONS is used. NOTE(review): the settings are applied to a newly constructed Job, and IOException or AccumuloSecurityException is only printed, not rethrown - confirm that the configured values actually reach the caller and that failures should not abort. */ + public static void initializeAccumuloInputFormat(Configuration conf){ + // get accumulo connect information + boolean mock = MRUtils.getACMock(conf, false); + String zk = MRUtils.getACZK(conf); + String instance = MRUtils.getACInstance(conf); + String userName = MRUtils.getACUserName(conf); + String pwd = MRUtils.getACPwd(conf); + String tablePrefix = MRUtils.getTablePrefix(conf); + TABLE_LAYOUT rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.SPO); + String authString = conf.get(MRUtils.AC_AUTH_PROP); + 
Authorizations authorizations; + if (authString != null && !authString.isEmpty()) { + authorizations = new Authorizations(authString.split(",")); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, authString); // for consistency + } + else { + authorizations = AccumuloRdfConstants.ALL_AUTHORIZATIONS; + } + + + // set up the accumulo input format so that we know what table to use and everything + try { + Job job = new Job(conf); + AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix); + AccumuloInputFormat.setInputTableName(job, tableName); + AccumuloInputFormat.setScanAuthorizations(job, authorizations); + if (mock) { + AccumuloInputFormat.setMockInstance(job, instance); + } else { + ClientConfiguration clientConfig = ClientConfiguration.loadDefault() + .withInstance(instance).withZkHosts(zk); + AccumuloInputFormat.setZooKeeperInstance(job, clientConfig); + } + } catch (IOException | AccumuloSecurityException e) { + // TODO better exception handling here + e.printStackTrace(); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java new file mode 100644 index 0000000..ecd2c13 --- /dev/null +++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexInputFormat.java @@ -0,0 +1,75 @@ +package org.apache.rya.giraph.format; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.List; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.io.VertexReader; +import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.mr.MRUtils; +import org.apache.rya.accumulo.mr.RyaInputFormat; +import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader; +import org.apache.rya.accumulo.mr.RyaStatementWritable; +import org.apache.rya.accumulo.mr.RyaTypeWritable; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.resolver.RyaTripleContext; + /** Giraph vertex input format that reads Rya statements through a wrapped RyaInputFormat; vertex ids are Text, vertex values are RyaTypeWritable and edge values are RyaStatementWritable. The table layout and triple context fields are filled in by setConf. */ +public class RyaVertexInputFormat extends AccumuloVertexInputFormat<Text, RyaTypeWritable, RyaStatementWritable> { + + + private RyaInputFormat ryaInputFormat = new RyaInputFormat(); + private TABLE_LAYOUT rdfTableLayout; + private RyaTripleContext tripleContext; + /** Wraps the record reader that ryaInputFormat creates for the split in a new RyaVertexReader, passing along the table layout, the triple context and the task configuration. The cast assumes createRecordReader returns a RyaStatementRecordReader - TODO confirm. */ + @Override + public VertexReader<Text, RyaTypeWritable, RyaStatementWritable> createVertexReader(InputSplit split, TaskAttemptContext context) throws IOException { + return new 
RyaVertexReader((RyaStatementRecordReader) ryaInputFormat.createRecordReader(split, context), + rdfTableLayout, tripleContext, + context.getConfiguration()); + } + /** Intentionally performs no validation of the configuration. */ + @Override + public void checkInputSpecs(Configuration conf) { + // don't need to do anything here + } + /** Returns the splits computed by the wrapped RyaInputFormat for the job context; minSplitCountHint is ignored. */ + @Override + public List<InputSplit> getSplits(JobContext context, int minSplitCountHint) throws IOException, InterruptedException { + return ryaInputFormat.getSplits(context); + } + /** Passes conf to the superclass, runs RyaGiraphUtils.initializeAccumuloInputFormat on it, and derives the table layout (default SPO) and the triple context that createVertexReader later hands to each reader. */ + @Override + public void setConf(ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, RyaStatementWritable> conf) { + super.setConf(conf); + RyaGiraphUtils.initializeAccumuloInputFormat(conf); + rdfTableLayout = MRUtils.getTableLayout(conf, TABLE_LAYOUT.SPO); + tripleContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(conf)); + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java new file mode 100644 index 0000000..dbb404d --- /dev/null +++ b/extras/rya.giraph/src/main/java/org/apache/rya/giraph/format/RyaVertexReader.java @@ -0,0 +1,94 @@ +package org.apache.rya.giraph.format; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.VertexReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.rya.accumulo.mr.RyaInputFormat.RyaStatementRecordReader; +import org.apache.rya.accumulo.mr.RyaStatementWritable; +import org.apache.rya.accumulo.mr.RyaTypeWritable; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaTripleContext; + /** Giraph VertexReader that adapts a RyaStatementRecordReader: every statement read yields one vertex for the statement subject carrying a single edge. */ +public class RyaVertexReader extends VertexReader<Text, RyaTypeWritable, RyaStatementWritable>{ + + private RyaStatementRecordReader reader; + private RyaTripleContext tripleContext; + private TABLE_LAYOUT tableLayout; + private ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, RyaStatementWritable> classesConfiguration; + /** Stores the record reader, table layout and triple context, and wraps conf in an ImmutableClassesGiraphConfiguration that getCurrentVertex uses to create vertices. */ + public RyaVertexReader(RyaStatementRecordReader recordReader, + TABLE_LAYOUT rdfTableLayout, RyaTripleContext tripleContext, Configuration conf){ + this.reader = recordReader; + this.tableLayout = rdfTableLayout; + this.tripleContext = tripleContext; + this.classesConfiguration = + new ImmutableClassesGiraphConfiguration<Text, RyaTypeWritable, 
RyaStatementWritable>(conf); + } + /** Initializes the wrapped record reader with the split, the task context, the stored triple context and the stored table layout. */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { + reader.initialize(inputSplit, context, tripleContext, tableLayout); + } + /** Advances the wrapped record reader and returns the result of its nextKeyValue call. */ + @Override + public boolean nextVertex() throws IOException, InterruptedException { + return reader.nextKeyValue(); + } + /** Creates a vertex whose id is the subject data of the current statement and whose value wraps the subject RyaType, with one edge holding the statement writable. NOTE(review): the edge target id is the toString() of the whole statement rather than the statement object, and each statement produces its own vertex even when subjects repeat - confirm this is intended. */ + @Override + public Vertex<Text, RyaTypeWritable, RyaStatementWritable> getCurrentVertex() throws IOException, InterruptedException { + RyaStatementWritable currentStatement = reader.getCurrentValue(); + RyaStatement ryaStatement = currentStatement.getRyaStatement(); + RyaTypeWritable vertexWritable = new RyaTypeWritable(); + vertexWritable.setRyaType(ryaStatement.getSubject()); + Text vertexId = new Text(ryaStatement.getSubject().getData()); + Vertex<Text, RyaTypeWritable, RyaStatementWritable> vertex = classesConfiguration.createVertex(); + Edge<Text, RyaStatementWritable> edge = EdgeFactory.create(new Text(ryaStatement.toString()), + currentStatement); + List<Edge<Text, RyaStatementWritable>> edges = new ArrayList<Edge<Text, RyaStatementWritable>>(); + edges.add(edge); + vertex.initialize(vertexId, vertexWritable, edges); + return vertex; + } + /** Closes the wrapped record reader. */ + @Override + public void close() throws IOException { + reader.close(); + } + /** Returns the progress reported by the wrapped record reader. */ + @Override + public float getProgress() throws IOException, InterruptedException { + return reader.getProgress(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java new file mode 100644 index 0000000..be297db --- /dev/null +++ b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestTextOutputFormat.java @@ 
-0,0 +1,44 @@ +package org.apache.rya.giraph.format; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.formats.TextVertexOutputFormat; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + /** Test-only vertex output format whose writer prints each vertex to standard output. */ +public class TestTextOutputFormat extends TextVertexOutputFormat<Text, Text, Text> { + /** Vertex writer that prints the vertex with System.out.println. */ + public class SystemOutVertexWriter extends TextVertexWriter { + + @Override + public void writeVertex(Vertex<Text, Text, Text> vertex) throws IOException, InterruptedException { + System.out.println(vertex); + } + + } + /** Returns a new SystemOutVertexWriter; the context argument is not used. */ + @Override + public TextVertexOutputFormat<Text, Text, Text>.TextVertexWriter createVertexWriter(TaskAttemptContext context) + throws IOException, InterruptedException { + return new SystemOutVertexWriter(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java ---------------------------------------------------------------------- diff --git a/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java 
b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java new file mode 100644 index 0000000..920e876 --- /dev/null +++ b/extras/rya.giraph/src/test/java/org/apache/rya/giraph/format/TestVertexFormat.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.rya.giraph.format; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.giraph.BspCase; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat; +import org.apache.giraph.job.GiraphJob; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.giraph.format.RyaVertexInputFormat; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.Test; + +/* + Test class for Rya vertex input formats. + */ +public class TestVertexFormat extends BspCase { + + private final Logger log = Logger.getLogger(TestVertexFormat.class); + + /** + * Create the test case + */ + public TestVertexFormat() { + super(TestVertexFormat.class.getName()); + System.setProperty("java.io.tmpdir", "target/test"); +} + /** Builds a configuration for a mock Accumulo instance named test with user root, an empty password, empty authorizations, table prefix rya_ and the PCJ, free-text and temporal options set to false. */ + private static AccumuloRdfConfiguration getConf() { + + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + + conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, true); + conf.set(ConfigUtils.USE_PCJ, "false"); + conf.set(ConfigUtils.USE_FREETEXT, "false"); + conf.set(ConfigUtils.USE_TEMPORAL, "false"); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, "rya_"); + conf.set(ConfigUtils.CLOUDBASE_USER, "root"); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ""); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, "test"); + conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); + return conf; + } + + /* + Write a simple parent-child directed graph to Cloudbase. 
+ Run a job which reads the values + into subclasses that extend CloudbaseVertex I/O formats. + Check the output after the job. + */ + @Test + public void testRyaInput() throws Exception { + /* NOTE(review): this method adds four statements that share one subject and configures a GiraphJob, but it never calls job.run and never asserts on any output - confirm whether the run and verification steps described above are missing. */ + AccumuloRdfConfiguration conf = getConf(); + AccumuloRyaDAO ryaDAO = RyaSailFactory.getAccumuloDAO(conf); + + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred1"), + new RyaURI("urn:test#obj1"))); + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred2"), + new RyaURI("urn:test#obj2"))); + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred3"), + new RyaURI("urn:test#obj3"))); + ryaDAO.add(new RyaStatement(new RyaURI("urn:test#1234"), + new RyaURI("urn:test#pred4"), + new RyaURI("urn:test#obj4"))); + ryaDAO.flush(); + + GiraphJob job = new GiraphJob(conf, getCallingMethodName()); + + setupConfiguration(job); + GiraphConfiguration giraphConf = job.getConfiguration(); + giraphConf.setComputationClass(EdgeNotification.class); + giraphConf.setVertexInputFormatClass(RyaVertexInputFormat.class); + giraphConf.setVertexOutputFormatClass(TestTextOutputFormat.class); + + + if (log.isInfoEnabled()) + log.info("Running edge notification job using Rya Vertex input"); + + } + + /* + Test compute method that sends each edge a notification of its parents. + The test set only has a 1-1 parent-to-child ratio for this unit test. 
+ */ + public static class EdgeNotification + extends BasicComputation<Text, Text, Text, Text> { + /** Sets the vertex value to each incoming message in turn, sends the vertex id along all edges during superstep 0, then votes to halt. NOTE(review): this computation is typed on Text vertex and edge values while RyaVertexInputFormat produces RyaTypeWritable and RyaStatementWritable - confirm the types are compatible at run time. */ @Override + public void compute(Vertex<Text, Text, Text> vertex, + Iterable<Text> messages) throws IOException { + for (Text message : messages) { + vertex.getValue().set(message); + } + if(getSuperstep() == 0) { + sendMessageToAllEdges(vertex, vertex.getId()); + } + vertex.voteToHalt(); + } +} +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/45efa55b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java ---------------------------------------------------------------------- diff --git a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java index baa7033..2fc2728 100644 --- a/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java +++ b/mapreduce/src/main/java/org/apache/rya/accumulo/mr/RyaInputFormat.java @@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -95,6 +96,19 @@ public class RyaInputFormat extends AbstractInputFormat<Text, RyaStatementWritab } /** + * Initializes the RecordReader. + * @param inSplit Defines the portion of data to read. + * @param attempt Context for this task attempt. + * @throws IOException if thrown by the superclass's initialize method. 
+ */ + /* Delegates to super.initialize(inSplit, attempt), then stores the supplied tableLayout and triple context in the tableLayout and ryaContext fields of this reader; the Javadoc preceding this method does not describe the context and tableLayout parameters. */ public void initialize(InputSplit inSplit, TaskAttemptContext attempt, RyaTripleContext context, TABLE_LAYOUT tableLayout) throws IOException { + super.initialize(inSplit, attempt); + this.tableLayout = tableLayout; + //TODO verify that this is correct + this.ryaContext = context; + } + + /** * Load the next statement by converting the next Accumulo row to a * statement, and make the new (key,value) pair available for retrieval. * @return true if another (key,value) pair was fetched and is ready to