[GIRAPH-1013] Adding prepare graph library in Java8 Summary: Adding simple graph preparation: - symmetric - removal of isolated edges - normalizing - connected components
Creating a new module, only used for Phadoop_facebook, which is written in Java8. Tests are new/modified from what is in our repo, the rest is identical. Test Plan: mvn clean install Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov Reviewed By: sergey.edunov Differential Revision: https://reviews.facebook.net/D40719 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d7e4bde1 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d7e4bde1 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d7e4bde1 Branch: refs/heads/trunk Commit: d7e4bde11fdbfdef13e9316702c914e749a7757f Parents: f9dc6b5 Author: Igor Kabiljo <[email protected]> Authored: Thu Jun 25 14:13:18 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Thu Jul 2 16:04:27 2015 -0700 ---------------------------------------------------------------------- checkstyle-relaxed-8.xml | 275 +++++++++++ findbugs-exclude.xml | 2 +- giraph-block-app-8/pom.xml | 137 ++++++ .../src/main/assembly/compile.xml | 39 ++ .../block_app/library/ReusableSuppliers.java | 120 +++++ .../giraph/block_app/library/package-info.java | 21 + .../prepare_graph/PrepareGraphPieces.java | 400 ++++++++++++++++ .../UndirectedConnectedComponents.java | 472 +++++++++++++++++++ .../WeaklyConnectedComponents.java | 163 +++++++ .../library/prepare_graph/package-info.java | 22 + .../vertex/ConnectedComponentVertexValue.java | 56 +++ .../WeaklyConnectedComponentVertexValue.java | 57 +++ ...WeaklyConnectedComponentVertexValueImpl.java | 84 ++++ .../prepare_graph/vertex/package-info.java | 21 + .../prepare_graph/TestConnectedComponents.java | 192 ++++++++ .../library/prepare_graph/TestPrepareGraph.java | 150 ++++++ giraph-block-app/pom.xml | 4 + .../reducers/array/HugeArrayUtils.java | 20 +- .../block_app/test_setup/NumericTestGraph.java | 9 +- .../test_setup/graphs/Small1GraphInit.java | 29 +- .../test_setup/graphs/Small2GraphInit.java | 29 +- 
.../graphs/SmallDirectedForestGraphInit.java | 76 +++ .../graphs/SmallDirectedTreeGraphInit.java | 75 +++ .../giraph/object/MultiSizedReusable.java | 113 +++++ .../org/apache/giraph/object/package-info.java | 21 + .../block_app/framework/BlockExecutionTest.java | 36 +- .../combiner/DoubleSumMessageCombiner.java | 6 +- .../combiner/FloatSumMessageCombiner.java | 6 +- .../apache/giraph/combiner/MessageCombiner.java | 6 +- .../combiner/MinimumDoubleMessageCombiner.java | 3 +- .../combiner/MinimumIntMessageCombiner.java | 2 +- .../combiner/SimpleSumMessageCombiner.java | 5 +- .../giraph/combiner/SumMessageCombiner.java | 73 +++ .../giraph/reducers/impl/MaxPairReducer.java | 87 ++++ .../master/TestComputationCombinerTypes.java | 2 +- .../apache/giraph/master/TestSwitchClasses.java | 4 +- .../giraph/vertex/TestComputationTypes.java | 4 +- pom.xml | 28 +- 38 files changed, 2790 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/checkstyle-relaxed-8.xml ---------------------------------------------------------------------- diff --git a/checkstyle-relaxed-8.xml b/checkstyle-relaxed-8.xml new file mode 100644 index 0000000..88a95cf --- /dev/null +++ b/checkstyle-relaxed-8.xml @@ -0,0 +1,275 @@ +<?xml version="1.0"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + This version of checkstyle is based on the Hadoop and common-math + checkstyle configurations. It is a best effort attempt to try to match + the CODE_CONVENTIONS and Oracle "Code Conventions for the Java + Programming Language". See the following link: + + http://www.oracle.com/technetwork/java/codeconvtoc-136057.html + + The documentation for checkstyle is available at + + http://checkstyle.sourceforge.net +--> + +<!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.1//EN" "http://www.puppycrawl.com/dtds/configuration_1_1.dtd"> + +<!-- Apache giraph customization of default Checkstyle behavior --> +<module name="Checker"> + <property name="localeLanguage" value="en"/> + + <!-- Checks for headers --> + <!-- See http://checkstyle.sf.net/config_header.html --> + <!-- Verify that EVERY source file has the appropriate license --> + <module name="Header"> + <property name="headerFile" value="${checkstyle.header.file}"/> + <property name="fileExtensions" value="java"/> + </module> + + <!-- Checks for Javadoc comments (checker). --> + <!-- See http://checkstyle.sf.net/config_javadoc.html --> + <!-- Require package javadoc --> + <module name="JavadocPackage"/> + + <!-- Miscellaneous other checks (checker). --> + <!-- See http://checkstyle.sf.net/config_misc.html --> + <!-- Require files to end with newline characters --> + <module name="NewlineAtEndOfFile"/> + + <!-- Checks for whitespace (tree walker) --> + <!-- See http://checkstyle.sf.net/config_whitespace.html --> + <!-- No tabs allowed! 
--> + <module name="FileTabCharacter"/> + + <module name="TreeWalker"> + <property name="cacheFile" value="target/checkstyle-cachefile"/> + + <!-- Checks for blocks. You know, those {}'s --> + <!-- See http://checkstyle.sf.net/config_blocks.html --> + <!-- No empty blocks (i.e. catch) --> + <module name="EmptyBlock"/> + <module name="AvoidNestedBlocks"/> + <!-- No if/else/do/for/while without braces --> + <module name="NeedBraces"/> + <module name="LeftCurly"/> + <module name="RightCurly"/> + + <!-- Checks for class design --> + <!-- See http://checkstyle.sf.net/config_design.html --> + <!-- Utility class should not be instantiated, they must have a + private constructor --> + <module name="HideUtilityClassConstructor"/> + <!-- Interfaces must be types (not just constants) --> + <module name="InterfaceIsType"/> + <!-- No public fields --> + <module name="VisibilityModifier"> + <property name="protectedAllowed" value="true"/> + <property name="publicMemberPattern" value="^$"/> + </module> + + <!-- Checks for common coding problems --> + <!-- See http://checkstyle.sf.net/config_coding.html --> + <module name="EmptyStatement"/> + <!-- Require hash code override when equals is --> + <module name="EqualsHashCode"/> + <!-- Method parameters and local variables should not hide + fields, except in constructors and setters --> + <module name="HiddenField"> + <property name="ignoreConstructorParameter" value="true" /> + <property name="ignoreSetter" value="true" /> + <property name="tokens" value="VARIABLE_DEF"/> + </module> + <!-- Disallow unnecessary instantiation of Boolean, String --> + <module name="IllegalInstantiation"> + <property name="classes" value="java.lang.Boolean, java.lang.String"/> + </module> + <module name="InnerAssignment"/> + <!-- Switch statements should be complete and with independent cases --> + <module name="FallThrough" /> + <module name="MissingSwitchDefault" /> + <module name="SimplifyBooleanExpression"/> + <module name="SimplifyBooleanReturn"/> 
+ <!-- Only one statement per line allowed --> + <module name="OneStatementPerLine"/> + <!-- Use a consistent way to put declarations --> + <module name="DeclarationOrder" /> + <!-- Don't add parentheses when they are not required --> + <module name="UnnecessaryParentheses" /> + <!-- Don't use overly broad catch (Exception, Throwable, + RuntimeException) --> + <module name="IllegalCatch" /> + <!-- Don't use == or != for string comparisons --> + <module name="StringLiteralEquality" /> + <!-- Don't declare multiple variables in the same statement --> + <module name="MultipleVariableDeclarations" /> + <!-- String literals more than one character long should not be + repeated several times --> + <!-- the "unchecked" string is also accepted to allow + @SuppressWarnings("unchecked") --> + <!-- Disabling for now until we have a better ignoreStringsRegexp --> + <!-- + <module name="MultipleStringLiterals" > + <property name="ignoreStringsRegexp" value='^(("")|(".")|("unchecked"))$'/> + </module> + --> + + <!-- Checks for imports --> + <!-- See http://checkstyle.sf.net/config_import.html --> + <module name="RedundantImport"/> + <!-- Import should be explicit, really needed and only from pure + java packages --> + <module name="AvoidStarImport" /> + <module name="UnusedImports" /> + <module name="IllegalImport" /> + + <!-- Checks for Javadoc comments (tree walker).
--> + <!-- See http://checkstyle.sf.net/config_javadoc.html --> + <!-- Javadoc must be formatted correctly --> + <module name="JavadocStyle"> + <property name="checkFirstSentence" value="false"/> + </module> + <!-- Must have class / interface header comments --> + <module name="JavadocType"/> + <!-- Require method javadocs, allow undeclared RTE, allow missing + javadoc on getters and setters --> + <module name="JavadocMethod"> + <property name="allowMissingJavadoc" value="true"/> + <property name="allowUndeclaredRTE" value="true"/> + <property name="allowMissingThrowsTags" value="true"/> + <property name="allowMissingPropertyJavadoc" value="true"/> + <property name="allowMissingParamTags" value="true"/> + <property name="allowMissingReturnTag" value="true"/> + </module> + + <!-- Miscellaneous other checks (tree walker). --> + <!-- See http://checkstyle.sf.net/config_misc.html --> + <!-- Java style arrays --> + <module name="ArrayTypeStyle"/> + + <!-- Indentation --> +<!-- Ignore indentation before https://github.com/checkstyle/checkstyle/issues/281 is fixed + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="braceAdjustment" value="0"/> + <property name="caseIndent" value="0"/> + <property name="throwsIndent" value="2"/> + <property name="lineWrappingIndentation" value="0"/> + <property name="arrayInitIndent" value="2"/> + </module> +--> + <!-- Turn this on to see what needs to be done + <module name="TodoComment"/> + --> + <module name="UpperEll"/> + + <!-- Modifier Checks --> + <!-- See http://checkstyle.sf.net/config_modifiers.html --> + <!-- Use a consistent way to put modifiers --> + <module name="ModifierOrder"/> + <module name="RedundantModifier"/> + + <!-- Checks for Naming Conventions. 
--> + <!-- See http://checkstyle.sf.net/config_naming.html --> + <!-- Constant names should obey the traditional all uppercase + naming convention --> + <module name="ConstantName"/> + <module name="LocalFinalVariableName"/> + <module name="LocalVariableName"/> + <module name="MemberName"/> + <module name="MethodName"/> + <module name="PackageName"/> + <module name="ParameterName"/> + <module name="StaticVariableName"> + <property name="format" value="^[A-Z0-9_]*$"/> + </module> + <module name="TypeName"/> + + <!-- Checks for regexp expressions. --> + <!-- See http://checkstyle.sf.net/config_regexp.html --> + <!-- No trailing whitespace --> + <module name="Regexp"> + <property name="format" value="[ \t]+$"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="Trailing whitespace"/> + </module> + <!-- No System.out.println() statements --> + <module name="Regexp"> + <!-- No sysouts --> + <property name="format" value="System\.out\.println"/> + <property name="illegalPattern" value="true"/> + </module> + <!-- Authors should be in pom.xml file --> + <module name="Regexp"> + <property name="format" value="@author"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="developers names should be in pom file"/> + </module> + + <!-- Checks for Size Violations. 
--> + <!-- See http://checkstyle.sf.net/config_sizes.html --> + <!-- Lines cannot exceed 80 chars --> + <module name="LineLength"> + <property name="max" value="80"/> + <property name="ignorePattern" value="^import"/> + </module> + <!-- Over time, we will revise this down --> + <module name="MethodLength"> + <property name="max" value="200"/> + </module> + <module name="ParameterNumber"> + <property name="max" value="8"/> + </module> + + <!-- Checks for whitespace (tree walker) --> + <!-- See http://checkstyle.sf.net/config_whitespace.html --> + <module name="EmptyForIteratorPad"/> + <!-- Spacing around methods --> + <module name="MethodParamPad"> + <property name="option" value="nospace"/> + <property name="allowLineBreaks" value="true"/> + </module> + <!-- No whitespace before a token --> + <module name="NoWhitespaceBefore"/> + <!-- Whitespace after tokens is required --> + <module name="WhitespaceAfter"/> + <!-- Whitespace around tokens is required --> + <module name="WhitespaceAround"/> + <module name="ParenPad"/> + <module name="TypecastParenPad"/> + <!-- No extra whitespace around types --> + <module name="GenericWhitespace"/> + <!-- Operator must be at end of wrapped line --> + <module name="OperatorWrap"> + <property name="option" value="eol"/> + </module> + + <!-- Required for SuppressionCommentFilter below --> + <module name="FileContentsHolder"/> + </module> + + <!-- Set up special comments to suppress specific checks from source files --> + <module name="SuppressionCommentFilter"> + <property name="offCommentFormat" value="CHECKSTYLE\: stop ([\w\|]+)"/> + <property name="onCommentFormat" value="CHECKSTYLE\: resume ([\w\|]+)"/> + <property name="checkFormat" value="$1"/> + </module> +</module> + http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index cc1a05a..afdf041 100644 ---
a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -103,6 +103,6 @@ <Match> <!-- Java Serialization is not used, so this is never an actual issue. On the other hand, Kryo needs lambdas to be Serializable to work. --> - <Bug pattern="SE_BAD_FIELD,SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" /> + <Bug pattern="SE_BAD_FIELD,SE_COMPARATOR_SHOULD_BE_SERIALIZABLE,SE_NO_SERIALVERSIONID" /> </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/pom.xml b/giraph-block-app-8/pom.xml new file mode 100644 index 0000000..6645eed --- /dev/null +++ b/giraph-block-app-8/pom.xml @@ -0,0 +1,137 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph-parent</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>giraph-block-app-8</artifactId> + <packaging>jar</packaging> + + <name>Apache Giraph Blocks Framework in Java 8</name> + <url>http://giraph.apache.org/giraph-block-app/</url> + <description>Giraph Blocks Framework and utilities for writing applications in Java 8</description> + + <properties> + <top.dir>${project.basedir}/..</top.dir> + <checkstyle.config.path>${top.dir}/checkstyle-relaxed-8.xml</checkstyle.config.path> + <project.build.targetJdk>1.8</project.build.targetJdk> + <project.build.javaHome>${env.JAVA_8_HOME}</project.build.javaHome> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <id>attach-javadocs</id> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <additionalparam>-Xdoclint:none</additionalparam> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <configuration> + <siteDirectory>${project.basedir}/src/site</siteDirectory> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + 
<version>2.6</version> + <configuration> + <skip>${surefire.skip}</skip> + <systemProperties> + <property> + <name>prop.jarLocation</name> + <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value> + </property> + </systemProperties> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>findbugs-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- compile dependencies. sorted lexicographically. --> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + </dependency> + <dependency> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph-block-app</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + + <!-- runtime dependency --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>runtime</scope> + </dependency> + + <!-- test dependencies. sorted lexicographically. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/assembly/compile.xml ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/assembly/compile.xml b/giraph-block-app-8/src/main/assembly/compile.xml new file mode 100644 index 0000000..fcaffa6 --- /dev/null +++ b/giraph-block-app-8/src/main/assembly/compile.xml @@ -0,0 +1,39 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. 
See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>jar-with-dependencies</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + + <dependencySets> + <dependencySet> + <useProjectArtifact>true</useProjectArtifact> + <outputDirectory>/</outputDirectory> + <unpackOptions> + <excludes> + <exclude>META-INF/LICENSE</exclude> + </excludes> + </unpackOptions> + <unpack>true</unpack> + <scope>runtime</scope> + </dependencySet> + </dependencySets> +</assembly> http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java new file mode 100644 index 0000000..cadbf2f --- /dev/null +++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library; + +import org.apache.giraph.function.primitive.Obj2DoubleFunction; +import org.apache.giraph.function.primitive.Obj2FloatFunction; +import org.apache.giraph.function.primitive.Obj2IntFunction; +import org.apache.giraph.function.primitive.Obj2LongFunction; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * SupplierFromVertex factories that wrap other functions, providing a + * common pattern for reusing objects and minimizing GC overhead.
+ */ +public class ReusableSuppliers { + /** Hide constructor */ + private ReusableSuppliers() { } + + /** + * Transforms primitive long supplier into + * LongWritable supplier, with object being reused, + * to minimize GC overhead. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + SupplierFromVertex<I, V, E, LongWritable> + fromLong(Obj2LongFunction<Vertex<I, V, E>> supplier) { + LongWritable reusable = new LongWritable(); + return (vertex) -> { + reusable.set(supplier.apply(vertex)); + return reusable; + }; + } + + /** + * Transforms primitive int supplier into + * IntWritable supplier, with object being reused, + * to minimize GC overhead. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + SupplierFromVertex<I, V, E, IntWritable> + fromInt(Obj2IntFunction<Vertex<I, V, E>> supplier) { + IntWritable reusable = new IntWritable(); + return (vertex) -> { + reusable.set(supplier.apply(vertex)); + return reusable; + }; + } + + /** + * Transforms primitive float supplier into + * FloatWritable supplier, with object being reused, + * to minimize GC overhead. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + SupplierFromVertex<I, V, E, FloatWritable> + fromFloat(Obj2FloatFunction<Vertex<I, V, E>> supplier) { + FloatWritable reusable = new FloatWritable(); + return (vertex) -> { + reusable.set(supplier.apply(vertex)); + return reusable; + }; + } + + /** + * Transforms primitive double supplier into + * DoubleWritable supplier, with object being reused, + * to minimize GC overhead. 
+ */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + SupplierFromVertex<I, V, E, DoubleWritable> + fromDouble(Obj2DoubleFunction<Vertex<I, V, E>> supplier) { + DoubleWritable reusable = new DoubleWritable(); + return (vertex) -> { + reusable.set(supplier.apply(vertex)); + return reusable; + }; + } + + /** + * Creates SupplierFromVertex, by passing reusable value into + * consumer argument, and returning same reusable every time. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable, + T extends Writable> + SupplierFromVertex<I, V, E, T> withReusable( + final T reusable, final ConsumerWithVertex<I, V, E, T> consumer) { + return (vertex) -> { + consumer.apply(vertex, reusable); + return reusable; + }; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java new file mode 100644 index 0000000..801150c --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Core library of Pieces and Suppliers, providing most common usages. + */ +package org.apache.giraph.block_app.library; http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java new file mode 100644 index 0000000..b4d40dc --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.library.prepare_graph; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.block.SequenceBlock; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.block_app.framework.piece.delegate.DelegatePiece; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender; +import org.apache.giraph.block_app.library.Pieces; +import org.apache.giraph.block_app.library.ReusableSuppliers; +import org.apache.giraph.block_app.library.VertexSuppliers; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.edge.MutableEdge; +import org.apache.giraph.edge.ReusableEdge; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.ObjectTransfer; +import org.apache.giraph.function.primitive.Int2ObjFunction; +import org.apache.giraph.function.primitive.Obj2DoubleFunction; +import org.apache.giraph.function.vertex.ConsumerWithVertex; +import org.apache.giraph.object.MultiSizedReusable; +import org.apache.giraph.reducers.impl.SumReduce; +import org.apache.giraph.types.NoMessage; +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.collections.Basic2ObjectMap; +import org.apache.giraph.types.ops.collections.BasicSet; +import 
org.apache.giraph.writable.tuple.PairWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.log4j.Logger; + +/** + * Utility class for Pieces and Blocks that prepare graph by processing edges. + */ +@SuppressWarnings("rawtypes") +public class PrepareGraphPieces { + private static final Logger LOG = Logger.getLogger(PrepareGraphPieces.class); + + /** Hide constructor */ + private PrepareGraphPieces() { + } + + /** + * Cleans unweighted graph and makes it symmetric, by removing duplicate + * edges, adding opposite edges for asymmetric edges, and removing + * standalone vertices. + * + * @param idTypeOps Vertex id type ops + * @param <I> Vertex id type + * @return Block that cleans the graph + */ + public static + <I extends WritableComparable> + Block cleanSymmetricUnweightedGraph(PrimitiveIdTypeOps<I> idTypeOps) { + return new SequenceBlock( + removeDuplicateEdges(idTypeOps), + makeSymmetricUnweighted(idTypeOps), + removeStandAloneVertices()); + } + + /** + * Creates a piece that removes edges that exist only in one direction. + * If edge exists in both directions, but with different edge value, + * both edges are left intact.
+ * + * @param idTypeOps Vertex id type ops + * @param <I> Vertex id type + * @return Piece that removes asymmetrical edges + */ + public static + <I extends WritableComparable> + Block removeAsymEdges(PrimitiveIdTypeOps<I> idTypeOps) { + ConsumerWithVertex<I, Writable, Writable, Iterable<I>> + removeEdges = (vertex, neighbors) -> { + BasicSet<I> set = idTypeOps.createOpenHashSet(); + for (I message : neighbors) { + set.add(message); + } + + for (Iterator<MutableEdge<I, Writable>> iter = + vertex.getMutableEdges().iterator(); + iter.hasNext();) { + MutableEdge<I, Writable> edge = iter.next(); + if (!set.contains(edge.getTargetVertexId())) { + iter.remove(); + } + } + }; + return Pieces.sendMessageToNeighbors( + "RemoveAsymEdges", + idTypeOps.getTypeClass(), + VertexSuppliers.vertexIdSupplier(), + removeEdges); + } + + /** + * Remove duplicate edges, for each vertex only leaving + * instance of first outgoing edge to each target vertex. + * + * If graph is weighted, you might want to prefer summing the + * weights, instead of removing later occurrences. + */ + public static <I extends WritableComparable> + Block removeDuplicateEdges(PrimitiveIdTypeOps<I> idTypeOps) { + Int2ObjFunction<BasicSet<I>> reusableSets = + MultiSizedReusable.createForBasicSet(idTypeOps); + return Pieces.<I, Writable, Writable>forAllVertices( + "RemoveDuplicateEdges", + (vertex) -> { + BasicSet<I> set = reusableSets.apply(vertex.getNumEdges()); + for (Iterator<MutableEdge<I, Writable>> iter = + vertex.getMutableEdges().iterator(); + iter.hasNext();) { + MutableEdge<I, Writable> edge = iter.next(); + if (!set.add(edge.getTargetVertexId())) { + iter.remove(); + } + } + }); + } + + /** + * Creates a piece that will take an unweighted graph (graph with edge value + * being NullWritable), and make it symmetric, by creating all edges + * that are not present and opposite edge exists. 
+ * + * @param idTypeOps Vertex id type ops + * @return Piece that makes unweighted graph symmetric + */ + public static <I extends WritableComparable> + Block makeSymmetricUnweighted(PrimitiveIdTypeOps<I> idTypeOps) { + Int2ObjFunction<BasicSet<I>> reusableSets = + MultiSizedReusable.createForBasicSet(idTypeOps); + ConsumerWithVertex<I, Writable, NullWritable, Iterable<I>> + addEdges = (vertex, neighbors) -> { + BasicSet<I> set = reusableSets.apply(vertex.getNumEdges()); + + for (Edge<I, NullWritable> edge : vertex.getEdges()) { + set.add(edge.getTargetVertexId()); + } + for (I neighbor : neighbors) { + if (!set.contains(neighbor)) { + Edge<I, NullWritable> edge = + EdgeFactory.create(idTypeOps.createCopy(neighbor)); + vertex.addEdge(edge); + set.add(neighbor); + } + } + }; + return Pieces.sendMessageToNeighbors( + "MakeSymmetricUnweighted", + idTypeOps.getTypeClass(), + VertexSuppliers.vertexIdSupplier(), + addEdges); + } + + /** + * Make weighted graph symmetric - by making edge weight in both directions + * equal to the sum of weights in both directions. + * + * This means if graph is already symmetric - resulting graph will have + * doubled weights. + * We are not taking average, in order to work with integer numbers - because + * division by two might not be integer number. 
+ */ + public static <I extends WritableComparable, V extends Writable, + E extends Writable> + Block makeSymmetricWeighted( + PrimitiveIdTypeOps<I> idTypeOps, NumericTypeOps<E> edgeTypeOps) { + MessageValueFactory<PairWritable<I, E>> messageFactory = + () -> new PairWritable<>(idTypeOps.create(), edgeTypeOps.create()); + return new Piece<I, V, E, PairWritable<I, E>, Object>() { + @Override + public VertexSender<I, V, E> getVertexSender( + BlockWorkerSendApi<I, V, E, PairWritable<I, E>> workerApi, + Object executionStage) { + PairWritable<I, E> message = messageFactory.newInstance(); + return (vertex) -> { + idTypeOps.set(message.getLeft(), vertex.getId()); + for (Edge<I, E> edge : vertex.getEdges()) { + edgeTypeOps.set(message.getRight(), edge.getValue()); + workerApi.sendMessage(edge.getTargetVertexId(), message); + } + }; + } + + @Override + public + VertexReceiver<I, V, E, PairWritable<I, E>> getVertexReceiver( + BlockWorkerReceiveApi<I> workerApi, Object executionStage) { + // TODO: After t5921368, make edges also primitive + ReusableEdge<I, E> reusableEdge = + EdgeFactory.createReusable(null, null); + return (vertex, messages) -> { + Basic2ObjectMap<I, E> map = + idTypeOps.create2ObjectOpenHashMap(null); + for (PairWritable<I, E> message : messages) { + if (map.containsKey(message.getLeft())) { + edgeTypeOps.plusInto( + map.get(message.getLeft()), message.getRight()); + } else { + // create edge copy, since it is not stored as primitive yet + map.put(message.getLeft(), + edgeTypeOps.createCopy(message.getRight())); + } + } + + for (MutableEdge<I, E> edge : vertex.getMutableEdges()) { + E receivedWeight = map.remove(edge.getTargetVertexId()); + if (receivedWeight != null) { + edgeTypeOps.plusInto(edge.getValue(), receivedWeight); + edge.setValue(edge.getValue()); + } + } + + // TODO: add foreach, or entry iterator to Basic Maps + for (Iterator<I> iter = map.fastKeyIterator(); iter.hasNext();) { + I neighbor = iter.next(); + 
reusableEdge.setTargetVertexId(neighbor); + reusableEdge.setValue(map.get(neighbor)); + vertex.addEdge(reusableEdge); + } + }; + } + + @Override + protected MessageValueFactory<PairWritable<I, E>> getMessageFactory( + ImmutableClassesGiraphConfiguration conf) { + return messageFactory; + } + + @Override + public String toString() { + return "MakeSymmetricWeighted"; + } + }; + } + + + /** + * Removing vertices from the graph takes effect in the next superstep. + * Adding an empty piece is to prevent calling the next send inner piece + * before vertices are being actually removed. + */ + public static Block removeStandAloneVertices() { + return Pieces.removeVertices( + "RemoveStandaloneVertices", + (vertex) -> vertex.getNumEdges() == 0); + } + + public static Block normalizeDoubleEdges() { + ObjectTransfer<DoubleWritable> sumEdgeWeights = + new ObjectTransfer<>(); + ObjectTransfer<LongWritable> countEdges = new ObjectTransfer<>(); + + return new DelegatePiece<>( + calcSumEdgesPiece(DoubleWritable::get, sumEdgeWeights), + countTotalEdgesPiece(countEdges), + new Piece<WritableComparable, Writable, DoubleWritable, NoMessage, + Object>() { + private double averageEdgeWeight; + + @Override + public void masterCompute( + BlockMasterApi master, Object executionStage) { + averageEdgeWeight = + sumEdgeWeights.get().get() / countEdges.get().get(); + LOG.info("Averge edge weight " + averageEdgeWeight); + } + + @Override + public VertexReceiver<WritableComparable, Writable, DoubleWritable, + NoMessage> getVertexReceiver( + BlockWorkerReceiveApi<WritableComparable> workerApi, + Object executionStage) { + DoubleWritable doubleWritable = new DoubleWritable(); + return (vertex, messages) -> { + for (MutableEdge<WritableComparable, DoubleWritable> edge : + vertex.getMutableEdges()) { + doubleWritable.set( + (float) (edge.getValue().get() / averageEdgeWeight)); + edge.setValue(doubleWritable); + } + }; + } + + @Override + public String toString() { + return "NormalizeDoubleEdges"; + } 
+ }); + } + + public static Block normalizeFloatEdges() { + ObjectTransfer<DoubleWritable> sumEdgeWeights = + new ObjectTransfer<>(); + ObjectTransfer<LongWritable> countEdges = new ObjectTransfer<>(); + + return new DelegatePiece<>( + calcSumEdgesPiece(FloatWritable::get, sumEdgeWeights), + countTotalEdgesPiece(countEdges), + new Piece<WritableComparable, Writable, FloatWritable, NoMessage, + Object>() { + private double averageEdgeWeight; + + @Override + public void masterCompute( + BlockMasterApi master, Object executionStage) { + averageEdgeWeight = + sumEdgeWeights.get().get() / countEdges.get().get(); + LOG.info("Averge edge weight " + averageEdgeWeight); + } + + @Override + public VertexReceiver<WritableComparable, Writable, FloatWritable, + NoMessage> getVertexReceiver( + BlockWorkerReceiveApi<WritableComparable> workerApi, + Object executionStage) { + FloatWritable floatWritable = new FloatWritable(); + return (vertex, messages) -> { + for (MutableEdge<WritableComparable, FloatWritable> edge : + vertex.getMutableEdges()) { + floatWritable.set( + (float) (edge.getValue().get() / averageEdgeWeight)); + edge.setValue(floatWritable); + } + }; + } + + @Override + public String toString() { + return "NormalizeFloatEdges"; + } + }); + } + + /** + * Piece that calculates total number of edges, + * and gives result to the {@code countEdges} consumer. + */ + public static + Piece<WritableComparable, Writable, Writable, NoMessage, Object> + countTotalEdgesPiece(Consumer<LongWritable> countEdges) { + return Pieces.reduce( + "CountTotalEdgesPiece", + SumReduce.LONG, + ReusableSuppliers.fromLong((vertex) -> vertex.getNumEdges()), + countEdges); + } + + /** + * Piece that calculates total edge weight in the graph, + * and gives result to the {@code sumEdgeWeights} consumer. 
+ */ + public static <E extends Writable> + Piece<WritableComparable, Writable, E, NoMessage, Object> calcSumEdgesPiece( + Obj2DoubleFunction<E> edgeValueF, + Consumer<DoubleWritable> sumEdgeWeights) { + return Pieces.reduce( + "CalcSumEdgesPiece", + SumReduce.DOUBLE, + ReusableSuppliers.fromDouble((vertex) -> { + double sum = 0; + for (Edge<WritableComparable, E> edge : vertex.getEdges()) { + sum += edgeValueF.apply(edge.getValue()); + } + return sum; + }), + sumEdgeWeights); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java new file mode 100644 index 0000000..2610436 --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.SendMessageChain;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.block_app.reducers.map.BasicMapReduce;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.impl.MaxPairReducer;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.TypeOps;
+import org.apache.giraph.writable.tuple.LongLongWritable;
+import org.apache.giraph.writable.tuple.PairWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * Class for creating utility blocks for calculating and processing
+ * connected components.
+ *
+ * Components are identified by the minimum vertex id they contain.
+ *
+ * Graph is expected to be symmetric before calling any of the methods here.
+ */
+public class UndirectedConnectedComponents {
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(UndirectedConnectedComponents.class);
+
+  /** Hide constructor */
+  private UndirectedConnectedComponents() { }
+
+  /**
+   * Initialize vertex values for connected components calculation.
+   *
+   * Each vertex sets its component to the minimum of its own id and the
+   * target ids of the edges given by {@code edgeSupplier}, and reports to
+   * {@code vertexUpdatedComponent} whether some target id was smaller than
+   * its own id.
+   *
+   * Note: the id object passed to {@code setComponent} is reused across
+   * vertices, so {@code setComponent} must copy it rather than keep it.
+   */
+  private static <I extends WritableComparable, V extends Writable>
+  Piece<I, V, Writable, NoMessage, Object> createInitializePiece(
+      TypeOps<I> idTypeOps,
+      Consumer<Boolean> vertexUpdatedComponent,
+      ConsumerWithVertex<I, V, Writable, I> setComponent,
+      SupplierFromVertex<I, V, Writable,
+          ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier) {
+    I result = idTypeOps.create();
+    return Pieces.forAllVerticesOnReceive("InitializeCC", (vertex) -> {
+      idTypeOps.set(result, vertex.getId());
+      boolean updated = false;
+      for (Edge<I, ?> edge : edgeSupplier.get(vertex)) {
+        if (result.compareTo(edge.getTargetVertexId()) > 0) {
+          idTypeOps.set(result, edge.getTargetVertexId());
+          updated = true;
+        }
+      }
+      setComponent.apply(vertex, result);
+      vertexUpdatedComponent.apply(updated);
+    });
+  }
+
+  /**
+   * Propagate connected components to neighbor vertices.
+   *
+   * Vertices for which {@code vertexToPropagate} is true send their current
+   * component to all edge targets. Each vertex adopts the smallest received
+   * component if it is smaller than its current one. {@code converged} is
+   * set to true when no vertex propagated in the iteration.
+   */
+  private static class PropagateConnectedComponentsPiece
+      <I extends WritableComparable, V extends Writable>
+      extends Piece<I, V, Writable, I, Object> {
+    /** Vertex id type ops */
+    private final TypeOps<I> idTypeOps;
+    /** Whether the current vertex should send its component */
+    private final Supplier<Boolean> vertexToPropagate;
+    /** Receives whether the current vertex changed its component */
+    private final Consumer<Boolean> vertexUpdatedComponent;
+    /** Receives whether the calculation converged */
+    private final Consumer<Boolean> converged;
+    /** Reads component of a vertex */
+    private final SupplierFromVertex<I, V, Writable, I> getComponent;
+    /** Writes component of a vertex */
+    private final ConsumerWithVertex<I, V, Writable, I> setComponent;
+    /** Edges used for propagation */
+    private final SupplierFromVertex<I, V, Writable,
+        ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier;
+
+    /** Number of vertices that propagated in this iteration */
+    private ReducerHandle<LongWritable, LongWritable> propagatedAggregator;
+
+    PropagateConnectedComponentsPiece(
+        TypeOps<I> idTypeOps,
+        Supplier<Boolean> vertexToPropagate,
+        Consumer<Boolean> vertexUpdatedComponent,
+        Consumer<Boolean> converged,
+        SupplierFromVertex<I, V, Writable, I> getComponent,
+        ConsumerWithVertex<I, V, Writable, I> setComponent,
+        SupplierFromVertex<I, V, Writable,
+            ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier) {
+      this.idTypeOps = idTypeOps;
+      this.vertexToPropagate = vertexToPropagate;
+      this.vertexUpdatedComponent = vertexUpdatedComponent;
+      this.converged = converged;
+      this.getComponent = getComponent;
+      this.setComponent = setComponent;
+      this.edgeSupplier = edgeSupplier;
+    }
+
+    @Override
+    public void registerReducers(
+        CreateReducersApi reduceApi, Object executionStage) {
+      propagatedAggregator = reduceApi.createLocalReducer(SumReduce.LONG);
+    }
+
+    @Override
+    public VertexSender<I, V, Writable> getVertexSender(
+        final BlockWorkerSendApi<I, V, Writable, I> workerApi,
+        Object executionStage) {
+      final LongWritable one = new LongWritable(1);
+      return new InnerVertexSender() {
+        @Override
+        public void vertexSend(Vertex<I, V, Writable> vertex) {
+          if (vertexToPropagate.get()) {
+            workerApi.sendMessageToMultipleEdges(
+                Iterators.transform(
+                    edgeSupplier.get(vertex).iterator(),
+                    new Function<Edge<I, ?>, I>() {
+                      @Override
+                      public I apply(Edge<I, ?> edge) {
+                        return edge.getTargetVertexId();
+                      }
+                    }),
+                getComponent.get(vertex));
+            propagatedAggregator.reduce(one);
+          }
+        }
+      };
+    }
+
+    @Override
+    public void masterCompute(BlockMasterApi master, Object executionStage) {
+      long numPropagated =
+          propagatedAggregator.getReducedValue(master).get();
+      converged.apply(numPropagated == 0);
+      LOG.info("Undirected CC: " + numPropagated +
+          " many vertices sent in this iteration");
+    }
+
+    @Override
+    public VertexReceiver<I, V, Writable, I> getVertexReceiver(
+        BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+      return new InnerVertexReceiver() {
+        /** Minimum received component, reused across vertices */
+        private final I received = idTypeOps.create();
+
+        @Override
+        public void vertexReceive(Vertex<I, V, Writable> vertex,
+            Iterable<I> messages) {
+          boolean first = true;
+          for (I value : messages) {
+            if (first) {
+              idTypeOps.set(received, value);
+              first = false;
+            } else {
+              if (received.compareTo(value) > 0) {
+                idTypeOps.set(received, value);
+              }
+            }
+          }
+
+          I cur = getComponent.get(vertex);
+          // "!first" means at least one message was received
+          if (!first && cur.compareTo(received) > 0) {
+            setComponent.apply(vertex, received);
+            vertexUpdatedComponent.apply(true);
+          } else {
+            vertexUpdatedComponent.apply(false);
+          }
+        }
+      };
+    }
+
+    @Override
+    public Class<I> getMessageClass() {
+      return idTypeOps.getTypeClass();
+    }
+  }
+
+  /**
+   * Calculates number of components, and the number of active vertices.
+   *
+   * Every vertex counts as active; a vertex counts as a component when its
+   * id equals its component id.
+   */
+  public static final class CalculateNumberOfComponents<V extends Writable>
+      extends Piece<LongWritable, V, Writable, LongWritable, Object> {
+    /** Receives number of active vertices */
+    private final Consumer<LongWritable> numActiveConsumer;
+    /** Receives number of components */
+    private final Consumer<LongWritable> numComponentsConsumer;
+    /** Reads component of a vertex */
+    private final
+    SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent;
+
+    /** Counts components */
+    private ReducerHandle<LongWritable, LongWritable> numComponentsAggregator;
+    /** Counts active vertices */
+    private ReducerHandle<LongWritable, LongWritable> numActiveAggregator;
+
+    /**
+     * Constructor
+     *
+     * @param numActiveConsumer Receives number of active vertices
+     * @param numComponentsConsumer Receives number of components
+     * @param getComponent Reads component of a vertex
+     */
+    public CalculateNumberOfComponents(
+        Consumer<LongWritable> numActiveConsumer,
+        Consumer<LongWritable> numComponentsConsumer,
+        SupplierFromVertex<LongWritable, V, Writable, LongWritable>
+          getComponent) {
+      this.numActiveConsumer = numActiveConsumer;
+      this.numComponentsConsumer = numComponentsConsumer;
+      this.getComponent = getComponent;
+    }
+
+    @Override
+    public void registerReducers(
+        CreateReducersApi reduceApi, Object executionStage) {
+      numComponentsAggregator = reduceApi.createLocalReducer(SumReduce.LONG);
+      numActiveAggregator = reduceApi.createLocalReducer(SumReduce.LONG);
+    }
+
+    @Override
+    public VertexSender<LongWritable, V, Writable> getVertexSender(
+        BlockWorkerSendApi<LongWritable, V, Writable, LongWritable> workerApi,
+        Object executionStage) {
+      final LongWritable one = new LongWritable(1);
+      return new InnerVertexSender() {
+        @Override
+        public void vertexSend(Vertex<LongWritable, V, Writable> vertex) {
+          numActiveAggregator.reduce(one);
+          // Only aggregate if you are the minimum of your CC
+          if (vertex.getId().get() == getComponent.get(vertex).get()) {
+            numComponentsAggregator.reduce(one);
+          }
+        }
+      };
+    }
+
+    @Override
+    public void masterCompute(BlockMasterApi master, Object executionStage) {
+      numActiveConsumer.apply(numActiveAggregator.getReducedValue(master));
+      numComponentsConsumer.apply(
+          numComponentsAggregator.getReducedValue(master));
+      LOG.info("Num active is : " +
+          numActiveAggregator.getReducedValue(master));
+      LOG.info("Num components is : " +
+          numComponentsAggregator.getReducedValue(master));
+    }
+
+    // NOTE(review): this piece's sender sends no messages, so the combiner
+    // appears to be unused.
+    @Override
+    protected
+    MessageCombiner<? super LongWritable, LongWritable> getMessageCombiner(
+        ImmutableClassesGiraphConfiguration conf) {
+      return SumMessageCombiner.LONG;
+    }
+  }
+
+  /**
+   * Calculate connected components, doing as many iterations as needed,
+   * but no more than maxIterations.
+   *
+   * Graph is expected to be symmetric.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param idTypeOps Vertex id type ops
+   * @param getComponent Reads component of a vertex
+   * @param setComponent Writes component of a vertex (value is reused,
+   *                     so it must be copied)
+   * @param edgeSupplier Edges to use for the calculation
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @return Block that calculates connected components
+   */
+  public static <I extends WritableComparable, V extends Writable>
+  Block calculateConnectedComponents(
+      int maxIterations,
+      TypeOps<I> idTypeOps,
+      final SupplierFromVertex<I, V, Writable, I> getComponent,
+      final ConsumerWithVertex<I, V, Writable, I> setComponent,
+      SupplierFromVertex<I, V, Writable,
+          ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier) {
+    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
+    ObjectTransfer<Boolean> vertexUpdatedComponent = new ObjectTransfer<>();
+
+    return new SequenceBlock(
+      createInitializePiece(
+          idTypeOps,
+          vertexUpdatedComponent,
+          setComponent,
+          edgeSupplier),
+      new RepeatUntilBlock(
+        maxIterations,
+        // Vertices that updated their component in the previous step
+        // are the ones that propagate in the next one
+        new PropagateConnectedComponentsPiece<>(
+            idTypeOps,
+            vertexUpdatedComponent,
+            vertexUpdatedComponent,
+            converged, getComponent, setComponent,
+            edgeSupplier
+        ),
+        converged
+      )
+    );
+  }
+
+  /**
+   * Default block, which calculates connected components using the
+   * vertex's default edges.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param idTypeOps Vertex id type ops
+   * @param getComponent Reads component of a vertex
+   * @param setComponent Writes component of a vertex
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @return Block that calculates connected components
+   */
+  public static <I extends WritableComparable, V extends Writable>
+  Block calculateConnectedComponents(
+      int maxIterations,
+      TypeOps<I> idTypeOps,
+      final SupplierFromVertex<I, V, Writable, I> getComponent,
+      final ConsumerWithVertex<I, V, Writable, I> setComponent) {
+    return calculateConnectedComponents(
+        maxIterations,
+        idTypeOps,
+        getComponent,
+        setComponent,
+        VertexSuppliers.vertexEdgesSupplier());
+  }
+
+  /**
+   * Calculates connected components of a graph with LongWritable ids,
+   * using the vertex's default edges.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param getComponent Reads component of a vertex
+   * @param setComponent Writes component of a vertex
+   * @param <V> Vertex value type
+   * @return Block that calculates connected components
+   */
+  public static <V extends Writable>
+  Block calculateConnectedComponents(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent
+  ) {
+    return calculateConnectedComponents(
+        maxIterations,
+        LongTypeOps.INSTANCE,
+        getComponent,
+        setComponent,
+        VertexSuppliers.vertexEdgesSupplier());
+  }
+
+  /**
+   * Calculates sizes of all components by aggregating on master, and allows
+   * each vertex to consume its size. Differs from
+   * calculateLargestConnectedComponentStats in that aggregation happens on
+   * master (a map from component id to size is reduced and broadcast),
+   * instead of message sends to the component id.
+   *
+   * NOTE(review): the broadcast map has one entry per component, so this
+   * is presumably only suitable when the number of components is small.
+   *
+   * @param getComponent Reads component of a vertex
+   * @param sizeConsumer Receives the size of the vertex's component (the
+   *                     passed object is reused across vertices)
+   * @param <V> Vertex value type
+   * @return Block that calculates component sizes
+   */
+  public static <V extends Writable>
+  Block calculateConnectedComponentSizes(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> sizeConsumer
+  ) {
+    Pair<LongWritable, LongWritable> componentToReducePair = Pair.of(
+        new LongWritable(), new LongWritable(1));
+    LongWritable reusableLong = new LongWritable();
+    return Pieces.reduceAndBroadcast(
+        "CalcConnectedComponentSizes",
+        new BasicMapReduce<>(
+            LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG),
+        (Vertex<LongWritable, V, Writable> vertex) -> {
+          componentToReducePair.getLeft().set(getComponent.get(vertex).get());
+          return componentToReducePair;
+        },
+        (vertex, componentSizes) -> {
+          long compSize = componentSizes.get(getComponent.get(vertex)).get();
+          reusableLong.set(compSize);
+          sizeConsumer.apply(vertex, reusableLong);
+        });
+  }
+
+  /**
+   * Given a graph with already calculated connected components - calculates
+   * ID and size of the largest one.
+   *
+   * Every vertex sends 1 to its component id (summed by the combiner), so
+   * the vertex whose id is the component id receives the component size.
+   * Pairs (component id, received sum) are then reduced with MaxPairReducer
+   * (presumably keeping the pair with the largest size), and the result is
+   * given to {@code largestComponentConsumer}.
+   *
+   * @param getComponent Reads component of a vertex
+   * @param largestComponentConsumer Receives (component id, size) pair
+   * @param <V> Vertex value type
+   * @return Block that calculates largest component stats
+   */
+  public static <V extends Writable>
+  Block calculateLargestConnectedComponentStats(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      Consumer<PairWritable<LongWritable, LongWritable>>
+        largestComponentConsumer) {
+    LongWritable one = new LongWritable(1);
+    LongLongWritable pair = new LongLongWritable();
+    return SendMessageChain.<LongWritable, V, Writable, LongWritable>startSend(
+        "CalcComponentSizesPiece",
+        SumMessageCombiner.LONG,
+        (vertex) -> one,
+        (vertex) -> Iterators.singletonIterator(getComponent.get(vertex))
+    ).endReduce(
+        "CalcLargestComponent",
+        new MaxPairReducer<>(LongTypeOps.INSTANCE, LongTypeOps.INSTANCE),
+        (vertex, message) -> {
+          // Vertices that are not component ids receive no message
+          long curSum = message != null ? message.get() : 0;
+          pair.getLeft().set(getComponent.get(vertex).get());
+          pair.getRight().set(curSum);
+          return pair;
+        },
+        largestComponentConsumer);
+  }
+
+  /**
+   * Given a graph with already calculated connected components - calculates
+   * ID of the largest one.
+   *
+   * @param getComponent Reads component of a vertex
+   * @param largestComponent Set to the id of the largest component
+   * @param <V> Vertex value type
+   * @return Block that calculates largest component id
+   */
+  public static <V extends Writable> Block calculateLargestConnectedComponent(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      LongWritable largestComponent) {
+    return calculateLargestConnectedComponentStats(
+        getComponent,
+        (t) -> largestComponent.set(t.getLeft().get())
+    );
+  }
+
+  /**
+   * Given a graph with already calculated connected components - calculates
+   * size of the largest one.
+   *
+   * @param getComponent Reads component of a vertex
+   * @param largestComponentSize Set to the size of the largest component
+   * @param <V> Vertex value type
+   * @return Block that calculates largest component size
+   */
+  public static <V extends Writable>
+  Block calculateLargestConnectedComponentSize(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      LongWritable largestComponentSize) {
+    return calculateLargestConnectedComponentStats(
+        getComponent,
+        (t) -> largestComponentSize.set(t.getRight().get()));
+  }
+
+  /**
+   * Takes symmetric graph, and removes all edges/vertices that
+   * are not in largest connected component.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param getComponent Reads component of a vertex
+   * @param setComponent Writes component of a vertex
+   * @param <V> Vertex value type
+   * @return Block that keeps only the largest component
+   */
+  public static <V extends Writable> Block calculateAndKeepLargestComponent(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent
+  ) {
+    final LongWritable largestComponent = new LongWritable();
+    return new SequenceBlock(
+        calculateConnectedComponents(
+            maxIterations, LongTypeOps.INSTANCE, getComponent, setComponent,
+            VertexSuppliers.vertexEdgesSupplier()),
+        calculateLargestConnectedComponent(getComponent, largestComponent),
+        Pieces.<LongWritable, V, Writable>removeVertices(
+            "KeepOnlyLargestComponent",
+            (vertex) -> !largestComponent.equals(getComponent.get(vertex))));
+  }
+
+
+  /**
+   * Takes symmetric graph, and removes all edges/vertices that
+   * belong to connected components smaller than specified
+   * threshold.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param threshold Minimum component size to keep
+   * @param getComponent Reads component of a vertex
+   * @param setComponent Writes component of a vertex
+   * @param <V> Vertex value type
+   * @return Block that keeps only components of at least threshold size
+   */
+  public static <V extends Writable>
+  Block calculateAndKeepComponentAboveThreshold(
+      int maxIterations,
+      int threshold,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent
+  ) {
+    final ObjectTransfer<Boolean> belowThreshold = new ObjectTransfer<>();
+    return new SequenceBlock(
+      UndirectedConnectedComponents.calculateConnectedComponents(
+        maxIterations,
+        LongTypeOps.INSTANCE,
+        getComponent,
+        setComponent
+      ),
+      UndirectedConnectedComponents.calculateConnectedComponentSizes(
+        getComponent,
+        (vertex, value) -> {
+          belowThreshold.apply(value.get() < threshold);
+        }),
+      Pieces.removeVertices(
+        "KeepAboveTresholdComponents",
+        belowThreshold.castToSupplier()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java
new file mode 100644
index 0000000..9070f75
--- /dev/null
+++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Class for computing the weakly connected components of a directed graph.
+ *
+ * The original (directed) edge targets are saved into the vertex value, the
+ * graph is temporarily made symmetric, undirected connected components are
+ * calculated, and finally every edge whose target was not among the saved
+ * targets (i.e. edges added only for symmetrization) is removed again.
+ *
+ * If the graph is undirected, this has the exact same behavior as
+ * UndirectedConnectedComponents.
+ *
+ * Supported edges are unweighted (NullWritable) edges, or FloatWritable
+ * edges when {@code useFloatWeights} is set; edge weights are not used by
+ * the calculation. Other weighted directed graphs are not supported.
+ */
+public class WeaklyConnectedComponents {
+  /** Hide constructor */
+  private WeaklyConnectedComponents() { }
+
+  /**
+   * Save existing edges: stores the set of target ids of every vertex's
+   * current edges through {@code setEdges}.
+   */
+  private static <V extends Writable> Block saveDirectedEdges(
+      ConsumerWithVertex<LongWritable, V, Writable, LongOpenHashSet> setEdges) {
+    return Pieces.<LongWritable, V, Writable>forAllVertices(
+        "SaveDirectedEdgesPiece",
+        (vertex) -> {
+          LongOpenHashSet targetIds = new LongOpenHashSet();
+          for (Edge<LongWritable, Writable> edge : vertex.getEdges()) {
+            targetIds.add(edge.getTargetVertexId().get());
+          }
+          setEdges.apply(vertex, targetIds);
+        });
+  }
+
+  /**
+   * Restore existing edges: removes every edge whose target is not in the
+   * set previously saved by saveDirectedEdges.
+   *
+   * NOTE(review): a vertex that did not exist when edges were saved (e.g.
+   * created by a message to a missing target) never had setEdges applied;
+   * what getEdges returns for it depends on the vertex value - confirm it
+   * cannot be null.
+   */
+  private static <V extends Writable> Block restoreDirectedEdges(
+      SupplierFromVertex<LongWritable, V, Writable, LongOpenHashSet> getEdges) {
+    return Pieces.<LongWritable, V, Writable>forAllVertices(
+        "RestoreDirectedEdgesPiece",
+        (vertex) -> {
+          LongOpenHashSet savedTargets = getEdges.get(vertex);
+          Iterator<MutableEdge<LongWritable, Writable>> iter =
+              vertex.getMutableEdges().iterator();
+          while (iter.hasNext()) {
+            MutableEdge<LongWritable, Writable> edge = iter.next();
+            if (!savedTargets.contains(edge.getTargetVertexId().get())) {
+              iter.remove();
+            }
+          }
+        });
+  }
+
+  /**
+   * Calculate weakly connected components, doing as many iterations as
+   * needed, but no more than maxIterations.
+   *
+   * Graph does not need to be symmetric - it is made symmetric for the
+   * calculation, and the original directed edges are restored afterwards.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param getComponent Reads component of a vertex
+   * @param setComponent Writes component of a vertex
+   * @param getEdges Reads saved edge targets of a vertex
+   * @param setEdges Saves edge targets of a vertex
+   * @param useFloatWeights Whether edges are FloatWritable (otherwise
+   *                        unweighted)
+   * @param <V> Vertex value type
+   * @return Block that calculates weakly connected components
+   */
+  public static <V extends Writable> Block calculateConnectedComponents(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent,
+      SupplierFromVertex<LongWritable, V, Writable, LongOpenHashSet> getEdges,
+      ConsumerWithVertex<LongWritable, V, Writable, LongOpenHashSet> setEdges,
+      boolean useFloatWeights) {
+    return new SequenceBlock(
+        saveDirectedEdges(setEdges),
+        makeSymmetric(useFloatWeights),
+        UndirectedConnectedComponents.calculateConnectedComponents(
+            maxIterations, LongTypeOps.INSTANCE, getComponent, setComponent),
+        restoreDirectedEdges(getEdges));
+  }
+
+  /**
+   * Takes directed graph, and removes all edges/vertices that
+   * are not in largest weakly connected component.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param getComponent Reads component of a vertex
+   * @param setComponent Writes component of a vertex
+   * @param getEdges Reads saved edge targets of a vertex
+   * @param setEdges Saves edge targets of a vertex
+   * @param useFloatWeights Whether edges are FloatWritable (otherwise
+   *                        unweighted)
+   * @param <V> Vertex value type
+   * @return Block that keeps only the largest weakly connected component
+   */
+  public static <V extends Writable> Block calculateAndKeepLargestComponent(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent,
+      SupplierFromVertex<LongWritable, V, Writable, LongOpenHashSet> getEdges,
+      ConsumerWithVertex<LongWritable, V, Writable, LongOpenHashSet> setEdges,
+      boolean useFloatWeights) {
+    return new SequenceBlock(
+        saveDirectedEdges(setEdges),
+        makeSymmetric(useFloatWeights),
+        UndirectedConnectedComponents.calculateAndKeepLargestComponent(
+            maxIterations, getComponent, setComponent),
+        restoreDirectedEdges(getEdges));
+  }
+
+  /** Makes graph symmetric, choosing the variant for the edge type */
+  private static Block makeSymmetric(boolean useFloatWeights) {
+    if (!useFloatWeights) {
+      return PrepareGraphPieces.makeSymmetricUnweighted(LongTypeOps.INSTANCE);
+    } else {
+      return makeSymmetricFloatWeighted();
+    }
+  }
+
+  /**
+   * This is just used internally in weakly connected components if the graph
+   * has float weights. This works fine here because we don't actually ever
+   * use the weights (added edges get weight 1.0, and are removed again when
+   * directed edges are restored), but should not be used outside of the
+   * WeaklyConnectedComponents class and hence is private.
+   */
+  private static <V extends Writable>
+  Piece<LongWritable, V, FloatWritable, LongWritable, Object>
+  makeSymmetricFloatWeighted() {
+    LongSet set = new LongOpenHashSet();
+    FloatWritable floatWritable = new FloatWritable(1.0f);
+    ConsumerWithVertex<LongWritable, V, FloatWritable, Iterable<LongWritable>>
+    addEdges = (vertex, neighbors) -> {
+      set.clear();
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+        set.add(edge.getTargetVertexId().get());
+      }
+      for (LongWritable message : neighbors) {
+        if (!set.contains(message.get())) {
+          Edge<LongWritable, FloatWritable> edge = EdgeFactory.create(
+              new LongWritable(message.get()), floatWritable);
+          vertex.addEdge(edge);
+          set.add(message.get());
+        }
+      }
+    };
+    return Pieces.sendMessageToNeighbors(
+        "MakeSymmetricFloatWeighted",
+        LongWritable.class,
+        VertexSuppliers.<LongWritable, V, FloatWritable>vertexIdSupplier(),
+        addEdges);
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java
new file mode 100644
index 0000000..d3c263a
--- /dev/null
+++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Blocks and pieces for preparing and preprocessing the graph, + * such as making it symmetric and computing connected components. + */ +package org.apache.giraph.block_app.library.prepare_graph; http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java new file mode 100644 index 0000000..85d3a2c --- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph.vertex;
+
+import org.apache.giraph.block_app.library.ReusableSuppliers;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Vertex value interface for connected component extraction
+ */
+public interface ConnectedComponentVertexValue extends Writable {
+  /**
+   * @return Component id stored in this vertex value
+   */
+  long getComponent();
+
+  /**
+   * @param component Component id to store in this vertex value
+   */
+  void setComponent(long component);
+
+  /**
+   * Consumer writing a component id into a vertex value that implements
+   * ConnectedComponentVertexValue; usable with the
+   * UndirectedConnectedComponent Blocks.
+   *
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Consumer calling setComponent on the vertex value
+   */
+  static <I extends WritableComparable,
+      V extends ConnectedComponentVertexValue, E extends Writable>
+  ConsumerWithVertex<I, V, E, LongWritable> setComponentConsumer() {
+    return (vertex, component) -> {
+      V vertexValue = vertex.getValue();
+      vertexValue.setComponent(component.get());
+    };
+  }
+
+  /**
+   * Supplier reading the component id from a vertex value that implements
+   * ConnectedComponentVertexValue; usable with the
+   * UndirectedConnectedComponent Blocks. The value is wrapped through
+   * ReusableSuppliers.fromLong (NOTE(review): presumably into a reused
+   * LongWritable, so callers should not retain it - confirm).
+   *
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Supplier returning the vertex value's component
+   */
+  static <I extends WritableComparable,
+      V extends ConnectedComponentVertexValue, E extends Writable>
+  SupplierFromVertex<I, V, E, LongWritable> getComponentSupplier() {
+    return ReusableSuppliers.fromLong(
+        vertex -> vertex.getValue().getComponent());
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java
new file mode 100644
index 0000000..94f6836
--- /dev/null
+++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph.vertex;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Vertex value interface for weakly connected component extraction.
+ *
+ * Besides the component id, it stores a set of edge target ids, e.g. the
+ * out-edges saved by the WeaklyConnectedComponents blocks before the graph
+ * is temporarily made symmetric.
+ */
+public interface WeaklyConnectedComponentVertexValue
+    extends ConnectedComponentVertexValue {
+  /**
+   * Store a set of edge target ids in this vertex value.
+   *
+   * @param edgeIds Set of edge target ids
+   */
+  void setEdgeIds(LongOpenHashSet edgeIds);
+
+  /**
+   * Return the stored set of edge target ids and hand it over to the caller.
+   * WeaklyConnectedComponentVertexValueImpl drops its own reference, so the
+   * set is neither retained nor serialized afterwards.
+   *
+   * @return Previously stored set, or null if none is stored
+   */
+  LongOpenHashSet takeEdgeIds();
+
+  /**
+   * If your vertex value class implements WeaklyConnectedComponentVertexValue,
+   * you can use this consumer as the setEdges argument of the
+   * WeaklyConnectedComponents blocks.
+   *
+   * For the component arguments use
+   * ConnectedComponentVertexValue.setComponentConsumer() and
+   * ConnectedComponentVertexValue.getComponentSupplier(); static interface
+   * methods are not inherited, so they must be called on that interface.
+   *
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Consumer calling setEdgeIds on the vertex value
+   */
+  static
+  <I extends WritableComparable, V extends WeaklyConnectedComponentVertexValue,
+  E extends Writable>
+  ConsumerWithVertex<I, V, E, LongOpenHashSet> setEdgeIdsConsumer() {
+    return (vertex, edgeIds) -> vertex.getValue().setEdgeIds(edgeIds);
+  }
+
+  /**
+   * If your vertex value class implements WeaklyConnectedComponentVertexValue,
+   * you can use this supplier as the getEdges argument of the
+   * WeaklyConnectedComponents blocks. It calls takeEdgeIds, so each stored
+   * set can be obtained only once.
+   *
+   * For the component arguments use
+   * ConnectedComponentVertexValue.setComponentConsumer() and
+   * ConnectedComponentVertexValue.getComponentSupplier().
+   *
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Supplier calling takeEdgeIds on the vertex value
+   */
+  static
+  <I extends WritableComparable, V extends WeaklyConnectedComponentVertexValue,
+  E extends Writable>
+  SupplierFromVertex<I, V, E, LongOpenHashSet> getEdgeIdsSupplier() {
+    return (vertex) -> vertex.getValue().takeEdgeIds();
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java
new file mode 100644
index 0000000..8b1da12
--- /dev/null
+++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph.vertex;
+
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Implementation of WeaklyConnectedComponentVertexValue.
+ *
+ * Serialized form: the component (long), a boolean telling whether a set of
+ * edge ids is present, and if so its size (int) followed by each id (long).
+ */
+public class WeaklyConnectedComponentVertexValueImpl
+    implements WeaklyConnectedComponentVertexValue {
+  /** Component id */
+  private long component;
+  /** Stored edge target ids, or null when none are stored */
+  private LongOpenHashSet edgeIds;
+
+  @Override
+  public long getComponent() {
+    return component;
+  }
+
+  @Override
+  public void setComponent(long component) {
+    this.component = component;
+  }
+
+  @Override
+  public void setEdgeIds(LongOpenHashSet edgeIds) {
+    this.edgeIds = edgeIds;
+  }
+
+  @Override
+  public LongOpenHashSet takeEdgeIds() {
+    // Hand the set to the caller and drop our reference, so it is neither
+    // retained nor serialized afterwards.
+    LongOpenHashSet res = edgeIds;
+    edgeIds = null;
+    return res;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(component);
+    if (edgeIds == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeInt(edgeIds.size());
+      // Primitive iterator: avoids boxing every id into a Long.
+      LongIterator iter = edgeIds.iterator();
+      while (iter.hasNext()) {
+        out.writeLong(iter.nextLong());
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    component = in.readLong();
+    boolean hasEdgeIds = in.readBoolean();
+    if (hasEdgeIds) {
+      int size = in.readInt();
+      edgeIds = new LongOpenHashSet(size);
+      for (int i = 0; i < size; i++) {
+        edgeIds.add(in.readLong());
+      }
+    } else {
+      edgeIds = null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java
new file mode 100644
index 0000000..6987fa4
--- /dev/null +++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Vertex value interfaces and implementations for graph preparation. + */ +package org.apache.giraph.block_app.library.prepare_graph.vertex;
