[GIRAPH-1013] Adding prepare graph library in Java8

Summary:
Adding simple graph preparation:
- symmetric
- removal of isolated edges
- normalizing
- connected components

Creating a new module, only used for Phadoop_facebook, which is written in 
Java8.

Tests are new/modified from what is in our repo, the rest is identical.

Test Plan: mvn clean install

Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov

Reviewed By: sergey.edunov

Differential Revision: https://reviews.facebook.net/D40719


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/d7e4bde1
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/d7e4bde1
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/d7e4bde1

Branch: refs/heads/trunk
Commit: d7e4bde11fdbfdef13e9316702c914e749a7757f
Parents: f9dc6b5
Author: Igor Kabiljo <[email protected]>
Authored: Thu Jun 25 14:13:18 2015 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Thu Jul 2 16:04:27 2015 -0700

----------------------------------------------------------------------
 checkstyle-relaxed-8.xml                        | 275 +++++++++++
 findbugs-exclude.xml                            |   2 +-
 giraph-block-app-8/pom.xml                      | 137 ++++++
 .../src/main/assembly/compile.xml               |  39 ++
 .../block_app/library/ReusableSuppliers.java    | 120 +++++
 .../giraph/block_app/library/package-info.java  |  21 +
 .../prepare_graph/PrepareGraphPieces.java       | 400 ++++++++++++++++
 .../UndirectedConnectedComponents.java          | 472 +++++++++++++++++++
 .../WeaklyConnectedComponents.java              | 163 +++++++
 .../library/prepare_graph/package-info.java     |  22 +
 .../vertex/ConnectedComponentVertexValue.java   |  56 +++
 .../WeaklyConnectedComponentVertexValue.java    |  57 +++
 ...WeaklyConnectedComponentVertexValueImpl.java |  84 ++++
 .../prepare_graph/vertex/package-info.java      |  21 +
 .../prepare_graph/TestConnectedComponents.java  | 192 ++++++++
 .../library/prepare_graph/TestPrepareGraph.java | 150 ++++++
 giraph-block-app/pom.xml                        |   4 +
 .../reducers/array/HugeArrayUtils.java          |  20 +-
 .../block_app/test_setup/NumericTestGraph.java  |   9 +-
 .../test_setup/graphs/Small1GraphInit.java      |  29 +-
 .../test_setup/graphs/Small2GraphInit.java      |  29 +-
 .../graphs/SmallDirectedForestGraphInit.java    |  76 +++
 .../graphs/SmallDirectedTreeGraphInit.java      |  75 +++
 .../giraph/object/MultiSizedReusable.java       | 113 +++++
 .../org/apache/giraph/object/package-info.java  |  21 +
 .../block_app/framework/BlockExecutionTest.java |  36 +-
 .../combiner/DoubleSumMessageCombiner.java      |   6 +-
 .../combiner/FloatSumMessageCombiner.java       |   6 +-
 .../apache/giraph/combiner/MessageCombiner.java |   6 +-
 .../combiner/MinimumDoubleMessageCombiner.java  |   3 +-
 .../combiner/MinimumIntMessageCombiner.java     |   2 +-
 .../combiner/SimpleSumMessageCombiner.java      |   5 +-
 .../giraph/combiner/SumMessageCombiner.java     |  73 +++
 .../giraph/reducers/impl/MaxPairReducer.java    |  87 ++++
 .../master/TestComputationCombinerTypes.java    |   2 +-
 .../apache/giraph/master/TestSwitchClasses.java |   4 +-
 .../giraph/vertex/TestComputationTypes.java     |   4 +-
 pom.xml                                         |  28 +-
 38 files changed, 2790 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/checkstyle-relaxed-8.xml
----------------------------------------------------------------------
diff --git a/checkstyle-relaxed-8.xml b/checkstyle-relaxed-8.xml
new file mode 100644
index 0000000..88a95cf
--- /dev/null
+++ b/checkstyle-relaxed-8.xml
@@ -0,0 +1,275 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!--
+  This version of checkstyle is based on the Hadoop and common-math
+  checkstyle configurations.  It is a best effort attempt to try to match
+  the CODE_CONVENTIONS and Oracle "Code Conventions for the Java
+  Programming Language".  See the following link:
+  
+  http://www.oracle.com/technetwork/java/codeconvtoc-136057.html
+
+  The documentation for checkstyle is available at 
+
+  http://checkstyle.sourceforge.net 
+-->
+
+<!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.1//EN" 
"http://www.puppycrawl.com/dtds/configuration_1_1.dtd";>
+
+<!-- Apache giraph customization of default Checkstyle behavior -->
+<module name="Checker">
+  <property name="localeLanguage" value="en"/>
+
+  <!-- Checks for headers -->
+  <!-- See http://checkstyle.sf.net/config_header.html -->
+    <!-- Verify that EVERY source file has the appropriate license -->
+  <module name="Header">
+    <property name="headerFile" value="${checkstyle.header.file}"/>
+    <property name="fileExtensions" value="java"/>       
+  </module>
+
+  <!-- Checks for Javadoc comments (checker).           -->
+  <!-- See http://checkstyle.sf.net/config_javadoc.html -->
+    <!-- Require package javadoc -->
+  <module name="JavadocPackage"/>
+
+  <!-- Miscellaneous other checks (checker).         -->
+  <!-- See http://checkstyle.sf.net/config_misc.html -->
+    <!-- Require files to end with newline characters -->
+  <module name="NewlineAtEndOfFile"/>
+
+  <!-- Checks for whitespace (tree walker)                 -->
+  <!-- See http://checkstyle.sf.net/config_whitespace.html -->
+    <!-- No tabs allowed! -->
+  <module name="FileTabCharacter"/>
+
+  <module name="TreeWalker">
+    <property name="cacheFile" value="target/checkstyle-cachefile"/>
+
+    <!-- Checks for blocks. You know, those {}'s         -->
+    <!-- See http://checkstyle.sf.net/config_blocks.html -->
+      <!-- No empty blocks (e.g. catch) -->
+    <module name="EmptyBlock"/>
+    <module name="AvoidNestedBlocks"/>
+      <!-- No if/else/do/for/while without braces -->
+    <module name="NeedBraces"/>
+    <module name="LeftCurly"/>
+    <module name="RightCurly"/>
+
+    <!-- Checks for class design                         -->
+    <!-- See http://checkstyle.sf.net/config_design.html -->
+      <!-- Utility classes should not be instantiated; they must have a
+          private constructor -->
+    <module name="HideUtilityClassConstructor"/>
+      <!-- Interfaces must be types (not just constants) -->
+    <module name="InterfaceIsType"/>
+      <!-- No public fields -->
+    <module name="VisibilityModifier">
+       <property name="protectedAllowed" value="true"/>
+       <property name="publicMemberPattern" value="^$"/>
+    </module>
+
+    <!-- Checks for common coding problems               -->
+    <!-- See http://checkstyle.sf.net/config_coding.html -->
+    <module name="EmptyStatement"/>
+      <!-- Require hash code override when equals is -->
+    <module name="EqualsHashCode"/>
+      <!-- Method parameters and local variables should not hide
+          fields, except in constructors and setters -->
+    <module name="HiddenField">
+      <property name="ignoreConstructorParameter" value="true" />
+      <property name="ignoreSetter" value="true" />
+      <property name="tokens" value="VARIABLE_DEF"/>
+    </module>
+      <!-- Disallow unnecessary instantiation of Boolean, String -->
+    <module name="IllegalInstantiation">
+      <property name="classes" value="java.lang.Boolean, java.lang.String"/>
+    </module>
+    <module name="InnerAssignment"/>
+      <!-- Switch statements need a default case and no fall-through -->
+    <module name="FallThrough" />
+    <module name="MissingSwitchDefault" />
+    <module name="SimplifyBooleanExpression"/>
+    <module name="SimplifyBooleanReturn"/>
+      <!-- Only one statement per line allowed -->
+    <module name="OneStatementPerLine"/>
+      <!-- Use a consistent way to put declarations -->
+    <module name="DeclarationOrder" />
+      <!-- Don't add parentheses when they are not required -->
+    <module name="UnnecessaryParentheses" />
+      <!-- Don't catch overly broad types (Exception, Throwable,
+          RuntimeException) -->
+    <module name="IllegalCatch" />
+      <!-- Don't use == or != for string comparisons -->
+    <module name="StringLiteralEquality" />
+      <!-- Don't declare multiple variables in the same statement -->
+    <module name="MultipleVariableDeclarations" />
+      <!-- String literals more than one character long should not be 
+          repeated several times -->
+      <!-- the "unchecked" string is also accepted to allow
+          @SuppressWarnings("unchecked") -->
+      <!-- Disabling for now until we have a better ignoreStringsRegexp -->
+      <!--
+    <module name="MultipleStringLiterals" >
+      <property name="ignoreStringsRegexp" 
value='^(("")|(".")|("unchecked"))$'/>
+    </module>
+      -->
+
+    <!-- Checks for imports                              -->
+    <!-- See http://checkstyle.sf.net/config_import.html -->
+    <module name="RedundantImport"/>
+      <!-- Imports should be explicit (no wildcards), actually used, and
+          only from allowed packages -->
+    <module name="AvoidStarImport" />
+    <module name="UnusedImports" />
+    <module name="IllegalImport" />
+
+    <!-- Checks for Javadoc comments (tree walker).       -->
+    <!-- See http://checkstyle.sf.net/config_javadoc.html -->
+      <!-- Javadoc must be formatted correctly -->
+    <module name="JavadocStyle">
+      <property name="checkFirstSentence" value="false"/>
+    </module>
+      <!-- Must have class / interface header comments -->
+    <module name="JavadocType"/>
+      <!-- Method javadoc is optional; undeclared RTEs and missing
+          @param/@return/@throws tags are also allowed -->
+    <module name="JavadocMethod">
+      <property name="allowMissingJavadoc" value="true"/>
+      <property name="allowUndeclaredRTE" value="true"/>
+      <property name="allowMissingThrowsTags" value="true"/> 
+      <property name="allowMissingPropertyJavadoc" value="true"/>
+      <property name="allowMissingParamTags" value="true"/>
+      <property name="allowMissingReturnTag" value="true"/> 
+    </module>
+
+    <!-- Miscellaneous other checks (tree walker).     -->
+    <!-- See http://checkstyle.sf.net/config_misc.html -->
+      <!-- Java style arrays -->
+    <module name="ArrayTypeStyle"/>
+
+      <!-- Indentation --> 
+<!-- Ignore indentation before 
https://github.com/checkstyle/checkstyle/issues/281 is fixed
+    <module name="Indentation">
+      <property name="basicOffset" value="2"/>
+      <property name="braceAdjustment" value="0"/>
+      <property name="caseIndent" value="0"/>
+      <property name="throwsIndent" value="2"/>
+      <property name="lineWrappingIndentation" value="0"/>
+      <property name="arrayInitIndent" value="2"/>     
+    </module>
+-->
+      <!-- Turn this on to see what needs to be done
+    <module name="TodoComment"/>
+       -->
+    <module name="UpperEll"/>
+
+    <!-- Modifier Checks                                    -->
+    <!-- See http://checkstyle.sf.net/config_modifiers.html -->
+      <!-- Use a consistent way to put modifiers -->
+    <module name="ModifierOrder"/>
+    <module name="RedundantModifier"/>
+
+    <!-- Checks for Naming Conventions.                  -->
+    <!-- See http://checkstyle.sf.net/config_naming.html -->
+      <!-- Constant names should obey the traditional all uppercase 
+          naming convention -->
+    <module name="ConstantName"/>
+    <module name="LocalFinalVariableName"/>
+    <module name="LocalVariableName"/>
+    <module name="MemberName"/>
+    <module name="MethodName"/>
+    <module name="PackageName"/>
+    <module name="ParameterName"/>
+    <module name="StaticVariableName">
+      <property name="format" value="^[A-Z0-9_]*$"/>
+    </module>
+    <module name="TypeName"/>
+
+    <!-- Checks for regexp expressions.                  -->
+    <!-- See http://checkstyle.sf.net/config_regexp.html -->    
+      <!-- No trailing whitespace -->
+    <module name="Regexp">
+      <property name="format" value="[ \t]+$"/>
+      <property name="illegalPattern" value="true"/>
+      <property name="message" value="Trailing whitespace"/>
+    </module> 
+      <!-- No System.out.println() statements -->
+    <module name="Regexp">
+      <!-- No sysouts -->
+      <property name="format" value="System\.out\.println"/>
+      <property name="illegalPattern" value="true"/>
+    </module>
+      <!-- Authors should be in pom.xml file -->
+    <module name="Regexp">
+      <property name="format" value="@author"/>
+      <property name="illegalPattern" value="true"/>
+      <property name="message" value="developers names should be in pom file"/>
+    </module>
+
+    <!-- Checks for Size Violations.                    -->
+    <!-- See http://checkstyle.sf.net/config_sizes.html -->
+      <!-- Lines cannot exceed 80 chars -->
+    <module name="LineLength">
+      <property name="max" value="80"/>
+      <property name="ignorePattern" value="^import"/>
+    </module>
+      <!-- Over time, we will revise this down -->
+    <module name="MethodLength">
+      <property name="max" value="200"/>
+    </module>
+    <module name="ParameterNumber">
+      <property name="max" value="8"/>
+    </module>
+
+    <!-- Checks for whitespace (tree walker)                 -->
+    <!-- See http://checkstyle.sf.net/config_whitespace.html -->
+    <module name="EmptyForIteratorPad"/>
+      <!-- Spacing around methods -->
+    <module name="MethodParamPad">
+      <property name="option" value="nospace"/>
+      <property name="allowLineBreaks" value="true"/>
+     </module>
+      <!-- No whitespace before a token -->
+    <module name="NoWhitespaceBefore"/>
+      <!-- Whitespace after tokens is required -->
+    <module name="WhitespaceAfter"/>
+      <!-- Whitespace around tokens is required -->
+    <module name="WhitespaceAround"/>
+    <module name="ParenPad"/>
+    <module name="TypecastParenPad"/>
+      <!-- No extra whitespace around types -->
+    <module name="GenericWhitespace"/>
+    <!-- Operator must be at end of wrapped line -->
+    <module name="OperatorWrap">
+      <property name="option" value="eol"/>
+    </module>
+
+    <!-- Required for SuppressionCommentFilter below -->
+    <module name="FileContentsHolder"/>
+  </module>
+  
+  <!-- Setup special comments to suppress specific checks from source files -->
+  <module name="SuppressionCommentFilter">
+    <property name="offCommentFormat" value="CHECKSTYLE\: stop ([\w\|]+)"/>
+    <property name="onCommentFormat"  value="CHECKSTYLE\: resume ([\w\|]+)"/>
+    <property name="checkFormat"      value="$1"/>
+  </module>
+</module>
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml
index cc1a05a..afdf041 100644
--- a/findbugs-exclude.xml
+++ b/findbugs-exclude.xml
@@ -103,6 +103,6 @@
   <Match>
     <!-- Java Serialization is not used, so this is never an actual issue.
       On the other hand, Kryo needs lambdas to be Serializable to work. -->
-    <Bug pattern="SE_BAD_FIELD,SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
+    <Bug 
pattern="SE_BAD_FIELD,SE_COMPARATOR_SHOULD_BE_SERIALIZABLE,SE_NO_SERIALVERSIONID"
 />
   </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/pom.xml b/giraph-block-app-8/pom.xml
new file mode 100644
index 0000000..6645eed
--- /dev/null
+++ b/giraph-block-app-8/pom.xml
@@ -0,0 +1,137 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.giraph</groupId>
+    <artifactId>giraph-parent</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>giraph-block-app-8</artifactId>
+  <packaging>jar</packaging>
+
+  <name>Apache Giraph Blocks Framework in Java 8</name>
+  <url>http://giraph.apache.org/giraph-block-app/</url>
+  <description>Giraph Blocks Framework and utilities for writing applications 
in Java 8</description>
+
+  <properties>
+    <top.dir>${project.basedir}/..</top.dir>
+    
<checkstyle.config.path>${top.dir}/checkstyle-relaxed-8.xml</checkstyle.config.path>
+    <project.build.targetJdk>1.8</project.build.targetJdk>
+    <project.build.javaHome>${env.JAVA_8_HOME}</project.build.javaHome>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-javadoc-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>attach-javadocs</id>
+            <goals>
+              <goal>jar</goal>
+            </goals>
+            <configuration>
+              <additionalparam>-Xdoclint:none</additionalparam>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <siteDirectory>${project.basedir}/src/site</siteDirectory>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.6</version>
+        <configuration>
+          <skip>${surefire.skip}</skip>
+          <systemProperties>
+            <property>
+              <name>prop.jarLocation</name>
+              
<value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value>
+            </property>
+          </systemProperties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <!-- compile dependencies. sorted lexicographically. -->
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.giraph</groupId>
+      <artifactId>giraph-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.giraph</groupId>
+      <artifactId>giraph-block-app</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+    </dependency>
+
+    <!-- runtime dependency -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>runtime</scope>
+    </dependency>
+
+    <!-- test dependencies. sorted lexicographically. -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/assembly/compile.xml
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/assembly/compile.xml 
b/giraph-block-app-8/src/main/assembly/compile.xml
new file mode 100644
index 0000000..fcaffa6
--- /dev/null
+++ b/giraph-block-app-8/src/main/assembly/compile.xml
@@ -0,0 +1,39 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<assembly 
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0
 http://maven.apache.org/xsd/assembly-1.1.0.xsd";>
+  <id>jar-with-dependencies</id>
+   <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+
+  <dependencySets>
+    <dependencySet>
+      <useProjectArtifact>true</useProjectArtifact>
+      <outputDirectory>/</outputDirectory>
+      <unpackOptions>
+          <excludes>
+              <exclude>META-INF/LICENSE</exclude>
+          </excludes>
+      </unpackOptions>
+      <unpack>true</unpack>
+      <scope>runtime</scope>
+    </dependencySet>
+  </dependencySets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java
new file mode 100644
index 0000000..cadbf2f
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/ReusableSuppliers.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library;
+
+import org.apache.giraph.function.primitive.Obj2DoubleFunction;
+import org.apache.giraph.function.primitive.Obj2FloatFunction;
+import org.apache.giraph.function.primitive.Obj2IntFunction;
+import org.apache.giraph.function.primitive.Obj2LongFunction;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory methods for SupplierFromVertex instances that wrap other
+ * functions. Each returned supplier keeps one Writable that is reused for
+ * every call, which keeps allocations (and therefore GC overhead) low.
+ * Because of that reuse, the object handed out by a supplier is
+ * overwritten by the next call to that same supplier.
+ */
+public class ReusableSuppliers {
+  /** Static utility holder; instantiation is not allowed. */
+  private ReusableSuppliers() { }
+
+  /**
+   * Adapts a primitive long function into a LongWritable supplier.
+   * The LongWritable is allocated once and refilled on every call.
+   *
+   * @param supplier produces the long value for a given vertex
+   * @return supplier that always returns the same, refreshed LongWritable
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, LongWritable>
+  fromLong(Obj2LongFunction<Vertex<I, V, E>> supplier) {
+    final LongWritable holder = new LongWritable();
+    return v -> {
+      final long value = supplier.apply(v);
+      holder.set(value);
+      return holder;
+    };
+  }
+
+  /**
+   * Adapts a primitive int function into an IntWritable supplier.
+   * The IntWritable is allocated once and refilled on every call.
+   *
+   * @param supplier produces the int value for a given vertex
+   * @return supplier that always returns the same, refreshed IntWritable
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, IntWritable>
+  fromInt(Obj2IntFunction<Vertex<I, V, E>> supplier) {
+    final IntWritable holder = new IntWritable();
+    return v -> {
+      final int value = supplier.apply(v);
+      holder.set(value);
+      return holder;
+    };
+  }
+
+  /**
+   * Adapts a primitive float function into a FloatWritable supplier.
+   * The FloatWritable is allocated once and refilled on every call.
+   *
+   * @param supplier produces the float value for a given vertex
+   * @return supplier that always returns the same, refreshed FloatWritable
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, FloatWritable>
+  fromFloat(Obj2FloatFunction<Vertex<I, V, E>> supplier) {
+    final FloatWritable holder = new FloatWritable();
+    return v -> {
+      final float value = supplier.apply(v);
+      holder.set(value);
+      return holder;
+    };
+  }
+
+  /**
+   * Adapts a primitive double function into a DoubleWritable supplier.
+   * The DoubleWritable is allocated once and refilled on every call.
+   *
+   * @param supplier produces the double value for a given vertex
+   * @return supplier that always returns the same, refreshed DoubleWritable
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable>
+  SupplierFromVertex<I, V, E, DoubleWritable>
+  fromDouble(Obj2DoubleFunction<Vertex<I, V, E>> supplier) {
+    final DoubleWritable holder = new DoubleWritable();
+    return v -> {
+      final double value = supplier.apply(v);
+      holder.set(value);
+      return holder;
+    };
+  }
+
+  /**
+   * Builds a supplier that lets the consumer fill the given reusable
+   * object for the current vertex, then returns that same object.
+   *
+   * @param reusable object handed to the consumer and returned every call
+   * @param consumer fills reusable based on the vertex
+   * @return supplier that always returns reusable
+   */
+  public static
+  <I extends WritableComparable, V extends Writable, E extends Writable,
+  T extends Writable>
+  SupplierFromVertex<I, V, E, T> withReusable(
+      final T reusable, final ConsumerWithVertex<I, V, E, T> consumer) {
+    return v -> {
+      consumer.apply(v, reusable);
+      return reusable;
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java
new file mode 100644
index 0000000..801150c
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Core library of Pieces and Suppliers, providing most common usages.
+ */
+package org.apache.giraph.block_app.library;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java
new file mode 100644
index 0000000..b4d40dc
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/PrepareGraphPieces.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.delegate.DelegatePiece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.ReusableSuppliers;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.primitive.Int2ObjFunction;
+import org.apache.giraph.function.primitive.Obj2DoubleFunction;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.object.MultiSizedReusable;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.types.ops.NumericTypeOps;
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicSet;
+import org.apache.giraph.writable.tuple.PairWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility class for Pieces and Blocks that prepare graph by processing edges.
+ */
+@SuppressWarnings("rawtypes")
+public class PrepareGraphPieces {
+  private static final Logger LOG = Logger.getLogger(PrepareGraphPieces.class);
+
+  /** Hide constructor */
+  private PrepareGraphPieces() {
+  }
+
+  /**
+   * Cleans an unweighted graph: removes duplicate edges, makes the graph
+   * symmetric by adding the opposite edge for every asymmetric edge, and
+   * finally removes standalone vertices (vertices left without edges).
+   *
+   * @param idTypeOps Vertex id type ops
+   * @param <I> Vertex id type
+   * @return Block that cleans the graph
+   */
+  public static
+  <I extends WritableComparable>
+  Block cleanSymmetricUnweightedGraph(PrimitiveIdTypeOps<I> idTypeOps) {
+    return new SequenceBlock(
+        removeDuplicateEdges(idTypeOps),
+        makeSymmetricUnweighted(idTypeOps),
+        removeStandAloneVertices());
+  }
+
+  /**
+   * Creates a piece that removes edges that exist only in one direction.
+   * If edge exists in both directions, but with different edge value,
+   * both edges are left intact.
+   *
+   * Each vertex sends its id to all of its neighbors, and on receive keeps
+   * only the edges whose target vertex sent it a message.
+   *
+   * @param idTypeOps Vertex id type ops
+   * @param <I> Vertex id type
+   * @return Piece that removes asymmetrical edges
+   */
+  public static
+  <I extends WritableComparable>
+  Block removeAsymEdges(PrimitiveIdTypeOps<I> idTypeOps) {
+    // Reuse sets between vertices (as the other pieces in this class do),
+    // instead of allocating a new hash set for every vertex.
+    Int2ObjFunction<BasicSet<I>> reusableSets =
+        MultiSizedReusable.createForBasicSet(idTypeOps);
+    ConsumerWithVertex<I, Writable, Writable, Iterable<I>>
+    removeEdges = (vertex, neighbors) -> {
+      // Ids of vertices that have an edge pointing to this vertex
+      BasicSet<I> set = reusableSets.apply(vertex.getNumEdges());
+      for (I message : neighbors) {
+        set.add(message);
+      }
+
+      for (Iterator<MutableEdge<I, Writable>> iter =
+              vertex.getMutableEdges().iterator();
+          iter.hasNext();) {
+        MutableEdge<I, Writable> edge = iter.next();
+        if (!set.contains(edge.getTargetVertexId())) {
+          iter.remove();
+        }
+      }
+    };
+    return Pieces.sendMessageToNeighbors(
+        "RemoveAsymEdges",
+        idTypeOps.getTypeClass(),
+        VertexSuppliers.vertexIdSupplier(),
+        removeEdges);
+  }
+
+  /**
+   * Remove duplicate edges, for each vertex only leaving
+   * instance of first outgoing edge to each target vertex.
+   *
+   * If graph is weighted, you might want to prefer summing the
+   * weights, instead of removing later occurrences.
+   *
+   * @param idTypeOps Vertex id type ops
+   * @param <I> Vertex id type
+   * @return Piece that removes duplicate edges
+   */
+  public static <I extends WritableComparable>
+  Block removeDuplicateEdges(PrimitiveIdTypeOps<I> idTypeOps) {
+    Int2ObjFunction<BasicSet<I>> reusableSets =
+        MultiSizedReusable.createForBasicSet(idTypeOps);
+    return Pieces.<I, Writable, Writable>forAllVertices(
+        "RemoveDuplicateEdges",
+        (vertex) -> {
+          // Target ids already seen for this vertex
+          BasicSet<I> set = reusableSets.apply(vertex.getNumEdges());
+          for (Iterator<MutableEdge<I, Writable>> iter =
+                vertex.getMutableEdges().iterator();
+              iter.hasNext();) {
+            MutableEdge<I, Writable> edge = iter.next();
+            // add() returning false means the target was seen before
+            if (!set.add(edge.getTargetVertexId())) {
+              iter.remove();
+            }
+          }
+        });
+  }
+
+  /**
+   * Creates a piece that will take an unweighted graph (graph with edge value
+   * being NullWritable), and make it symmetric, by creating all edges
+   * that are not present and opposite edge exists.
+   *
+   * @param idTypeOps Vertex id type ops
+   * @param <I> Vertex id type
+   * @return Piece that makes unweighted graph symmetric
+   */
+  public static <I extends WritableComparable>
+  Block makeSymmetricUnweighted(PrimitiveIdTypeOps<I> idTypeOps) {
+    Int2ObjFunction<BasicSet<I>> reusableSets =
+        MultiSizedReusable.createForBasicSet(idTypeOps);
+    ConsumerWithVertex<I, Writable, NullWritable, Iterable<I>>
+    addEdges = (vertex, neighbors) -> {
+      // Targets this vertex already has an edge to
+      BasicSet<I> set = reusableSets.apply(vertex.getNumEdges());
+
+      for (Edge<I, NullWritable> edge : vertex.getEdges()) {
+        set.add(edge.getTargetVertexId());
+      }
+      for (I neighbor : neighbors) {
+        if (!set.contains(neighbor)) {
+          // Copy the id, presumably because message objects can be reused
+          Edge<I, NullWritable> edge =
+              EdgeFactory.create(idTypeOps.createCopy(neighbor));
+          vertex.addEdge(edge);
+          set.add(neighbor);
+        }
+      }
+    };
+    return Pieces.sendMessageToNeighbors(
+        "MakeSymmetricUnweighted",
+        idTypeOps.getTypeClass(),
+        VertexSuppliers.vertexIdSupplier(),
+        addEdges);
+  }
+
+  /**
+   * Make weighted graph symmetric - by making edge weight in both directions
+   * equal to the sum of weights in both directions.
+   *
+   * This means if graph is already symmetric - resulting graph will have
+   * doubled weights.
+   * We are not taking average, in order to work with integer numbers - because
+   * division by two might not be integer number.
+   *
+   * @param idTypeOps Vertex id type ops
+   * @param edgeTypeOps Edge value type ops, used to copy and sum weights
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Piece that makes weighted graph symmetric
+   */
+  public static <I extends WritableComparable, V extends Writable,
+    E extends Writable>
+  Block makeSymmetricWeighted(
+      PrimitiveIdTypeOps<I> idTypeOps, NumericTypeOps<E> edgeTypeOps) {
+    MessageValueFactory<PairWritable<I, E>> messageFactory =
+        () -> new PairWritable<>(idTypeOps.create(), edgeTypeOps.create());
+    return new Piece<I, V, E, PairWritable<I, E>, Object>() {
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, PairWritable<I, E>> workerApi,
+          Object executionStage) {
+        // Single message object, reused for every (sender id, weight) send
+        PairWritable<I, E> message = messageFactory.newInstance();
+        return (vertex) -> {
+          idTypeOps.set(message.getLeft(), vertex.getId());
+          for (Edge<I, E> edge : vertex.getEdges()) {
+            edgeTypeOps.set(message.getRight(), edge.getValue());
+            workerApi.sendMessage(edge.getTargetVertexId(), message);
+          }
+        };
+      }
+
+      @Override
+      public
+      VertexReceiver<I, V, E, PairWritable<I, E>> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        return (vertex, messages) -> {
+          // Total weight received from each in-neighbor
+          Basic2ObjectMap<I, E> map =
+              idTypeOps.create2ObjectOpenHashMap(null);
+          for (PairWritable<I, E> message : messages) {
+            if (map.containsKey(message.getLeft())) {
+              edgeTypeOps.plusInto(
+                  map.get(message.getLeft()), message.getRight());
+            } else {
+              // create edge copy, since it is not stored as primitive yet
+              map.put(message.getLeft(),
+                  edgeTypeOps.createCopy(message.getRight()));
+            }
+          }
+
+          for (MutableEdge<I, E> edge : vertex.getMutableEdges()) {
+            E receivedWeight = map.remove(edge.getTargetVertexId());
+            if (receivedWeight != null) {
+              edgeTypeOps.plusInto(edge.getValue(), receivedWeight);
+              // Write the in-place change back explicitly, in case the edge
+              // store does not hand out its stored value object.
+              edge.setValue(edge.getValue());
+            }
+          }
+
+          // Remaining entries are in-neighbors without an opposite edge.
+          // A fresh edge is created for each of them: vertex.addEdge may
+          // keep a reference to the passed edge (only ReuseObjectsOutEdges
+          // implementations guarantee not to), so a single reused edge
+          // object would end up shared by all added edges.
+          // TODO: After t5921368, make edges also primitive
+          // TODO: add foreach, or entry iterator to Basic Maps
+          for (Iterator<I> iter = map.fastKeyIterator(); iter.hasNext();) {
+            I neighbor = iter.next();
+            vertex.addEdge(EdgeFactory.create(
+                idTypeOps.createCopy(neighbor), map.get(neighbor)));
+          }
+        };
+      }
+
+      @Override
+      protected MessageValueFactory<PairWritable<I, E>> getMessageFactory(
+          ImmutableClassesGiraphConfiguration conf) {
+        return messageFactory;
+      }
+
+      @Override
+      public String toString() {
+        return "MakeSymmetricWeighted";
+      }
+    };
+  }
+
+  /**
+   * Creates a piece that removes all vertices that have no outgoing edges.
+   *
+   * Note that removing vertices from the graph takes effect only in the
+   * next superstep.
+   *
+   * @return Piece that removes standalone vertices
+   */
+  public static Block removeStandAloneVertices() {
+    return Pieces.removeVertices(
+        "RemoveStandaloneVertices",
+        (vertex) -> vertex.getNumEdges() == 0);
+  }
+
+  /**
+   * Normalizes DoubleWritable edge weights, by dividing each weight with
+   * the average edge weight in the whole graph.
+   *
+   * Sum of edge weights and number of edges are reduced by two pieces
+   * combined (via DelegatePiece) with the normalizing piece, whose
+   * masterCompute computes the average from their results.
+   * NOTE(review): if all weights are zero the average is 0, and the
+   * normalized weights become NaN.
+   *
+   * @return Block that normalizes edge weights
+   */
+  public static Block normalizeDoubleEdges() {
+    ObjectTransfer<DoubleWritable> sumEdgeWeights =
+        new ObjectTransfer<>();
+    ObjectTransfer<LongWritable> countEdges = new ObjectTransfer<>();
+
+    return new DelegatePiece<>(
+        calcSumEdgesPiece(DoubleWritable::get, sumEdgeWeights),
+        countTotalEdgesPiece(countEdges),
+        new Piece<WritableComparable, Writable, DoubleWritable, NoMessage,
+            Object>() {
+          private double averageEdgeWeight;
+
+          @Override
+          public void masterCompute(
+              BlockMasterApi master, Object executionStage) {
+            // Assumes DelegatePiece calls masterCompute of the two reducing
+            // pieces above before this one - TODO confirm.
+            averageEdgeWeight =
+                sumEdgeWeights.get().get() / countEdges.get().get();
+            LOG.info("Average edge weight " + averageEdgeWeight);
+          }
+
+          @Override
+          public VertexReceiver<WritableComparable, Writable, DoubleWritable,
+          NoMessage> getVertexReceiver(
+              BlockWorkerReceiveApi<WritableComparable> workerApi,
+              Object executionStage) {
+            return (vertex, messages) -> {
+              for (MutableEdge<WritableComparable, DoubleWritable> edge :
+                  vertex.getMutableEdges()) {
+                // Update each edge's own value object in place - passing one
+                // shared writable to setValue would alias it across edges in
+                // edge stores that keep references. Full double precision
+                // is kept (no cast to float).
+                DoubleWritable weight = edge.getValue();
+                weight.set(weight.get() / averageEdgeWeight);
+                edge.setValue(weight);
+              }
+            };
+          }
+
+          @Override
+          public String toString() {
+            return "NormalizeDoubleEdges";
+          }
+        });
+  }
+
+  /**
+   * Normalizes FloatWritable edge weights, by dividing each weight with
+   * the average edge weight in the whole graph.
+   *
+   * Sum of edge weights and number of edges are reduced by two pieces
+   * combined (via DelegatePiece) with the normalizing piece, whose
+   * masterCompute computes the average from their results.
+   * NOTE(review): if all weights are zero the average is 0, and the
+   * normalized weights become NaN.
+   *
+   * @return Block that normalizes edge weights
+   */
+  public static Block normalizeFloatEdges() {
+    ObjectTransfer<DoubleWritable> sumEdgeWeights =
+        new ObjectTransfer<>();
+    ObjectTransfer<LongWritable> countEdges = new ObjectTransfer<>();
+
+    return new DelegatePiece<>(
+        calcSumEdgesPiece(FloatWritable::get, sumEdgeWeights),
+        countTotalEdgesPiece(countEdges),
+        new Piece<WritableComparable, Writable, FloatWritable, NoMessage,
+            Object>() {
+          private double averageEdgeWeight;
+
+          @Override
+          public void masterCompute(
+              BlockMasterApi master, Object executionStage) {
+            // Assumes DelegatePiece calls masterCompute of the two reducing
+            // pieces above before this one - TODO confirm.
+            averageEdgeWeight =
+                sumEdgeWeights.get().get() / countEdges.get().get();
+            LOG.info("Average edge weight " + averageEdgeWeight);
+          }
+
+          @Override
+          public VertexReceiver<WritableComparable, Writable, FloatWritable,
+          NoMessage> getVertexReceiver(
+              BlockWorkerReceiveApi<WritableComparable> workerApi,
+              Object executionStage) {
+            return (vertex, messages) -> {
+              for (MutableEdge<WritableComparable, FloatWritable> edge :
+                  vertex.getMutableEdges()) {
+                // Update each edge's own value object in place - passing one
+                // shared writable to setValue would alias it across edges in
+                // edge stores that keep references.
+                FloatWritable weight = edge.getValue();
+                weight.set((float) (weight.get() / averageEdgeWeight));
+                edge.setValue(weight);
+              }
+            };
+          }
+
+          @Override
+          public String toString() {
+            return "NormalizeFloatEdges";
+          }
+        });
+  }
+
+  /**
+   * Piece that calculates total number of edges,
+   * and gives result to the {@code countEdges} consumer.
+   *
+   * @param countEdges Consumer of the total number of edges
+   * @return Piece that counts edges
+   */
+  public static
+  Piece<WritableComparable, Writable, Writable, NoMessage, Object>
+  countTotalEdgesPiece(Consumer<LongWritable> countEdges) {
+    return Pieces.reduce(
+        "CountTotalEdgesPiece",
+        SumReduce.LONG,
+        ReusableSuppliers.fromLong((vertex) -> vertex.getNumEdges()),
+        countEdges);
+  }
+
+  /**
+   * Piece that calculates total edge weight in the graph,
+   * and gives result to the {@code sumEdgeWeights} consumer.
+   *
+   * @param edgeValueF Converts an edge value into its weight
+   * @param sumEdgeWeights Consumer of the total edge weight
+   * @param <E> Edge value type
+   * @return Piece that sums edge weights
+   */
+  public static <E extends Writable>
+  Piece<WritableComparable, Writable, E, NoMessage, Object> calcSumEdgesPiece(
+      Obj2DoubleFunction<E> edgeValueF,
+      Consumer<DoubleWritable> sumEdgeWeights) {
+    return Pieces.reduce(
+        "CalcSumEdgesPiece",
+        SumReduce.DOUBLE,
+        ReusableSuppliers.fromDouble((vertex) -> {
+          double sum = 0;
+          for (Edge<WritableComparable, E> edge : vertex.getEdges()) {
+            sum += edgeValueF.apply(edge.getValue());
+          }
+          return sum;
+        }),
+        sumEdgeWeights);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
new file mode 100644
index 0000000..2610436
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.giraph.block_app.framework.api.BlockMasterApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.api.CreateReducersApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatUntilBlock;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.SendMessageChain;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.block_app.reducers.map.BasicMapReduce;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.combiner.SumMessageCombiner;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.function.Consumer;
+import org.apache.giraph.function.ObjectTransfer;
+import org.apache.giraph.function.Supplier;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.reducers.impl.MaxPairReducer;
+import org.apache.giraph.reducers.impl.SumReduce;
+import org.apache.giraph.types.NoMessage;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.TypeOps;
+import org.apache.giraph.writable.tuple.LongLongWritable;
+import org.apache.giraph.writable.tuple.PairWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * Class for creating utility blocks for calculating and processing
+ * connected components.
+ *
+ * Graph is expected to be symmetric before calling any of the methods here.
+ */
+public class UndirectedConnectedComponents {
+  private static final Logger LOG =
+      Logger.getLogger(UndirectedConnectedComponents.class);
+
+  /** Hide constructor */
+  private UndirectedConnectedComponents() { }
+
+  /**
+   * Initialize vertex values for connected components calculation: each
+   * vertex sets its component to the minimum of its own id and the ids of
+   * its neighbors.
+   *
+   * @param idTypeOps Vertex id type ops
+   * @param vertexUpdatedComponent Receives, per vertex, whether a neighbor
+   *                               id smaller than the vertex's own was found
+   * @param setComponent Sets component of a vertex
+   * @param edgeSupplier Supplies edges to be considered for each vertex
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @return Piece that initializes components
+   */
+  private static <I extends WritableComparable, V extends Writable>
+  Piece<I, V, Writable, NoMessage, Object> createInitializePiece(
+      TypeOps<I> idTypeOps,
+      Consumer<Boolean> vertexUpdatedComponent,
+      ConsumerWithVertex<I, V, Writable, I> setComponent,
+      SupplierFromVertex<I, V, Writable,
+        ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier) {
+    // Reusable holder for the running minimum id
+    I result = idTypeOps.create();
+    return Pieces.forAllVerticesOnReceive("InitializeCC", (vertex) -> {
+      idTypeOps.set(result, vertex.getId());
+      boolean updated = false;
+      for (Edge<I, ?> edge : edgeSupplier.get(vertex)) {
+        if (result.compareTo(edge.getTargetVertexId()) > 0) {
+          idTypeOps.set(result, edge.getTargetVertexId());
+          updated = true;
+        }
+      }
+      setComponent.apply(vertex, result);
+      vertexUpdatedComponent.apply(updated);
+    });
+  }
+
+  /**
+   * Propagate connected components to neighbor vertices: vertices selected
+   * by vertexToPropagate send their component to all neighbors, and each
+   * vertex adopts the smallest received component if it is smaller than
+   * its current one.
+   */
+  private static class PropagateConnectedComponentsPiece
+      <I extends WritableComparable, V extends Writable>
+      extends Piece<I, V, Writable, I, Object> {
+    /** Vertex id type ops */
+    private final TypeOps<I> idTypeOps;
+    /** Whether current vertex should send its component */
+    private final Supplier<Boolean> vertexToPropagate;
+    /** Receives whether current vertex updated its component */
+    private final Consumer<Boolean> vertexUpdatedComponent;
+    /** Receives whether no vertex propagated in this iteration */
+    private final Consumer<Boolean> converged;
+    /** Gets component of a vertex */
+    private final SupplierFromVertex<I, V, Writable, I> getComponent;
+    /** Sets component of a vertex */
+    private final ConsumerWithVertex<I, V, Writable, I> setComponent;
+    /** Supplies edges to propagate along */
+    private final SupplierFromVertex<I, V, Writable,
+        ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier;
+
+    /** Counts vertices that sent their component in this iteration */
+    private ReducerHandle<LongWritable, LongWritable> propagatedAggregator;
+
+    PropagateConnectedComponentsPiece(
+        TypeOps<I> idTypeOps,
+        Supplier<Boolean> vertexToPropagate,
+        Consumer<Boolean> vertexUpdatedComponent,
+        Consumer<Boolean> converged,
+        SupplierFromVertex<I, V, Writable, I> getComponent,
+        ConsumerWithVertex<I, V, Writable, I> setComponent,
+        SupplierFromVertex<I, V, Writable,
+          ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier) {
+      this.idTypeOps = idTypeOps;
+      this.vertexToPropagate = vertexToPropagate;
+      this.vertexUpdatedComponent = vertexUpdatedComponent;
+      this.converged = converged;
+      this.getComponent = getComponent;
+      this.setComponent = setComponent;
+      this.edgeSupplier = edgeSupplier;
+    }
+
+    @Override
+    public void registerReducers(
+        CreateReducersApi reduceApi, Object executionStage) {
+      propagatedAggregator = reduceApi.createLocalReducer(SumReduce.LONG);
+    }
+
+    @Override
+    public VertexSender<I, V, Writable> getVertexSender(
+        final BlockWorkerSendApi<I, V, Writable, I> workerApi,
+        Object executionStage) {
+      final LongWritable one = new LongWritable(1);
+      // Stateless, so created once here instead of once per vertex
+      final Function<Edge<I, ?>, I> toTargetId =
+          (edge) -> edge.getTargetVertexId();
+      return new InnerVertexSender() {
+        @Override
+        public void vertexSend(Vertex<I, V, Writable> vertex) {
+          if (vertexToPropagate.get()) {
+            workerApi.sendMessageToMultipleEdges(
+                Iterators.transform(
+                    edgeSupplier.get(vertex).iterator(), toTargetId),
+                getComponent.get(vertex));
+            propagatedAggregator.reduce(one);
+          }
+        }
+      };
+    }
+
+    @Override
+    public void masterCompute(BlockMasterApi master, Object executionStage) {
+      long numPropagated = propagatedAggregator.getReducedValue(master).get();
+      // Converged when no vertex sent its component in this iteration
+      converged.apply(numPropagated == 0);
+      LOG.info("Undirected CC: " + numPropagated +
+          " many vertices sent in this iteration");
+    }
+
+    @Override
+    public VertexReceiver<I, V, Writable, I> getVertexReceiver(
+        BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+      return new InnerVertexReceiver() {
+        /** Reusable holder for the minimum received component */
+        private final I received = idTypeOps.create();
+
+        @Override
+        public void vertexReceive(Vertex<I, V, Writable> vertex,
+            Iterable<I> messages) {
+          boolean first = true;
+          for (I value : messages) {
+            if (first) {
+              idTypeOps.set(received, value);
+              first = false;
+            } else {
+              if (received.compareTo(value) > 0) {
+                idTypeOps.set(received, value);
+              }
+            }
+          }
+
+          // Adopt the received minimum only if it is smaller than current
+          I cur = getComponent.get(vertex);
+          if (!first && cur.compareTo(received) > 0) {
+            setComponent.apply(vertex, received);
+            vertexUpdatedComponent.apply(true);
+          } else {
+            vertexUpdatedComponent.apply(false);
+          }
+        }
+      };
+    }
+
+    @Override
+    public Class<I> getMessageClass() {
+      return idTypeOps.getTypeClass();
+    }
+  }
+
+  /**
+   * Calculates number of components, and the number of active vertices.
+   * A component is counted by the vertex whose id equals its component id
+   * (for components calculated by this class, the minimum id in the
+   * component).
+   */
+  public static final class CalculateNumberOfComponents<V extends Writable>
+      extends Piece<LongWritable, V, Writable, LongWritable, Object> {
+    /** Receives the number of active vertices */
+    private final Consumer<LongWritable> numActiveConsumer;
+    /** Receives the number of components */
+    private final Consumer<LongWritable> numComponentsConsumer;
+    /** Gets component of a vertex */
+    private final
+    SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent;
+
+    /** Counts vertices that are the representative of their component */
+    private ReducerHandle<LongWritable, LongWritable> numComponentsAggregator;
+    /** Counts all vertices */
+    private ReducerHandle<LongWritable, LongWritable> numActiveAggregator;
+
+    /**
+     * Constructor
+     *
+     * @param numActiveConsumer Receives the number of active vertices
+     * @param numComponentsConsumer Receives the number of components
+     * @param getComponent Gets component of a vertex
+     */
+    public CalculateNumberOfComponents(
+        Consumer<LongWritable> numActiveConsumer,
+        Consumer<LongWritable> numComponentsConsumer,
+        SupplierFromVertex<LongWritable, V, Writable, LongWritable>
+          getComponent) {
+      this.numActiveConsumer = numActiveConsumer;
+      this.numComponentsConsumer = numComponentsConsumer;
+      this.getComponent = getComponent;
+    }
+
+    @Override
+    public void registerReducers(
+        CreateReducersApi reduceApi, Object executionStage) {
+      numComponentsAggregator = reduceApi.createLocalReducer(SumReduce.LONG);
+      numActiveAggregator = reduceApi.createLocalReducer(SumReduce.LONG);
+    }
+
+    @Override
+    public VertexSender<LongWritable, V, Writable> getVertexSender(
+        BlockWorkerSendApi<LongWritable, V, Writable, LongWritable> workerApi,
+        Object executionStage) {
+      final LongWritable one = new LongWritable(1);
+      return new InnerVertexSender() {
+        @Override
+        public void vertexSend(Vertex<LongWritable, V, Writable> vertex) {
+          numActiveAggregator.reduce(one);
+          // Only aggregate if you are the minimum of your CC
+          if (vertex.getId().get() == getComponent.get(vertex).get()) {
+            numComponentsAggregator.reduce(one);
+          }
+        }
+      };
+    }
+
+    @Override
+    public void masterCompute(BlockMasterApi master, Object executionStage) {
+      LongWritable numActive = numActiveAggregator.getReducedValue(master);
+      LongWritable numComponents =
+          numComponentsAggregator.getReducedValue(master);
+      numActiveConsumer.apply(numActive);
+      numComponentsConsumer.apply(numComponents);
+      LOG.info("Num active is : " + numActive);
+      LOG.info("Num components is : " + numComponents);
+    }
+
+    @Override
+    protected
+    MessageCombiner<? super LongWritable, LongWritable> getMessageCombiner(
+        ImmutableClassesGiraphConfiguration conf) {
+      return SumMessageCombiner.LONG;
+    }
+  }
+
+  /**
+   * Calculate connected components, doing as many iterations as needed,
+   * but no more than maxIterations.
+   *
+   * Graph is expected to be symmetric.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param idTypeOps Vertex id type ops
+   * @param getComponent Gets component of a vertex
+   * @param setComponent Sets component of a vertex
+   * @param edgeSupplier Supplies edges to be used for each vertex
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @return Block that calculates connected components
+   */
+  public static <I extends WritableComparable, V extends Writable>
+  Block calculateConnectedComponents(
+      int maxIterations,
+      TypeOps<I> idTypeOps,
+      final SupplierFromVertex<I, V, Writable, I> getComponent,
+      final ConsumerWithVertex<I, V, Writable, I> setComponent,
+      SupplierFromVertex<I, V, Writable,
+        ? extends Iterable<? extends Edge<I, ?>>> edgeSupplier) {
+    ObjectTransfer<Boolean> converged = new ObjectTransfer<>();
+    // Vertices whose component was updated are the ones that propagate it
+    // next, so the same transfer is used both as supplier and as consumer.
+    ObjectTransfer<Boolean> vertexUpdatedComponent = new ObjectTransfer<>();
+
+    return new SequenceBlock(
+      createInitializePiece(
+          idTypeOps,
+          vertexUpdatedComponent,
+          setComponent,
+          edgeSupplier),
+      new RepeatUntilBlock(
+        maxIterations,
+        new PropagateConnectedComponentsPiece<>(
+          idTypeOps,
+          vertexUpdatedComponent,
+          vertexUpdatedComponent,
+          converged, getComponent, setComponent,
+          edgeSupplier
+        ),
+        converged
+      )
+    );
+  }
+
+  /**
+   * Default block, which calculates connected components using the
+   * vertex's default edges.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param idTypeOps Vertex id type ops
+   * @param getComponent Gets component of a vertex
+   * @param setComponent Sets component of a vertex
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @return Block that calculates connected components
+   */
+  public static <I extends WritableComparable, V extends Writable>
+  Block calculateConnectedComponents(
+      int maxIterations,
+      TypeOps<I> idTypeOps,
+      final SupplierFromVertex<I, V, Writable, I> getComponent,
+      final ConsumerWithVertex<I, V, Writable, I> setComponent) {
+    return calculateConnectedComponents(
+        maxIterations,
+        idTypeOps,
+        getComponent,
+        setComponent,
+        VertexSuppliers.vertexEdgesSupplier());
+  }
+
+  /**
+   * Calculates connected components of a graph with LongWritable vertex ids,
+   * using the vertex's default edges.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param getComponent Gets component of a vertex
+   * @param setComponent Sets component of a vertex
+   * @param <V> Vertex value type
+   * @return Block that calculates connected components
+   */
+  public static <V extends Writable>
+  Block calculateConnectedComponents(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent
+  ) {
+    return calculateConnectedComponents(
+        maxIterations,
+        LongTypeOps.INSTANCE,
+        getComponent,
+        setComponent,
+        VertexSuppliers.vertexEdgesSupplier());
+  }
+
+  /**
+   * Calculates sizes of all components by aggregating on master, and allows
+   * each vertex to consume its size. Differs from
+   * calculateLargestConnectedComponentStats in that aggregation happens on
+   * master, instead of through message sends to the component id vertex.
+   *
+   * @param getComponent Gets component of a vertex
+   * @param sizeConsumer Receives, for each vertex, size of its component
+   * @param <V> Vertex value type
+   * @return Block that calculates component sizes
+   */
+  public static <V extends Writable>
+  Block calculateConnectedComponentSizes(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> sizeConsumer
+  ) {
+    // Reusable (component, 1) pair contributed by each vertex
+    Pair<LongWritable, LongWritable> componentToReducePair = Pair.of(
+        new LongWritable(), new LongWritable(1));
+    LongWritable reusableLong = new LongWritable();
+    return Pieces.reduceAndBroadcast(
+        "CalcConnectedComponentSizes",
+        new BasicMapReduce<>(
+            LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG),
+        (Vertex<LongWritable, V, Writable> vertex) -> {
+          componentToReducePair.getLeft().set(getComponent.get(vertex).get());
+          return componentToReducePair;
+        },
+        (vertex, componentSizes) -> {
+          long compSize = componentSizes.get(getComponent.get(vertex)).get();
+          reusableLong.set(compSize);
+          sizeConsumer.apply(vertex, reusableLong);
+        });
+  }
+
+  /**
+   * Given a graph with already calculated connected components - calculates
+   * ID and size of the largest one, and passes them as a
+   * (component id, component size) pair to the consumer.
+   *
+   * Sizes are computed by every vertex sending a message to the vertex whose
+   * id is its component id.
+   *
+   * @param getComponent Gets component of a vertex
+   * @param largestComponentConsumer Receives (id, size) of largest component
+   * @param <V> Vertex value type
+   * @return Block that calculates largest component stats
+   */
+  public static <V extends Writable>
+  Block calculateLargestConnectedComponentStats(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      Consumer<PairWritable<LongWritable, LongWritable>>
+        largestComponentConsumer) {
+    LongWritable one = new LongWritable(1);
+    LongLongWritable pair = new LongLongWritable();
+    return SendMessageChain.<LongWritable, V, Writable, LongWritable>startSend(
+        "CalcComponentSizesPiece",
+        SumMessageCombiner.LONG,
+        (vertex) -> one,
+        (vertex) -> Iterators.singletonIterator(getComponent.get(vertex))
+    ).endReduce(
+        "CalcLargestComponent",
+        new MaxPairReducer<>(LongTypeOps.INSTANCE, LongTypeOps.INSTANCE),
+        (vertex, message) -> {
+          // Vertices that received no messages contribute size 0
+          long curSum = message != null ? message.get() : 0;
+          pair.getLeft().set(getComponent.get(vertex).get());
+          pair.getRight().set(curSum);
+          return pair;
+        },
+        largestComponentConsumer);
+  }
+
+  /**
+   * Given a graph with already calculated connected components - calculates
+   * ID of the largest one.
+   *
+   * @param getComponent Gets component of a vertex
+   * @param largestComponent Set to the id of the largest component
+   * @param <V> Vertex value type
+   * @return Block that calculates the largest component id
+   */
+  public static <V extends Writable> Block calculateLargestConnectedComponent(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      LongWritable largestComponent) {
+    return calculateLargestConnectedComponentStats(
+        getComponent,
+        (t) -> largestComponent.set(t.getLeft().get())
+    );
+  }
+
+  /**
+   * Given a graph with already calculated connected components - calculates
+   * size of the largest one.
+   *
+   * @param getComponent Gets component of a vertex
+   * @param largestComponentSize Set to the size of the largest component
+   * @param <V> Vertex value type
+   * @return Block that calculates the largest component size
+   */
+  public static <V extends Writable>
+  Block calculateLargestConnectedComponentSize(
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      LongWritable largestComponentSize) {
+    return calculateLargestConnectedComponentStats(
+        getComponent,
+        (t) -> largestComponentSize.set(t.getRight().get()));
+  }
+
+  /**
+   * Takes symmetric graph, and removes all edges/vertices that
+   * are not in largest connected component.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param getComponent Gets component of a vertex
+   * @param setComponent Sets component of a vertex
+   * @param <V> Vertex value type
+   * @return Block that keeps only the largest component
+   */
+  public static <V extends Writable> Block calculateAndKeepLargestComponent(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent
+  ) {
+    final LongWritable largestComponent = new LongWritable();
+    return new SequenceBlock(
+      calculateConnectedComponents(
+          maxIterations, LongTypeOps.INSTANCE, getComponent, setComponent,
+          VertexSuppliers.vertexEdgesSupplier()),
+      calculateLargestConnectedComponent(getComponent, largestComponent),
+      Pieces.<LongWritable, V, Writable>removeVertices(
+          "KeepOnlyLargestComponent",
+          (vertex) -> !largestComponent.equals(getComponent.get(vertex))));
+  }
+
+  /**
+   * Takes symmetric graph, and removes all edges/vertices that
+   * belong to connected components smaller than specified
+   * threshold.
+   *
+   * @param maxIterations Maximum number of propagation iterations
+   * @param threshold Minimum size of a component to be kept
+   * @param getComponent Gets component of a vertex
+   * @param setComponent Sets component of a vertex
+   * @param <V> Vertex value type
+   * @return Block that keeps only components of at least threshold size
+   */
+  public static <V extends Writable>
+  Block calculateAndKeepComponentAboveThreshold(
+      int maxIterations,
+      int threshold,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent
+  ) {
+    // Per-vertex flag, set on receive of the sizes piece and read by
+    // the following removal piece
+    final ObjectTransfer<Boolean> belowThreshold = new ObjectTransfer<>();
+    return new SequenceBlock(
+        UndirectedConnectedComponents.calculateConnectedComponents(
+            maxIterations,
+            LongTypeOps.INSTANCE,
+            getComponent,
+            setComponent
+        ),
+        UndirectedConnectedComponents.calculateConnectedComponentSizes(
+            getComponent,
+            (vertex, value) -> {
+              belowThreshold.apply(value.get() < threshold);
+            }),
+        Pieces.removeVertices(
+            "KeepAboveTresholdComponents",
+            belowThreshold.castToSupplier()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java
new file mode 100644
index 0000000..9070f75
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/WeaklyConnectedComponents.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+
+import java.util.Iterator;
+
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.SequenceBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.library.Pieces;
+import org.apache.giraph.block_app.library.VertexSuppliers;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.edge.MutableEdge;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Class for computing the weakly connected components of a directed graph.
+ *
+ * If the graph is undirected, this has the exact same behavior as
+ * UndirectedConnectedComponents.
+ *
+ * This currently does not support weighted directed graphs.
+ */
+public class WeaklyConnectedComponents {
+  private WeaklyConnectedComponents() { }
+
+  /** Save existing edges */
+  private static <V extends Writable> Block saveDirectedEdges(
+      ConsumerWithVertex<LongWritable, V, Writable, LongOpenHashSet> setEdges) 
{
+    return Pieces.<LongWritable, V, Writable>forAllVertices(
+        "SaveDirectedEdgesPiece",
+        (vertex) -> {
+          LongOpenHashSet friendids = new LongOpenHashSet();
+          for (Edge<LongWritable, Writable> edge : vertex.getEdges()) {
+            friendids.add(edge.getTargetVertexId().get());
+          }
+          setEdges.apply(vertex, friendids);
+        });
+  }
+
+  /** Restore existing edges */
+  private static <V extends Writable> Block restoreDirectedEdges(
+      SupplierFromVertex<LongWritable, V, Writable, LongOpenHashSet> getEdges) 
{
+    return Pieces.<LongWritable, V, Writable>forAllVertices(
+        "RestoreDirectedEdgesPiece",
+        (vertex) -> {
+          LongOpenHashSet currentList = getEdges.get(vertex);
+          Iterator<MutableEdge<LongWritable, Writable>> iter =
+              vertex.getMutableEdges().iterator();
+          while (iter.hasNext()) {
+            MutableEdge<LongWritable, Writable> edge = iter.next();
+            if (!currentList.contains(edge.getTargetVertexId().get())) {
+              iter.remove();
+            }
+          }
+        });
+  }
+
+  /**
+   * Calculate connected components, doing as many iterations as needed,
+   * but no more than maxIterations.
+   *
+   * Graph is expected to be symmetric.
+   */
+  public static <V extends Writable> Block calculateConnectedComponents(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent,
+      SupplierFromVertex<LongWritable, V, Writable, LongOpenHashSet> getEdges,
+      ConsumerWithVertex<LongWritable, V, Writable, LongOpenHashSet> setEdges,
+      boolean useFloatWeights) {
+    return new SequenceBlock(
+        saveDirectedEdges(setEdges),
+        makeSymmetric(useFloatWeights),
+        UndirectedConnectedComponents.calculateConnectedComponents(
+            maxIterations, LongTypeOps.INSTANCE, getComponent, setComponent),
+        restoreDirectedEdges(getEdges));
+  }
+
+  /**
+   * Takes unweighted directed graph, and removes all edges/vertices that
+   * are not in largest weakly connected component.
+   */
+  public static <V extends Writable> Block calculateAndKeepLargestComponent(
+      int maxIterations,
+      SupplierFromVertex<LongWritable, V, Writable, LongWritable> getComponent,
+      ConsumerWithVertex<LongWritable, V, Writable, LongWritable> setComponent,
+      SupplierFromVertex<LongWritable, V, Writable, LongOpenHashSet> getEdges,
+      ConsumerWithVertex<LongWritable, V, Writable, LongOpenHashSet> setEdges,
+      boolean useFloatWeights) {
+    return new SequenceBlock(
+        saveDirectedEdges(setEdges),
+        makeSymmetric(useFloatWeights),
+        UndirectedConnectedComponents.calculateAndKeepLargestComponent(
+            maxIterations, getComponent, setComponent),
+        restoreDirectedEdges(getEdges));
+  }
+
+  private static Block makeSymmetric(boolean useFloatWeights) {
+    if (!useFloatWeights) {
+      return PrepareGraphPieces.makeSymmetricUnweighted(LongTypeOps.INSTANCE);
+    } else {
+      return makeSymmetricFloatWeighted();
+    }
+  }
+
+  /**
+   * This is just used internally in weakly connected components if the graph
+   * has float weights. This works fine here because we don't actually ever
+   * use the weights, but should not be used outside of the
+   * WeaklyConnectedComponents class and hence is private.
+   */
+  private static <V extends Writable>
+  Piece<LongWritable, V, FloatWritable, LongWritable, Object>
+  makeSymmetricFloatWeighted() {
+    LongSet set = new LongOpenHashSet();
+    FloatWritable floatWritable = new FloatWritable(1.0f);
+    ConsumerWithVertex<LongWritable, V, FloatWritable, Iterable<LongWritable>>
+    addEdges = (vertex, neighbors) -> {
+      set.clear();
+      for (Edge<LongWritable, FloatWritable> edge : vertex.getEdges()) {
+        set.add(edge.getTargetVertexId().get());
+      }
+      for (LongWritable message : neighbors) {
+        if (!set.contains(message.get())) {
+          Edge<LongWritable, FloatWritable> edge = EdgeFactory.create(
+              new LongWritable(message.get()), floatWritable);
+          vertex.addEdge(edge);
+          set.add(message.get());
+        }
+      }
+    };
+    return Pieces.sendMessageToNeighbors(
+        "MakeSymmetricFloatWeighted",
+        LongWritable.class,
+        VertexSuppliers.<LongWritable, V, FloatWritable>vertexIdSupplier(),
+        addEdges);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java
new file mode 100644
index 0000000..d3c263a
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for preparing and preprocessing the graph, e.g. making it
+ * symmetric and computing connected components.
+ */
+package org.apache.giraph.block_app.library.prepare_graph;

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java
new file mode 100644
index 0000000..85d3a2c
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/ConnectedComponentVertexValue.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph.vertex;
+
+import org.apache.giraph.block_app.library.ReusableSuppliers;
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Vertex value interface for connected component extraction.
+ */
+public interface ConnectedComponentVertexValue extends Writable {
+  /** @return id of the connected component this vertex belongs to */
+  long getComponent();
+
+  /**
+   * Assigns this vertex to a connected component.
+   *
+   * @param component id of the connected component
+   */
+  void setComponent(long component);
+
+  /**
+   * If your vertex value class implements ConnectedComponentVertexValue,
+   * you can use this consumer as the setComponent argument of the
+   * UndirectedConnectedComponents blocks.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E> edge value type
+   * @return consumer storing the given component id into the vertex value
+   */
+  static
+  <I extends WritableComparable, V extends ConnectedComponentVertexValue,
+  E extends Writable>
+  ConsumerWithVertex<I, V, E, LongWritable> setComponentConsumer() {
+    return (vertex, value) -> vertex.getValue().setComponent(value.get());
+  }
+
+  /**
+   * If your vertex value class implements ConnectedComponentVertexValue,
+   * you can use this supplier as the getComponent argument of the
+   * UndirectedConnectedComponents blocks.
+   *
+   * NOTE(review): built with ReusableSuppliers.fromLong, so the returned
+   * LongWritable is presumably reused between calls; copy it if the value
+   * has to be kept.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E> edge value type
+   * @return supplier reading the component id from the vertex value
+   */
+  static
+  <I extends WritableComparable, V extends ConnectedComponentVertexValue,
+  E extends Writable>
+  SupplierFromVertex<I, V, E, LongWritable> getComponentSupplier() {
+    return ReusableSuppliers.fromLong(
+        (vertex) -> vertex.getValue().getComponent());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java
new file mode 100644
index 0000000..94f6836
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValue.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph.vertex;
+
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+
+import org.apache.giraph.function.vertex.ConsumerWithVertex;
+import org.apache.giraph.function.vertex.SupplierFromVertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/**
+ * Vertex value interface for weakly connected component extraction.
+ * In addition to the component id, it can temporarily hold the set of
+ * original edge target ids of the vertex.
+ */
+public interface WeaklyConnectedComponentVertexValue
+    extends ConnectedComponentVertexValue {
+  /**
+   * Stores the set of edge target ids.
+   *
+   * @param edgeIds set of edge target ids to keep on this vertex value
+   */
+  void setEdgeIds(LongOpenHashSet edgeIds);
+
+  /**
+   * Returns the stored edge target ids and drops the reference to them.
+   * WeaklyConnectedComponentVertexValueImpl returns null when no ids are
+   * stored, including after a previous take.
+   *
+   * @return previously stored edge target ids, possibly null
+   */
+  LongOpenHashSet takeEdgeIds();
+
+  /**
+   * If your vertex value class implements WeaklyConnectedComponentVertexValue,
+   * you can use this consumer as the setEdges argument of the
+   * WeaklyConnectedComponents blocks. Component accessors are provided by
+   * ConnectedComponentVertexValue.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E> edge value type
+   * @return consumer storing edge target ids into the vertex value
+   */
+  static
+  <I extends WritableComparable, V extends WeaklyConnectedComponentVertexValue,
+  E extends Writable>
+  ConsumerWithVertex<I, V, E, LongOpenHashSet> setEdgeIdsConsumer() {
+    return (vertex, edgeIds) -> vertex.getValue().setEdgeIds(edgeIds);
+  }
+
+  /**
+   * If your vertex value class implements WeaklyConnectedComponentVertexValue,
+   * you can use this supplier as the getEdges argument of the
+   * WeaklyConnectedComponents blocks. It takes the ids via takeEdgeIds, so
+   * each stored set is handed out only once.
+   *
+   * @param <I> vertex id type
+   * @param <V> vertex value type
+   * @param <E> edge value type
+   * @return supplier taking edge target ids from the vertex value
+   */
+  static
+  <I extends WritableComparable, V extends WeaklyConnectedComponentVertexValue,
+  E extends Writable>
+  SupplierFromVertex<I, V, E, LongOpenHashSet> getEdgeIdsSupplier() {
+    return (vertex) -> vertex.getValue().takeEdgeIds();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java
new file mode 100644
index 0000000..8b1da12
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/WeaklyConnectedComponentVertexValueImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.library.prepare_graph.vertex;
+
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Implementation of WeaklyConnectedComponentVertexValue, holding a
+ * component id and an optional set of saved edge target ids.
+ */
+public class WeaklyConnectedComponentVertexValueImpl
+    implements WeaklyConnectedComponentVertexValue {
+  /** Id of the connected component this vertex belongs to */
+  private long component;
+  /** Saved edge target ids, or null when none are stored */
+  private LongOpenHashSet edgeIds = null;
+
+  @Override
+  public long getComponent() {
+    return component;
+  }
+
+  @Override
+  public void setComponent(long component) {
+    this.component = component;
+  }
+
+  @Override
+  public void setEdgeIds(LongOpenHashSet edgeIds) {
+    this.edgeIds = edgeIds;
+  }
+
+  /**
+   * Returns the stored edge ids and clears the field, so the set is handed
+   * out only once and can be garbage collected after use.
+   *
+   * @return stored edge ids, or null if none are stored
+   */
+  @Override
+  public LongOpenHashSet takeEdgeIds() {
+    LongOpenHashSet res = edgeIds;
+    edgeIds = null;
+    return res;
+  }
+
+  /**
+   * Serializes the component id, then a presence flag for the edge id set,
+   * then (if present) its size followed by every id.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(component);
+    if (edgeIds == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      out.writeInt(edgeIds.size());
+      // Primitive iterator: avoids boxing every id into a Long.
+      LongIterator iter = edgeIds.iterator();
+      while (iter.hasNext()) {
+        out.writeLong(iter.nextLong());
+      }
+    }
+  }
+
+  /** Reads the format produced by {@link #write(DataOutput)}. */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    component = in.readLong();
+    boolean isEdgeIds = in.readBoolean();
+    if (isEdgeIds) {
+      int friendsSize = in.readInt();
+      edgeIds = new LongOpenHashSet(friendsSize);
+      for (int i = 0; i < friendsSize; i++) {
+        edgeIds.add(in.readLong());
+      }
+    } else {
+      edgeIds = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/d7e4bde1/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java
 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java
new file mode 100644
index 0000000..6987fa4
--- /dev/null
+++ 
b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/vertex/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Vertex value interfaces and implementations used when preparing the graph.
+ */
+package org.apache.giraph.block_app.library.prepare_graph.vertex;

Reply via email to