This is an automated email from the ASF dual-hosted git repository. dionysios pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/giraph.git
The following commit(s) were added to refs/heads/trunk by this push: new 2cae34a GIRAPH-1238 2cae34a is described below commit 2cae34a3cfd95329b21e0537ddb30afb5b750209 Author: Dionysios Logothetis <dlogothe...@gmail.com> AuthorDate: Fri May 8 15:26:20 2020 -0700 GIRAPH-1238 closes #124 --- giraph-accumulo/pom.xml | 193 --------------------- giraph-accumulo/src/main/assembly/compile.xml | 39 ----- .../io/accumulo/AccumuloVertexInputFormat.java | 169 ------------------ .../io/accumulo/AccumuloVertexOutputFormat.java | 174 ------------------- .../apache/giraph/io/accumulo/package-info.java | 21 --- .../io/accumulo/TestAccumuloVertexFormat.java | 187 -------------------- .../edgemarker/AccumuloEdgeInputFormat.java | 96 ---------- .../edgemarker/AccumuloEdgeOutputFormat.java | 76 -------- giraph-dist/pom.xml | 4 + pom.xml | 31 ++-- 10 files changed, 14 insertions(+), 976 deletions(-) diff --git a/giraph-accumulo/pom.xml b/giraph-accumulo/pom.xml deleted file mode 100644 index ba9a110..0000000 --- a/giraph-accumulo/pom.xml +++ /dev/null @@ -1,193 +0,0 @@ -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.giraph</groupId> - <artifactId>giraph-parent</artifactId> - <version>1.3.0-SNAPSHOT</version> - </parent> - <artifactId>giraph-accumulo</artifactId> - <packaging>jar</packaging> - - <name>Apache Giraph Accumulo I/O</name> - <url>http://giraph.apache.org/giraph-accumulo/</url> - <description>Giraph Accumulo input/output classes</description> - - <properties> - <top.dir>${project.basedir}/..</top.dir> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-site-plugin</artifactId> - <configuration> - <siteDirectory>${project.basedir}/src/site</siteDirectory> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.6</version> - <configuration> - <skip>${surefire.skip}</skip> - <systemProperties> - <property> - <name>prop.jarLocation</name> - <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value> - </property> - </systemProperties> - </configuration> - </plugin> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>findbugs-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - - <profiles> - <profile> - <id>hadoop_0.20.203</id> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-test</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - </profile> - - <profile> - <id>hadoop_1</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-test</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - </profile> - - <profile> - <id>hadoop_2</id> - <properties> - <surefire.skip>true</surefire.skip> - </properties> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - </profile> - - <profile> - <id>hadoop_non_secure</id> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-test</artifactId> - <version>${hadoop.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - </profile> - - <profile> - <id>hadoop_facebook</id> - <dependencies> - <dependency> - <groupId>com.facebook.hadoop</groupId> - <artifactId>hadoop-test</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.jboss.netty</groupId> - <artifactId>netty</artifactId> - <version>${dep.oldnetty.version}</version> - <scope>test</scope> - </dependency> - </dependencies> - </profile> - </profiles> - - <dependencies> - <!-- compile dependencies. sorted lexicographically. --> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> - <dependency> - <groupId>org.apache.giraph</groupId> - <artifactId>giraph-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.giraph</groupId> - <artifactId>giraph-core</artifactId> - <type>test-jar</type> - </dependency> - - <!-- provided dependencies. sorted lexicographically. --> - <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-core</artifactId> - <scope>provided</scope> - </dependency> - - <!-- test dependencies. sorted lexicographically. --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <scope>test</scope> - </dependency> - </dependencies> -</project> diff --git a/giraph-accumulo/src/main/assembly/compile.xml b/giraph-accumulo/src/main/assembly/compile.xml deleted file mode 100644 index 6acf679..0000000 --- a/giraph-accumulo/src/main/assembly/compile.xml +++ /dev/null @@ -1,39 +0,0 @@ -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> - <id>jar-with-dependencies</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - - <dependencySets> - <dependencySet> - <useProjectArtifact>true</useProjectArtifact> - <outputDirectory>/</outputDirectory> - <unpackOptions> - <excludes> - <exclude>META-INF/LICENSE</exclude> - </excludes> - </unpackOptions> - <unpack>true</unpack> - <scope>runtime</scope> - </dependencySet> - </dependencySets> -</assembly> \ No newline at end of file diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java deleted file mode 100644 index c286ed4..0000000 --- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexInputFormat.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.io.accumulo; - -import java.io.IOException; -import java.util.List; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; -import org.apache.giraph.io.VertexInputFormat; -import org.apache.giraph.io.VertexReader; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/** - * Class which wraps the AccumuloInputFormat. It's designed - * as an extension point to VertexInputFormat subclasses who wish - * to read from AccumuloTables. - * - * Works with - * {@link AccumuloVertexOutputFormat} - * - * @param <I> vertex id type - * @param <V> vertex value type - * @param <E> edge type - */ -public abstract class AccumuloVertexInputFormat< - I extends WritableComparable, - V extends Writable, - E extends Writable> - extends VertexInputFormat<I, V, E> { - /** - * delegate input format for all accumulo operations. - */ - protected AccumuloInputFormat accumuloInputFormat = - new AccumuloInputFormat(); - - /** - * Abstract class which provides a template for instantiating vertices - * from Accumulo Key/Value pairs. - * - * @param <I> vertex id type - * @param <V> vertex value type - * @param <E> edge type - */ - public abstract static class AccumuloVertexReader< - I extends WritableComparable, - V extends Writable, E extends Writable> - extends VertexReader<I, V, E> { - - /** Giraph configuration */ - private ImmutableClassesGiraphConfiguration<I, V, E> - configuration; - /** - * Used by subclasses to read key/value pairs. - */ - private final RecordReader<Key, Value> reader; - /** Context passed to initialize */ - private TaskAttemptContext context; - - /** - * Constructor used to pass Record Reader instance - * @param reader Accumulo record reader - */ - public AccumuloVertexReader(RecordReader<Key, Value> reader) { - this.reader = reader; - } - - public ImmutableClassesGiraphConfiguration<I, V, E> - getConfiguration() { - return configuration; - } - - /** - * initialize the reader. - * - * @param inputSplit Input split to be used for reading vertices. - * @param context Context from the task. - * @throws IOException - * @throws InterruptedException - */ - public void initialize(InputSplit inputSplit, - TaskAttemptContext context) - throws IOException, InterruptedException { - reader.initialize(inputSplit, context); - this.context = context; - this.configuration = - new ImmutableClassesGiraphConfiguration<I, V, E>( - context.getConfiguration()); - } - - /** - * close - * - * @throws IOException - */ - public void close() throws IOException { - reader.close(); - } - - /** - * getProgress - * - * @return progress - * @throws IOException - * @throws InterruptedException - */ - public float getProgress() throws IOException, InterruptedException { - return reader.getProgress(); - } - - /** - * Get the result record reader - * - * @return Record reader to be used for reading. - */ - protected RecordReader<Key, Value> getRecordReader() { - return reader; - } - - /** - * getContext - * - * @return Context passed to initialize. - */ - protected TaskAttemptContext getContext() { - return context; - } - - } - - @Override - public List<InputSplit> getSplits( - JobContext context, int minSplitCountHint) - throws IOException, InterruptedException { - List<InputSplit> splits = null; - try { - splits = accumuloInputFormat.getSplits(context); - } catch (IOException e) { - if (e.getMessage().contains("Input info has not been set")) { - throw new IOException(e.getMessage() + - " Make sure you initialized" + - " AccumuloInputFormat static setters " + - "before passing the config to GiraphJob."); - } - } - return splits; - } -} diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java deleted file mode 100644 index 1927ed7..0000000 --- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/AccumuloVertexOutputFormat.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.io.accumulo; - -import java.io.IOException; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.data.Mutation; -import org.apache.giraph.io.VertexOutputFormat; -import org.apache.giraph.io.VertexWriter; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -/** - * - * Class which wraps the AccumuloOutputFormat. It's designed - * as an extension point to VertexOutputFormat subclasses who wish - * to write vertices back to an Accumulo table. - * - * Works with - * {@link AccumuloVertexInputFormat} - * - * - * @param <I> vertex id type - * @param <V> vertex value type - * @param <E> edge type - */ -public abstract class AccumuloVertexOutputFormat< - I extends WritableComparable, - V extends Writable, - E extends Writable> - extends VertexOutputFormat<I, V, E> { - - - /** - * Output table parameter - */ - public static final String OUTPUT_TABLE = "OUTPUT_TABLE"; - - /** - * Accumulo delegate for table output - */ - protected AccumuloOutputFormat accumuloOutputFormat = - new AccumuloOutputFormat(); - - /** - * - * Main abstraction point for vertex writers to persist back - * to Accumulo tables. - * - * @param <I> vertex id type - * @param <V> vertex value type - * @param <E> edge type - */ - public abstract static class AccumuloVertexWriter< - I extends WritableComparable, - V extends Writable, - E extends Writable> - extends VertexWriter<I, V, E> { - - /** - * task attempt context. - */ - private TaskAttemptContext context; - - /** - * Accumulo record writer - */ - private RecordWriter<Text, Mutation> recordWriter; - - /** - * Constructor for use with subclasses - * - * @param recordWriter accumulo record writer - */ - public AccumuloVertexWriter(RecordWriter<Text, Mutation> recordWriter) { - this.recordWriter = recordWriter; - } - - /** - * initialize - * - * @param context Context used to write the vertices. - * @throws IOException - */ - public void initialize(TaskAttemptContext context) throws IOException { - this.context = context; - } - - /** - * close - * - * @param context the context of the task - * @throws IOException - * @throws InterruptedException - */ - public void close(TaskAttemptContext context) - throws IOException, InterruptedException { - recordWriter.close(context); - } - - /** - * Get the table record writer; - * - * @return Record writer to be used for writing. - */ - public RecordWriter<Text, Mutation> getRecordWriter() { - return recordWriter; - } - - /** - * Get the context. - * - * @return Context passed to initialize. - */ - public TaskAttemptContext getContext() { - return context; - } - - } - /** - * - * checkOutputSpecs - * - * @param context information about the job - * @throws IOException - * @throws InterruptedException - */ - @Override - public void checkOutputSpecs(JobContext context) - throws IOException, InterruptedException { - try { - accumuloOutputFormat.checkOutputSpecs(context); - } catch (IOException e) { - if (e.getMessage().contains("Output info has not been set")) { - throw new IOException(e.getMessage() + " Make sure you initialized" + - " AccumuloOutputFormat static setters " + - "before passing the config to GiraphJob."); - } - } - } - - /** - * getOutputCommitter - * - * @param context the task context - * @return OutputCommitter - * @throws IOException - * @throws InterruptedException - */ - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) - throws IOException, InterruptedException { - return accumuloOutputFormat.getOutputCommitter(context); - } -} diff --git a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/package-info.java b/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/package-info.java deleted file mode 100644 index e75d292..0000000 --- a/giraph-accumulo/src/main/java/org/apache/giraph/io/accumulo/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Accumulo Input/Output for Giraph - */ -package org.apache.giraph.io.accumulo; diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java deleted file mode 100644 index 0ee9666..0000000 --- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/TestAccumuloVertexFormat.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.giraph.io.accumulo; - -import org.apache.accumulo.core.client.BatchWriter; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Scanner; -import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; -import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; -import org.apache.accumulo.core.client.mock.MockInstance; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.core.util.ByteBufferUtil; -import org.apache.accumulo.core.util.Pair; -import org.apache.giraph.BspCase; -import org.apache.giraph.graph.BasicComputation; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeInputFormat; -import org.apache.giraph.io.accumulo.edgemarker.AccumuloEdgeOutputFormat; -import org.apache.giraph.job.GiraphJob; -import org.apache.giraph.graph.Vertex; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.HashSet; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/* - Test class for Accumulo vertex input/output formats. - */ -public class TestAccumuloVertexFormat extends BspCase{ - - private final String TABLE_NAME = "simple_graph"; - private final String INSTANCE_NAME = "instance"; - private final Text FAMILY = new Text("cf"); - private final Text CHILDREN = new Text("children"); - private final String USER = "root"; - private final byte[] PASSWORD = new byte[] {}; - private final Text OUTPUT_FIELD = new Text("parent"); - - - private final Logger log = Logger.getLogger(TestAccumuloVertexFormat.class); - - /** - * Create the test case - */ - public TestAccumuloVertexFormat() { - super(TestAccumuloVertexFormat.class.getName()); - } - - /* - Write a simple parent-child directed graph to Accumulo. - Run a job which reads the values - into subclasses that extend AccumuloVertex I/O formats. - Check the output after the job. - */ - @Test - public void testAccumuloInputOutput() throws Exception { - if (System.getProperty("prop.mapred.job.tracker") != null) { - if(log.isInfoEnabled()) - log.info("testAccumuloInputOutput: " + - "Ignore this test if not local mode."); - return; - } - - File jarTest = new File(System.getProperty("prop.jarLocation")); - if(!jarTest.exists()) { - fail("Could not find Giraph jar at " + - "location specified by 'prop.jarLocation'. " + - "Make sure you built the main Giraph artifact?."); - } - - //Write out vertices and edges out to a mock instance. - MockInstance mockInstance = new MockInstance(INSTANCE_NAME); - Connector c = mockInstance.getConnector("root", new byte[] {}); - c.tableOperations().create(TABLE_NAME); - BatchWriter bw = c.createBatchWriter(TABLE_NAME, 10000L, 1000L, 4); - - Mutation m1 = new Mutation(new Text("0001")); - m1.put(FAMILY, CHILDREN, new Value("0002".getBytes())); - bw.addMutation(m1); - - Mutation m2 = new Mutation(new Text("0002")); - m2.put(FAMILY, CHILDREN, new Value("0003".getBytes())); - bw.addMutation(m2); - if(log.isInfoEnabled()) - log.info("Writing mutations to Accumulo table"); - bw.close(); - - Configuration conf = new Configuration(); - conf.set(AccumuloVertexOutputFormat.OUTPUT_TABLE, TABLE_NAME); - - /* - Very important to initialize the formats before - sending configuration to the GiraphJob. Otherwise - the internally constructed Job in GiraphJob will - not have the proper context initialization. - */ - AccumuloInputFormat.setInputInfo(conf, USER, "".getBytes(), - TABLE_NAME, new Authorizations()); - AccumuloInputFormat.setMockInstance(conf, INSTANCE_NAME); - - AccumuloOutputFormat.setOutputInfo(conf, USER, PASSWORD, true, null); - AccumuloOutputFormat.setMockInstance(conf, INSTANCE_NAME); - - GiraphJob job = new GiraphJob(conf, getCallingMethodName()); - setupConfiguration(job); - GiraphConfiguration giraphConf = job.getConfiguration(); - giraphConf.setComputationClass(EdgeNotification.class); - giraphConf.setVertexInputFormatClass(AccumuloEdgeInputFormat.class); - giraphConf.setVertexOutputFormatClass(AccumuloEdgeOutputFormat.class); - - HashSet<Pair<Text, Text>> columnsToFetch = new HashSet<Pair<Text,Text>>(); - columnsToFetch.add(new Pair<Text, Text>(FAMILY, CHILDREN)); - AccumuloInputFormat.fetchColumns(job.getConfiguration(), columnsToFetch); - - if(log.isInfoEnabled()) - log.info("Running edge notification job using Accumulo input"); - assertTrue(job.run(true)); - Scanner scanner = c.createScanner(TABLE_NAME, new Authorizations()); - scanner.setRange(new Range("0002", "0002")); - scanner.fetchColumn(FAMILY, OUTPUT_FIELD); - boolean foundColumn = false; - - if(log.isInfoEnabled()) - log.info("Verify job output persisted correctly."); - //make sure we found the qualifier. - assertTrue(scanner.iterator().hasNext()); - - - //now we check to make sure the expected value from the job persisted correctly. - for(Map.Entry<Key,Value> entry : scanner) { - Text row = entry.getKey().getRow(); - assertEquals("0002", row.toString()); - Value value = entry.getValue(); - assertEquals("0001", ByteBufferUtil.toString( - ByteBuffer.wrap(value.get()))); - foundColumn = true; - } - } - /* - Test compute method that sends each edge a notification of its parents. - The test set only has a 1-1 parent-to-child ratio for this unit test. - */ - public static class EdgeNotification - extends BasicComputation<Text, Text, Text, Text> { - @Override - public void compute(Vertex<Text, Text, Text> vertex, - Iterable<Text> messages) throws IOException { - for (Text message : messages) { - vertex.getValue().set(message); - } - if(getSuperstep() == 0) { - sendMessageToAllEdges(vertex, vertex.getId()); - } - vertex.voteToHalt(); - } - } -} diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java deleted file mode 100644 index ff79d7a..0000000 --- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeInputFormat.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.io.accumulo.edgemarker; - -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.Value; -import org.apache.giraph.edge.Edge; -import org.apache.giraph.edge.EdgeFactory; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.io.VertexReader; -import org.apache.giraph.io.accumulo.AccumuloVertexInputFormat; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import com.google.common.collect.Lists; - -import java.io.IOException; -import java.util.List; -import java.util.regex.Pattern; - -/* - Example subclass which reads in Key/Value pairs to construct vertex objects. - */ -public class AccumuloEdgeInputFormat - extends AccumuloVertexInputFormat<Text, Text, Text> { - @Override public void checkInputSpecs(Configuration conf) { } - - private static final Text uselessEdgeValue = new Text(); - public VertexReader<Text, Text, Text> - createVertexReader(InputSplit split, TaskAttemptContext context) - throws IOException { - try { - - return new AccumuloEdgeVertexReader( - accumuloInputFormat.createRecordReader(split, context)) { - }; - } catch (InterruptedException e) { - throw new IOException(e); - } - - } - /* - Reader takes Key/Value pairs from the underlying input format. - */ - public static class AccumuloEdgeVertexReader - extends AccumuloVertexReader<Text, Text, Text> { - - public static final Pattern commaPattern = Pattern.compile("[,]"); - - public AccumuloEdgeVertexReader(RecordReader<Key, Value> recordReader) { - super(recordReader); - } - - - public boolean nextVertex() throws IOException, InterruptedException { - return getRecordReader().nextKeyValue(); - } - - /* - Each Key/Value contains the information needed to construct the vertices. - */ - public Vertex<Text, Text, Text> getCurrentVertex() - throws IOException, InterruptedException { - Key key = getRecordReader().getCurrentKey(); - Value value = getRecordReader().getCurrentValue(); - Vertex<Text, Text, Text> vertex = - getConfiguration().createVertex(); - Text vertexId = key.getRow(); - List<Edge<Text, Text>> edges = Lists.newLinkedList(); - String edge = new String(value.get()); - Text edgeId = new Text(edge); - edges.add(EdgeFactory.create(edgeId, uselessEdgeValue)); - vertex.initialize(vertexId, new Text(), edges); - - return vertex; - } - } -} diff --git a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java b/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java deleted file mode 100644 index c2ebbe2..0000000 --- a/giraph-accumulo/src/test/java/org/apache/giraph/io/accumulo/edgemarker/AccumuloEdgeOutputFormat.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.io.accumulo.edgemarker; - -import java.io.IOException; -import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.data.Value; -import org.apache.giraph.graph.Vertex; -import org.apache.giraph.io.VertexWriter; -import org.apache.giraph.io.accumulo.AccumuloVertexOutputFormat; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -/* - Example subclass for writing vertices back to Accumulo. - */ -public class AccumuloEdgeOutputFormat - extends AccumuloVertexOutputFormat<Text, Text, Text> { - - public VertexWriter<Text, Text, Text> - createVertexWriter(TaskAttemptContext context) - throws IOException, InterruptedException { - RecordWriter<Text, Mutation> writer = - accumuloOutputFormat.getRecordWriter(context); - String tableName = getConf().get(OUTPUT_TABLE); - if(tableName == null) - throw new IOException("Forgot to set table name " + - "using AccumuloVertexOutputFormat.OUTPUT_TABLE"); - return new AccumuloEdgeVertexWriter(writer, tableName); - } - - /* - Wraps RecordWriter for writing Mutations back to the configured Accumulo Table. - */ - public static class AccumuloEdgeVertexWriter - extends AccumuloVertexWriter<Text, Text, Text> { - - private final Text CF = new Text("cf"); - private final Text PARENT = new Text("parent"); - private Text tableName; - - public AccumuloEdgeVertexWriter( - RecordWriter<Text, Mutation> writer, String tableName) { - super(writer); - this.tableName = new Text(tableName); - } - /* - Write back a mutation that adds a qualifier for 'parent' containing the vertex value - as the cell value. Assume the vertex ID corresponds to a key. - */ - public void writeVertex(Vertex<Text, Text, Text> vertex) - throws IOException, InterruptedException { - RecordWriter<Text, Mutation> writer = getRecordWriter(); - Mutation mt = new Mutation(vertex.getId()); - mt.put(CF, PARENT, new Value( - vertex.getValue().toString().getBytes())); - writer.write(tableName, mt); - } - } -} diff --git a/giraph-dist/pom.xml b/giraph-dist/pom.xml index 10b7a1b..91ff29c 100644 --- a/giraph-dist/pom.xml +++ b/giraph-dist/pom.xml @@ -95,7 +95,11 @@ </dependency> <dependency> <groupId>org.apache.giraph</groupId> +<<<<<<< HEAD <artifactId>giraph-accumulo</artifactId> +======= + <artifactId>giraph-hcatalog</artifactId> +>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf </dependency> <dependency> <groupId>org.apache.giraph</groupId> diff --git a/pom.xml b/pom.xml index 8a92238..4b9f89a 100644 --- a/pom.xml +++ b/pom.xml @@ -312,7 +312,6 @@ under the License. <checkstyle.config.path>${top.dir}/checkstyle.xml</checkstyle.config.path> <dep.avro.version>1.7.6</dep.avro.version> - <dep.accumulo.version>1.4.0</dep.accumulo.version> <dep.asm.version>3.2</dep.asm.version> <dep.airline.version>0.5</dep.airline.version> <dep.base64.version>2.3.8</dep.base64.version> @@ -987,8 +986,12 @@ under the License. <profile> <id>hadoop_1</id> <modules> +<<<<<<< HEAD <module>giraph-accumulo</module> <<<<<<< HEAD +======= + <module>giraph-hcatalog</module> +>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf <module>giraph-gora</module> ======= <module>giraph-hcatalog</module> @@ -1141,8 +1144,11 @@ under the License. <profile> <id>hadoop_2</id> <modules> +<<<<<<< HEAD <module>giraph-accumulo</module> <<<<<<< HEAD +======= +>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf <module>giraph-hbase</module> <<<<<<< HEAD ======= @@ -1525,12 +1531,15 @@ under the License. </dependency> <dependency> <groupId>org.apache.giraph</groupId> +<<<<<<< HEAD <artifactId>giraph-accumulo</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.giraph</groupId> <<<<<<< HEAD +======= +>>>>>>> 8bf0c60fad64676bc6181dac7320df8d3bbedcbf <artifactId>giraph-rexster-io</artifactId> <version>${project.version}</version> </dependency> @@ -1798,26 +1807,6 @@ under the License. <version>${dep.commons-net.version}</version> </dependency> <dependency> - <groupId>org.apache.accumulo</groupId> - <artifactId>accumulo-core</artifactId> - <version>${dep.accumulo.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - <exclusion> - <groupId>commons-logging</groupId> - <artifactId>commons-logging-api</artifactId> - </exclusion> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>avro</artifactId> <version>${dep.avro.version}</version>