[GIRAPH-1013] Add core of Blocks Framework Summary: Add all classes in the core of the Framework.
This is the full execution engine of the framework. New module giraph-block-app is created for it, and all framework classes are going into org.apache.giraph.block_app.framework, and all non-framework-internal classes are going to go into subpackages of org.apache.giraph.block_app (i.e. utilities for writing applications, etc) Test Plan: mvn clean install Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov Reviewed By: sergey.edunov Differential Revision: https://reviews.facebook.net/D39639 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/819d6d38 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/819d6d38 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/819d6d38 Branch: refs/heads/trunk Commit: 819d6d38d6cb7073786f4b7ed11763fff5200ded Parents: 06a1084 Author: Igor Kabiljo <[email protected]> Authored: Fri Jun 5 00:19:41 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Wed Jun 10 19:50:06 2015 -0700 ---------------------------------------------------------------------- checkstyle-relaxed.xml | 272 +++++++++++ giraph-block-app/pom.xml | 124 +++++ giraph-block-app/src/main/assembly/compile.xml | 39 ++ .../framework/AbstractBlockFactory.java | 203 ++++++++ .../block_app/framework/BlockFactory.java | 83 ++++ .../giraph/block_app/framework/BlockUtils.java | 206 +++++++++ .../block_app/framework/BulkConfigurator.java | 32 ++ .../block_app/framework/api/BlockApi.java | 47 ++ .../block_app/framework/api/BlockConfApi.java | 32 ++ .../block_app/framework/api/BlockMasterApi.java | 58 +++ .../block_app/framework/api/BlockOutputApi.java | 31 ++ .../api/BlockOutputHandleAccessor.java | 33 ++ .../block_app/framework/api/BlockWorkerApi.java | 35 ++ .../framework/api/BlockWorkerContextApi.java | 45 ++ .../api/BlockWorkerContextReceiveApi.java | 29 ++ .../api/BlockWorkerContextSendApi.java | 39 ++ .../framework/api/BlockWorkerReceiveApi.java | 33 ++ 
.../framework/api/BlockWorkerSendApi.java | 116 +++++ .../framework/api/BlockWorkerValueAccessor.java | 36 ++ .../giraph/block_app/framework/api/Counter.java | 33 ++ .../framework/api/CreateReducersApi.java | 83 ++++ .../block_app/framework/api/StatusReporter.java | 31 ++ .../framework/api/giraph/BlockComputation.java | 54 +++ .../api/giraph/BlockMasterApiWrapper.java | 170 +++++++ .../api/giraph/BlockMasterCompute.java | 72 +++ .../api/giraph/BlockWorkerApiWrapper.java | 180 +++++++ .../api/giraph/BlockWorkerContext.java | 102 ++++ .../giraph/BlockWorkerContextApiWrapper.java | 84 ++++ .../framework/api/giraph/package-info.java | 22 + .../block_app/framework/api/package-info.java | 36 ++ .../giraph/block_app/framework/block/Block.java | 59 +++ .../block_app/framework/block/EmptyBlock.java | 39 ++ .../framework/block/FilteringBlock.java | 113 +++++ .../block_app/framework/block/IfBlock.java | 70 +++ .../block_app/framework/block/RepeatBlock.java | 87 ++++ .../framework/block/RepeatUntilBlock.java | 83 ++++ .../framework/block/SequenceBlock.java | 60 +++ .../block_app/framework/block/package-info.java | 22 + .../framework/internal/BlockCounters.java | 79 ++++ .../framework/internal/BlockMasterLogic.java | 173 +++++++ .../internal/BlockWorkerContextLogic.java | 91 ++++ .../framework/internal/BlockWorkerLogic.java | 68 +++ .../framework/internal/BlockWorkerPieces.java | 180 +++++++ .../framework/internal/PairedPieceAndStage.java | 111 +++++ .../framework/internal/package-info.java | 22 + .../framework/output/BlockOutputDesc.java | 50 ++ .../framework/output/BlockOutputFormat.java | 107 +++++ .../framework/output/BlockOutputHandle.java | 119 +++++ .../framework/output/BlockOutputOption.java | 52 +++ .../framework/output/BlockOutputWriter.java | 26 ++ .../framework/output/package-info.java | 21 + .../block_app/framework/package-info.java | 25 + .../framework/piece/AbstractPiece.java | 287 ++++++++++++ .../framework/piece/DefaultParentPiece.java | 311 +++++++++++++ 
.../giraph/block_app/framework/piece/Piece.java | 59 +++ .../framework/piece/PieceWithWorkerContext.java | 54 +++ .../framework/piece/delegate/DelegatePiece.java | 277 +++++++++++ .../piece/delegate/FilteringPiece.java | 157 +++++++ .../framework/piece/delegate/package-info.java | 21 + .../piece/global_comm/BroadcastHandle.java | 30 ++ .../piece/global_comm/ReduceUtilsObject.java | 62 +++ .../ReducerAndBroadcastWrapperHandle.java | 61 +++ .../piece/global_comm/ReducerHandle.java | 41 ++ .../piece/global_comm/array/ArrayHandle.java | 36 ++ .../global_comm/array/BroadcastArrayHandle.java | 35 ++ .../global_comm/array/ReducerArrayHandle.java | 43 ++ .../piece/global_comm/array/package-info.java | 21 + .../internal/CreateReducersApiWrapper.java | 73 +++ .../internal/ReducersForPieceHandler.java | 250 ++++++++++ .../internal/VertexSenderObserver.java | 28 ++ .../global_comm/internal/package-info.java | 22 + .../global_comm/map/BroadcastMapHandle.java | 36 ++ .../piece/global_comm/map/MapHandle.java | 31 ++ .../piece/global_comm/map/ReducerMapHandle.java | 43 ++ .../piece/global_comm/map/package-info.java | 21 + .../piece/global_comm/package-info.java | 22 + .../piece/interfaces/VertexPostprocessor.java | 34 ++ .../piece/interfaces/VertexReceiver.java | 54 +++ .../piece/interfaces/VertexSender.java | 45 ++ .../piece/interfaces/package-info.java | 21 + .../piece/messages/ObjectMessageClasses.java | 119 +++++ .../piece/messages/SupplierFromConf.java | 71 +++ .../framework/piece/messages/package-info.java | 21 + .../block_app/framework/piece/package-info.java | 24 + .../org/apache/giraph/function/Consumer.java | 34 ++ .../org/apache/giraph/function/Function.java | 38 ++ .../apache/giraph/function/PairConsumer.java | 34 ++ .../apache/giraph/function/PairFunction.java | 40 ++ .../org/apache/giraph/function/Supplier.java | 37 ++ .../apache/giraph/function/package-info.java | 26 ++ .../giraph/function/primitive/IntSupplier.java | 27 ++ 
.../giraph/function/primitive/package-info.java | 22 + .../function/vertex/ConsumerWithVertex.java | 48 ++ .../function/vertex/FunctionWithVertex.java | 52 +++ .../function/vertex/SupplierFromVertex.java | 49 ++ .../giraph/function/vertex/package-info.java | 22 + .../framework/block/BlockTestingUtils.java | 158 +++++++ .../block_app/framework/block/TestIfBlock.java | 88 ++++ .../framework/block/TestRepeatBlock.java | 80 ++++ .../framework/block/TestRepeatUntilBlock.java | 103 +++++ .../giraph/edge/LongDiffNullArrayEdges.java | 463 +++++++++++++++++++ .../giraph/utils/ConfigurationObjectUtils.java | 118 +++++ .../giraph/utils/DefaultOutputCommitter.java | 72 +++ .../org/apache/giraph/utils/WritableUtils.java | 187 ++++++++ .../giraph/edge/LongDiffNullArrayEdgesTest.java | 260 +++++++++++ pom.xml | 12 +- 106 files changed, 8395 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/checkstyle-relaxed.xml ---------------------------------------------------------------------- diff --git a/checkstyle-relaxed.xml b/checkstyle-relaxed.xml new file mode 100644 index 0000000..11bcfe0 --- /dev/null +++ b/checkstyle-relaxed.xml @@ -0,0 +1,272 @@ +<?xml version="1.0"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + This version of checkstyle is based on the Hadoop and common-math + checkstyle configurations. It is a best effort attempt to try to match + the CODE_CONVENTIONS and Oracle "Code Conventions for the Java + Programming Language". See the following link: + + http://www.oracle.com/technetwork/java/codeconvtoc-136057.html + + The documentation for checkstyle is available at + + http://checkstyle.sourceforge.net +--> + +<!DOCTYPE module PUBLIC "-//Puppy Crawl//DTD Check Configuration 1.1//EN" "http://www.puppycrawl.com/dtds/configuration_1_1.dtd"> + +<!-- Apache giraph customization of default Checkstyle behavior --> +<module name="Checker"> + <property name="localeLanguage" value="en"/> + + <!-- Checks for headers --> + <!-- See http://checkstyle.sf.net/config_header.html --> + <!-- Verify that EVERY source file has the appropriate license --> + <module name="Header"> + <property name="headerFile" value="${checkstyle.header.file}"/> + <property name="fileExtensions" value="java"/> + </module> + + <!-- Checks for Javadoc comments (checker). --> + <!-- See http://checkstyle.sf.net/config_javadoc.html --> + <!-- Require package javadoc --> + <module name="JavadocPackage"/> + + <!-- Miscellaneous other checks (checker). --> + <!-- See http://checkstyle.sf.net/config_misc.html --> + <!-- Require files to end with newline characters --> + <module name="NewlineAtEndOfFile"/> + + <!-- Checks for whitespace (tree walker) --> + <!-- See http://checkstyle.sf.net/config_whitespace.html --> + <!-- No tabs allowed! --> + <module name="FileTabCharacter"/> + + <module name="TreeWalker"> + <property name="cacheFile" value="target/checkstyle-cachefile"/> + + <!-- Checks for blocks. You know, those {}'s --> + <!-- See http://checkstyle.sf.net/config_blocks.html --> + <!-- No empty blocks (i.e. 
catch) --> + <module name="EmptyBlock"/> + <module name="AvoidNestedBlocks"/> + <!-- No if/else/do/for/while without braces --> + <module name="NeedBraces"/> + <module name="LeftCurly"/> + <module name="RightCurly"/> + + <!-- Checks for class design --> + <!-- See http://checkstyle.sf.net/config_design.html --> + <!-- Utility class should not be instantiated, they must have a + private constructor --> + <module name="HideUtilityClassConstructor"/> + <!-- Interfaces must be types (not just constants) --> + <module name="InterfaceIsType"/> + <!-- No public fields --> + <module name="VisibilityModifier"> + <property name="protectedAllowed" value="true"/> + <property name="publicMemberPattern" value="^$"/> + </module> + + <!-- Checks for common coding problems --> + <!-- See http://checkstyle.sf.net/config_coding.html --> + <module name="EmptyStatement"/> + <!-- Require hash code override when equals is --> + <module name="EqualsHashCode"/> + <!-- Method parameters and local variables should not hide + fields, except in constructors and setters --> + <module name="HiddenField"> + <property name="ignoreConstructorParameter" value="true" /> + <property name="ignoreSetter" value="true" /> + <property name="tokens" value="VARIABLE_DEF"/> + </module> + <!-- Disallow unnecessary instantiation of Boolean, String --> + <module name="IllegalInstantiation"> + <property name="classes" value="java.lang.Boolean, java.lang.String"/> + </module> + <module name="InnerAssignment"/> + <!-- Switch statements should be complete and with independent cases --> + <module name="FallThrough" /> + <module name="MissingSwitchDefault" /> + <module name="SimplifyBooleanExpression"/> + <module name="SimplifyBooleanReturn"/> + <!-- Only one statement per line allowed --> + <module name="OneStatementPerLine"/> + <!-- Use a consistent way to put declarations --> + <module name="DeclarationOrder" /> + <!-- Don't add up parentheses when they are not required --> + <module name="UnnecessaryParentheses" />
+ <!-- Don't use too widespread catch (Exception, Throwable, + RuntimeException) --> + <module name="IllegalCatch" /> + <!-- Don't use == or != for string comparisons --> + <module name="StringLiteralEquality" /> + <!-- Don't declare multiple variables in the same statement --> + <module name="MultipleVariableDeclarations" /> + <!-- String literals more than one character long should not be + repeated several times --> + <!-- the "unchecked" string is also accepted to allow + @SuppressWarnings("unchecked") --> + <!-- Disabling for now until we have a better ignoreStringsRegexp --> + <!-- + <module name="MultipleStringLiterals" > + <property name="ignoreStringsRegexp" value='^(("")|(".")|("unchecked"))$'/> + </module> + --> + + <!-- Checks for imports --> + <!-- See http://checkstyle.sf.net/config_import.html --> + <module name="RedundantImport"/> + <!-- Import should be explicit, really needed and only from pure + java packages --> + <module name="AvoidStarImport" /> + <module name="UnusedImports" /> + <module name="IllegalImport" /> + + <!-- Checks for Javadoc comments (tree walker). --> + <!-- See http://checkstyle.sf.net/config_javadoc.html --> + <!-- Javadoc must be formatted correctly --> + <module name="JavadocStyle"> + <property name="checkFirstSentence" value="false"/> + </module> + <!-- Must have class / interface header comments --> + <module name="JavadocType"/> + <!-- Require method javadocs, allow undeclared RTE, allow missing + javadoc on getters and setters --> + <module name="JavadocMethod"> + <property name="allowMissingJavadoc" value="true"/> + <property name="allowUndeclaredRTE" value="true"/> + <property name="allowMissingThrowsTags" value="true"/> + <property name="allowMissingPropertyJavadoc" value="true"/> + <property name="allowMissingParamTags" value="true"/> + <property name="allowMissingReturnTag" value="true"/> + </module> + + <!-- Miscellaneous other checks (tree walker).
--> + <!-- See http://checkstyle.sf.net/config_misc.html --> + <!-- Java style arrays --> + <module name="ArrayTypeStyle"/> + <!-- Indentation --> + <module name="Indentation"> + <property name="basicOffset" value="2"/> + <property name="braceAdjustment" value="0"/> + <property name="caseIndent" value="0"/> + <property name="throwsIndent" value="2"/> + <property name="lineWrappingIndentation" value="0"/> + <property name="arrayInitIndent" value="2"/> + </module> + <!-- Turn this on to see what needs to be done + <module name="TodoComment"/> + --> + <module name="UpperEll"/> + + <!-- Modifier Checks --> + <!-- See http://checkstyle.sf.net/config_modifiers.html --> + <!-- Use a consistent way to put modifiers --> + <module name="ModifierOrder"/> + <module name="RedundantModifier"/> + + <!-- Checks for Naming Conventions. --> + <!-- See http://checkstyle.sf.net/config_naming.html --> + <!-- Constant names should obey the traditional all uppercase + naming convention --> + <module name="ConstantName"/> + <module name="LocalFinalVariableName"/> + <module name="LocalVariableName"/> + <module name="MemberName"/> + <module name="MethodName"/> + <module name="PackageName"/> + <module name="ParameterName"/> + <module name="StaticVariableName"> + <property name="format" value="^[A-Z0-9_]*$"/> + </module> + <module name="TypeName"/> + + <!-- Checks for regexp expressions. 
--> + <!-- See http://checkstyle.sf.net/config_regexp.html --> + <!-- No trailing whitespace --> + <module name="Regexp"> + <property name="format" value="[ \t]+$"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="Trailing whitespace"/> + </module> + <!-- No System.out.println() statements --> + <module name="Regexp"> + <!-- No sysouts --> + <property name="format" value="System\.out\.println"/> + <property name="illegalPattern" value="true"/> + </module> + <!-- Authors should be in pom.xml file --> + <module name="Regexp"> + <property name="format" value="@author"/> + <property name="illegalPattern" value="true"/> + <property name="message" value="developers names should be in pom file"/> + </module> + + <!-- Checks for Size Violations. --> + <!-- See http://checkstyle.sf.net/config_sizes.html --> + <!-- Lines cannot exceed 80 chars --> + <module name="LineLength"> + <property name="max" value="80"/> + <property name="ignorePattern" value="^import"/> + </module> + <!-- Over time, we will revise this down --> + <module name="MethodLength"> + <property name="max" value="200"/> + </module> + <module name="ParameterNumber"> + <property name="max" value="8"/> + </module> + + <!-- Checks for whitespace (tree walker) --> + <!-- See http://checkstyle.sf.net/config_whitespace.html --> + <module name="EmptyForIteratorPad"/> + <!-- Spacing around methods --> + <module name="MethodParamPad"> + <property name="option" value="nospace"/> + <property name="allowLineBreaks" value="true"/> + </module> + <!-- No whitespace before a token --> + <module name="NoWhitespaceBefore"/> + <!-- Whitespace after tokens is required --> + <module name="WhitespaceAfter"/> + <!-- Whitespace around tokens is required --> + <module name="WhitespaceAround"/> + <module name="ParenPad"/> + <module name="TypecastParenPad"/> + <!-- No extra whitespace around types --> + <module name="GenericWhitespace"/> + <!-- Operator must be at end of wrapped line --> + <module
name="OperatorWrap"> + <property name="option" value="eol"/> + </module> + + <!-- Required for SuppressionCommentFilter below --> + <module name="FileContentsHolder"/> + </module> + + <!-- Setup special comments to suppress specific checks from source files --> + <module name="SuppressionCommentFilter"> + <property name="offCommentFormat" value="CHECKSTYLE\: stop ([\w\|]+)"/> + <property name="onCommentFormat" value="CHECKSTYLE\: resume ([\w\|]+)"/> + <property name="checkFormat" value="$1"/> + </module> +</module> + http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-block-app/pom.xml b/giraph-block-app/pom.xml new file mode 100644 index 0000000..1f653bb --- /dev/null +++ b/giraph-block-app/pom.xml @@ -0,0 +1,124 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph-parent</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>giraph-block-app</artifactId> + <packaging>jar</packaging> + + <name>Apache Giraph Blocks Framework</name> + <url>http://giraph.apache.org/giraph-block-app/</url> + <description>Giraph Blocks Framework and utilities for writing applications</description> + + <properties> + <top.dir>${project.basedir}/..</top.dir> + <checkstyle.config.path>${top.dir}/checkstyle-relaxed.xml</checkstyle.config.path> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <configuration> + <siteDirectory>${project.basedir}/src/site</siteDirectory> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.6</version> + <configuration> + <skip>${surefire.skip}</skip> + <systemProperties> + <property> + <name>prop.jarLocation</name> + <value>${top.dir}/giraph-core/target/giraph-${project.version}-${forHadoop}-jar-with-dependencies.jar</value> + </property> + </systemProperties> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> 
+ <artifactId>findbugs-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <!-- compile dependencies. sorted lexicographically. --> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + </dependency> + <dependency> + <groupId>org.apache.giraph</groupId> + <artifactId>giraph-core</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>org.python</groupId> + <artifactId>jython</artifactId> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </dependency> + + <!-- runtime dependency --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>runtime</scope> + </dependency> + + <!-- test dependencies. sorted lexicographically. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/assembly/compile.xml ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/assembly/compile.xml b/giraph-block-app/src/main/assembly/compile.xml new file mode 100644 index 0000000..fcaffa6 --- /dev/null +++ b/giraph-block-app/src/main/assembly/compile.xml @@ -0,0 +1,39 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd"> + <id>jar-with-dependencies</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + + <dependencySets> + <dependencySet> + <useProjectArtifact>true</useProjectArtifact> + <outputDirectory>/</outputDirectory> + <unpackOptions> + <excludes> + <exclude>META-INF/LICENSE</exclude> + </excludes> + </unpackOptions> + <unpack>true</unpack> + <scope>runtime</scope> + </dependencySet> + </dependencySets> +</assembly> http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/AbstractBlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/AbstractBlockFactory.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/AbstractBlockFactory.java new file mode 100644 index 0000000..66ad775 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/AbstractBlockFactory.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework; + +import java.util.List; + +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.edge.IdAndNullArrayEdges; +import org.apache.giraph.edge.IdAndValueArrayEdges; +import org.apache.giraph.edge.LongDiffNullArrayEdges; +import org.apache.giraph.types.ops.PrimitiveIdTypeOps; +import org.apache.giraph.types.ops.TypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Default block factory abstract class, providing default methods that need + * to be/can be overridden for specifying required/most common parameters, + * to simplify setting properties. + * + * @param <S> Execution stage type + */ +public abstract class AbstractBlockFactory<S> implements BlockFactory<S> { + /** + * Comma separated list of BulkConfigurators, that are going to be called + * to simplify specifying of large number of properties. 
+ */ + public static final StrConfOption CONFIGURATORS = new StrConfOption( + "digraph.block_factory_configurators", null, ""); + + @Override + public List<String> getGcJavaOpts(Configuration conf) { + return null; + } + + @Override + public final void initConfig(GiraphConfiguration conf) { + initConfigurators(conf); + GiraphConstants.VERTEX_ID_CLASS.setIfUnset(conf, getVertexIDClass(conf)); + GiraphConstants.VERTEX_VALUE_CLASS.setIfUnset( + conf, getVertexValueClass(conf)); + GiraphConstants.EDGE_VALUE_CLASS.setIfUnset(conf, getEdgeValueClass(conf)); + GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.setIfUnset( + conf, shouldCreateVertexOnMsgs(conf)); + if (shouldSendOneMessageToAll(conf)) { + GiraphConstants.MESSAGE_ENCODE_AND_STORE_TYPE.setIfUnset( + conf, MessageEncodeAndStoreType.EXTRACT_BYTEARRAY_PER_PARTITION); + } + + BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.setIfUnset( + conf, getWorkerContextValueClass(conf)); + + // optimize edge structure, if available and not set already + if (!GiraphConstants.VERTEX_EDGES_CLASS.contains(conf)) { + @SuppressWarnings("rawtypes") + Class<? extends WritableComparable> vertexIDClass = + GiraphConstants.VERTEX_ID_CLASS.get(conf); + Class<? extends Writable> edgeValueClass = + GiraphConstants.EDGE_VALUE_CLASS.get(conf); + + + @SuppressWarnings("rawtypes") + PrimitiveIdTypeOps<? 
extends WritableComparable> idTypeOps = + TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(vertexIDClass); + if (edgeValueClass.equals(NullWritable.class)) { + if (vertexIDClass.equals(LongWritable.class)) { + GiraphConstants.VERTEX_EDGES_CLASS.set( + conf, LongDiffNullArrayEdges.class); + } else if (idTypeOps != null) { + GiraphConstants.VERTEX_EDGES_CLASS.set( + conf, IdAndNullArrayEdges.class); + } + } else { + TypeOps<?> edgeValueTypeOps = + TypeOpsUtils.getTypeOpsOrNull(edgeValueClass); + if (edgeValueTypeOps != null) { + GiraphConstants.VERTEX_EDGES_CLASS.set( + conf, IdAndValueArrayEdges.class); + } + } + } + + additionalInitConfig(conf); + } + + @Override + public void registerOutputs(GiraphConfiguration conf) { + } + + private void initConfigurators(GiraphConfiguration conf) { + String configurators = CONFIGURATORS.get(conf); + if (configurators != null) { + String[] split = configurators.split(","); + for (String configurator : split) { + runConfigurator(conf, configurator); + } + } + } + + private void runConfigurator(GiraphConfiguration conf, String configurator) { + String[] packages = getConvenienceConfiguratorPackages(); + String[] prefixes = new String[packages.length + 1]; + prefixes[0] = ""; + for (int i = 0; i < packages.length; i++) { + prefixes[i + 1] = packages[i] + "."; + } + + for (String prefix : prefixes) { + try { + @SuppressWarnings({ "unchecked", "rawtypes" }) + Class<BulkConfigurator> confClass = + (Class) Class.forName(prefix + configurator); + BulkConfigurator c = ReflectionUtils.newInstance(confClass); + c.configure(conf); + return; + // CHECKSTYLE: stop EmptyBlock + // ignore ClassNotFoundException, and continue the loop + } catch (ClassNotFoundException e) { + } + // CHECKSTYLE: resume EmptyBlock + } + throw new IllegalStateException( + "Configurator " + configurator + " not found"); + } + + /** + * Additional configuration initialization, other than overriding + * class specification.
+ */ + protected void additionalInitConfig(GiraphConfiguration conf) { + } + + /** + * Concrete vertex id class application will use. + */ + @SuppressWarnings("rawtypes") + protected abstract Class<? extends WritableComparable> getVertexIDClass( + GiraphConfiguration conf); + + /** + * Concrete vertex value class application will use. + */ + protected abstract Class<? extends Writable> getVertexValueClass( + GiraphConfiguration conf); + + /** + * Concrete edge value class application will use. + */ + protected abstract Class<? extends Writable> getEdgeValueClass( + GiraphConfiguration conf); + + /** + * Concrete worker context value class application will use, if overridden. + */ + protected Class<?> getWorkerContextValueClass(GiraphConfiguration conf) { + return Object.class; + } + + /** + * Override if vertices shouldn't be created by default, if message is sent + * to a vertex that doesn't exist. + */ + protected boolean shouldCreateVertexOnMsgs(GiraphConfiguration conf) { + return true; + } + + // TODO - see if it should be deprecated + protected boolean shouldSendOneMessageToAll(GiraphConfiguration conf) { + return false; + } + + /** + * Provide list of strings representing packages where configurators will + * be searched for, allowing that full path is not required for + * CONFIGURATORS option. 
+ */ + protected String[] getConvenienceConfiguratorPackages() { + return new String[] { }; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockFactory.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockFactory.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockFactory.java new file mode 100644 index 0000000..652eb5e --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockFactory.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework; + +import java.util.List; + +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.hadoop.conf.Configuration; + +/** + * Class describing a particular application. + * Everything except input and output should be fully encapsulated within + * this class. For any application, it should be enough to only specify + * particular BlockFactory. 
+ * + * Given configuration, it creates a block that represents a full Giraph job. + * + * Recommended is to extend AbstractBlockFactory directly for most cases. + * + * @param <S> Execution stage type + */ +public interface BlockFactory<S> { + /** + * Based on provided configuration, updates it, such that all necessary + * properties are initialized. + */ + void initConfig(GiraphConfiguration conf); + + /** + * Create a block (representing a full Giraph job), based on the given + * configuration. Configuration should be treated as immutable at this point. + * + * If there are issues in configuration, it is very cheap to throw + * from this method - as Giraph job will not even start. + * This function will be called two times - once before starting + * of the Giraph job, to fail early if anything is incorrectly configured. + * Second time will be on Master, which will return Block instance + * on which createIterator will be called once, which should return + * current application run. + * initConfig will be called only once, before starting Giraph job itself. + * Master will contain configuration already modified by initConfig. + */ + Block createBlock(GiraphConfiguration conf); + + /** + * Create an empty instance of execution stage object. + * + * Can be used by application to be aware of what was executed before. + * Most common example is counting iterations, or for having a boolean whether + * some important event happened. + * + * Execution stage should be immutable object, with creating a new + * object when different value is needed. + */ + S createExecutionStage(GiraphConfiguration conf); + + /** + * Get special GC Java options. If returns null, default options are used. + */ + List<String> getGcJavaOpts(Configuration conf); + + /** + * Register outputs to use during the application (vs output at the end of + * the application), based on provided configuration. 
+ */ + void registerOutputs(GiraphConfiguration conf); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java new file mode 100644 index 0000000..df260f5 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework; + +import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import org.apache.giraph.block_app.framework.api.giraph.BlockComputation; +import org.apache.giraph.block_app.framework.api.giraph.BlockMasterCompute; +import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.conf.ClassConfOption; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.types.NoMessage; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; + +import com.google.common.base.Preconditions; + +/** + * Utility functions for block applications + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class BlockUtils { + /** Property describing BlockFactory to use for current application run */ + public static final ClassConfOption<BlockFactory> BLOCK_FACTORY_CLASS = + ClassConfOption.create("digraph.block_factory", null, BlockFactory.class, + "block factory describing giraph job"); + + /** Property describing block worker context value class to use */ + public static final ClassConfOption<Object> BLOCK_WORKER_CONTEXT_VALUE_CLASS = + ClassConfOption.create( + "digraph.block_worker_context_value_class", null, Object.class, + "block worker context value class"); + + private static final Logger LOG = Logger.getLogger(BlockUtils.class); + + /** Disallow constructor */ + private BlockUtils() { } + + /** + * Create new
BlockFactory that is specified in the configuration. + */ + public static <S> BlockFactory<S> createBlockFactory(Configuration conf) { + return ReflectionUtils.newInstance(BLOCK_FACTORY_CLASS.get(conf)); + } + + /** + * Set which BlockFactory class to be used for the application. + * (generally useful within tests only) + */ + public static void setBlockFactoryClass(Configuration conf, + Class<? extends BlockFactory<?>> clazz) { + BLOCK_FACTORY_CLASS.set(conf, clazz); + } + + /** + * Set block factory, and initialize configs with it. + * Should be used only if there are no configuration options set after + * this method call. + */ + public static void setAndInitBlockFactoryClass(GiraphConfiguration conf, + Class<? extends BlockFactory<?>> clazz) { + BLOCK_FACTORY_CLASS.set(conf, clazz); + initAndCheckConfig(conf); + } + + /** + * Initializes configuration, such that running it executes block application. + * + * Additionally, checks types of all pieces with a block application. + */ + public static void initAndCheckConfig(GiraphConfiguration conf) { + conf.setMasterComputeClass(BlockMasterCompute.class); + conf.setComputationClass(BlockComputation.class); + conf.setWorkerContextClass(BlockWorkerContext.class); + + Preconditions.checkState( + GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == null, + "Message types should only be specified in Pieces, " + + "but outgoing was specified globally"); + Preconditions.checkState( + GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS + .isDefaultValue(conf), + "Message types should only be specified in Pieces, " + + "but factory was specified globally"); + Preconditions.checkState( + GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null, + "Message combiner should only be specified in Pieces, " + + "but was specified globally"); + + BlockFactory<?> blockFactory = createBlockFactory(conf); + blockFactory.initConfig(conf); + + Preconditions.checkState( + GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.get(conf) == 
null, + "Outgoing message type was specified in blockFactory.initConfig"); + Preconditions.checkState( + GiraphConstants.OUTGOING_MESSAGE_VALUE_FACTORY_CLASS + .isDefaultValue(conf), + "Outgoing message factory type was specified in " + + "blockFactory.initConfig"); + Preconditions.checkState( + GiraphConstants.MESSAGE_COMBINER_CLASS.get(conf) == null, + "Message combiner type was specified in blockFactory.initConfig"); + + GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS.set(conf, NoMessage.class); + + final ImmutableClassesGiraphConfiguration immConf = + new ImmutableClassesGiraphConfiguration<>(conf); + + // Create blocks to detect issues before creating a Giraph job + // They will not be used here + Block executionBlock = blockFactory.createBlock(immConf); + LOG.info("Executing application - " + executionBlock); + + final Class<?> vertexIdClass = GiraphConstants.VERTEX_ID_CLASS.get(conf); + final Class<?> vertexValueClass = + GiraphConstants.VERTEX_VALUE_CLASS.get(conf); + final Class<?> edgeValueClass = + GiraphConstants.EDGE_VALUE_CLASS.get(conf); + final Class<?> workerContextValueClass = + BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.get(conf); + final Class<?> executionStageClass = + blockFactory.createExecutionStage(conf).getClass(); + + // Check for type inconsistencies + executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() { + @Override + public void apply(AbstractPiece piece) { + if (!piece.getClass().equals(Piece.class)) { + Class<?>[] classList = getTypeArguments( + AbstractPiece.class, piece.getClass()); + Preconditions.checkArgument(classList.length == 7); + + ReflectionUtils.verifyTypes( + vertexIdClass, classList[0], "vertexId", piece.getClass()); + ReflectionUtils.verifyTypes( + vertexValueClass, classList[1], "vertexValue", piece.getClass()); + ReflectionUtils.verifyTypes( + edgeValueClass, classList[2], "edgeValue", piece.getClass()); + + MessageClasses classes = piece.getMessageClasses(immConf); + Class<?> messageType = 
classes.getMessageClass(); + if (messageType == null) { + messageType = NoMessage.class; + } + ReflectionUtils.verifyTypes( + messageType, classList[3], "message", piece.getClass()); + + ReflectionUtils.verifyTypes( + workerContextValueClass, classList[4], + "workerContextValue", piece.getClass()); + // No need to check worker context message class at all + + ReflectionUtils.verifyTypes( + executionStageClass, classList[6], + "executionStage", piece.getClass()); + } + } + }); + + // check for non 'static final' fields in BlockFactories + Class<?> bfClass = blockFactory.getClass(); + while (!bfClass.equals(Object.class)) { + for (Field field : bfClass.getDeclaredFields()) { + if (!Modifier.isStatic(field.getModifiers()) || + !Modifier.isFinal(field.getModifiers())) { + throw new IllegalStateException("BlockFactory (" + bfClass + + ") cannot have any mutable (non 'static final') fields as a " + + "safety measure, as createBlock function is called from a " + + "different context then all other functions, use conf argument " + + "instead, or make it 'static final'. Field present: " + field); + } + } + bfClass = bfClass.getSuperclass(); + } + + // Register outputs + blockFactory.registerOutputs(conf); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BulkConfigurator.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BulkConfigurator.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BulkConfigurator.java new file mode 100644 index 0000000..348c907 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BulkConfigurator.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework; + +import org.apache.giraph.conf.GiraphConfiguration; + +/** + * Function that modifies configuration. + * + * Allows for multi-option configuration to be specified in a common class. + */ +public interface BulkConfigurator { + /** + * Modify given configuration. + */ + void configure(GiraphConfiguration conf); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java new file mode 100644 index 0000000..a0c92ad --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockApi.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + + +/** + * Basic block computation API for accessing items + * present on both workers and master. + */ +public interface BlockApi extends BlockConfApi { + /** + * Get the total (all workers) number of vertices that + * existed at the start of the current piece. + * + * Recommended to avoid it, as it introduces global dependencies, + * code will not be able to work on top of subgraphs any more. + * This number should be easily computable via reducer or aggregator. + */ + @Deprecated + long getTotalNumVertices(); + + /** + * Get the total (all workers) number of edges that + * existed at the start of the current piece. + * + * Recommended to avoid it, as it introduces global dependencies, + * code will not be able to work on top of subgraphs any more. + * This number should be easily computable via reducer or aggregator.
+ */ + @Deprecated + long getTotalNumEdges(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockConfApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockConfApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockConfApi.java new file mode 100644 index 0000000..98e51f8 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockConfApi.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; + +/** + * Block computation API for accessing configuration. + */ +public interface BlockConfApi { + /** + * Return the configuration used by this object. 
+ * + * @return Set configuration + */ + ImmutableClassesGiraphConfiguration<?, ?, ?> getConf(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockMasterApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockMasterApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockMasterApi.java new file mode 100644 index 0000000..3d04584 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockMasterApi.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.master.MasterAggregatorUsage; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.hadoop.io.Writable; + +/** + * Block computation API available for the master methods. + * + * Interface to the MasterCompute methods. 
+ */ +public interface BlockMasterApi extends MasterAggregatorUsage, + MasterGlobalCommUsage, StatusReporter, BlockApi, BlockOutputApi { + /** + * No need to use it, and introduces global dependencies. + * + * Store data locally within the piece, or use ObjectHolder. + */ + @Deprecated + @Override + <A extends Writable> + boolean registerPersistentAggregator( + String name, Class<? extends Aggregator<A>> aggregatorClass + ) throws InstantiationException, IllegalAccessException; + + /** + * Broadcast given value to all workers for next computation. + * @param value Value to broadcast + */ + <T extends Writable> BroadcastHandle<T> broadcast(T value); + + /** + * Call this to log a line to command line of the job. Use in moderation - + * it's a synchronous call to Job client + * + * @param line Line to print + */ + void logToCommandLine(String line); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputApi.java new file mode 100644 index 0000000..8c33623 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputApi.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.block_app.framework.output.BlockOutputDesc; +import org.apache.giraph.block_app.framework.output.BlockOutputWriter; + +/** + * Block worker output api + */ +public interface BlockOutputApi { + <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>> + OD getOutputDesc(String confOption); + + <OW extends BlockOutputWriter> OW getWriter(String confOption); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputHandleAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputHandleAccessor.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputHandleAccessor.java new file mode 100644 index 0000000..074637d --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockOutputHandleAccessor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.block_app.framework.output.BlockOutputHandle; + +/** + * Function for accessing BlockOutputHandle. + * + * Intentionally hidden from APIs, to allow usage only + * within DefaultParentPiece. + */ +public interface BlockOutputHandleAccessor { + /** + * Get global block output handle. + */ + BlockOutputHandle getBlockOutputHandle(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java new file mode 100644 index 0000000..727bf08 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerApi.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.aggregators.AggregatorUsage; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.hadoop.io.WritableComparable; + +/** + * Block computation API available for worker methods. + * + * Interface to the Computation methods. + * @param <I> vertex Id type. + */ +@SuppressWarnings("rawtypes") +public interface BlockWorkerApi<I extends WritableComparable> + extends AggregatorUsage, BlockApi { + @Override + ImmutableClassesGiraphConfiguration<I, ?, ?> getConf(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java new file mode 100644 index 0000000..d5918b5 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextApi.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.aggregators.AggregatorUsage; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; + +/** + * Block computation API available for worker context methods. + * + * Interface to the WorkerContext methods. + */ +public interface BlockWorkerContextApi extends AggregatorUsage, BlockApi { + @Override + ImmutableClassesGiraphConfiguration<?, ?, ?> getConf(); + + /** + * Get number of workers + * + * @return Number of workers + */ + int getWorkerCount(); + + /** + * Get index for this worker + * + * @return Index of this worker + */ + int getMyWorkerIndex(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java new file mode 100644 index 0000000..a8242b2 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextReceiveApi.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.worker.WorkerBroadcastUsage; + +/** + * Block computation API available for worker context receive methods. + * + * Interface to the WorkerContext methods. + */ +public interface BlockWorkerContextReceiveApi + extends BlockWorkerContextApi, WorkerBroadcastUsage { +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java new file mode 100644 index 0000000..769562d --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerContextSendApi.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.hadoop.io.Writable; + +/** + * Block computation API available for worker context send methods. + * + * Interface to the WorkerContext methods. + * + * @param <WM> Worker message type + */ +public interface BlockWorkerContextSendApi<WM extends Writable> + extends BlockWorkerContextApi, WorkerAggregatorUsage { + /** + * Send message to another worker + * + * @param message Message to send + * @param workerIndex Index of the worker to send the message to + */ + void sendMessageToWorker(WM message, int workerIndex); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java new file mode 100644 index 0000000..6db51bd --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerReceiveApi.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.hadoop.io.WritableComparable; + +/** + * Block computation API available for worker receive methods. + * + * Interface to the Computation methods. + * + * @param <I> vertex Id type. + */ +@SuppressWarnings("rawtypes") +public interface BlockWorkerReceiveApi<I extends WritableComparable> + extends BlockWorkerApi<I>, WorkerBroadcastUsage, BlockOutputApi { +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerSendApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerSendApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerSendApi.java new file mode 100644 index 0000000..a72150f --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerSendApi.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +import java.util.Iterator; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.OutEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.worker.WorkerAggregatorUsage; +import org.apache.giraph.worker.WorkerReduceUsage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Block computation API available for worker send methods. + * + * Interface to the Computation methods. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message type + */ +@SuppressWarnings("rawtypes") +public interface BlockWorkerSendApi<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends BlockWorkerApi<I>, WorkerAggregatorUsage, WorkerReduceUsage { + @Override + ImmutableClassesGiraphConfiguration<I, V, E> getConf(); + + /** + * Send a message to a vertex id. + * + * @param id Vertex id to send the message to + * @param message Message data to send + */ + void sendMessage(I id, M message); + + /** + * Send a message to all edges. + * + * @param vertex Vertex whose edges to send the message to. 
+ * @param message Message sent to all edges. + */ + void sendMessageToAllEdges(Vertex<I, V, E> vertex, M message); + + /** + * Send a message to multiple target vertex ids in the iterator. + * + * @param vertexIdIterator An iterator to multiple target vertex ids. + * @param message Message sent to all targets in the iterator. + */ + void sendMessageToMultipleEdges(Iterator<I> vertexIdIterator, M message); + + /** + * Sends a request to create a vertex that will be available + * in the receive phase. + * + * @param id Vertex id + * @param value Vertex value + * @param edges Initial edges + */ + void addVertexRequest(I id, V value, OutEdges<I, E> edges); + + /** + * Sends a request to create a vertex that will be available + * in the receive phase. + * + * @param id Vertex id + * @param value Vertex value + */ + void addVertexRequest(I id, V value); + + /** + * Request to remove a vertex from the graph + * (applied just prior to the next receive phase). + * + * @param vertexId Id of the vertex to be removed. + */ + void removeVertexRequest(I vertexId); + + /** + * Request to add an edge of a vertex in the graph + * (processed just prior to the next receive phase) + * + * @param sourceVertexId Source vertex id of edge + * @param edge Edge to add + */ + void addEdgeRequest(I sourceVertexId, Edge<I, E> edge); + + /** + * Request to remove all edges from a given source vertex to a given target + * vertex (processed just prior to the next receive phase). 
+ * + * @param sourceVertexId Source vertex id + * @param targetVertexId Target vertex id + */ + void removeEdgesRequest(I sourceVertexId, I targetVertexId); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerValueAccessor.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerValueAccessor.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerValueAccessor.java new file mode 100644 index 0000000..f7cddf3 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/BlockWorkerValueAccessor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api; + +/** + * Function for accessing WorkerValue. + * + * Intentionally hidden from APIs, to allow usage only + * within PieceWithWorkerContext. + */ +public interface BlockWorkerValueAccessor { + /** + * Get global worker value. 
+ * Value returned can be accessed from many threads, and so all + * accesses to it should be done in a thread-safe manner! + * + * This is the only place in Blocks Framework where you need + * to take care of concurrency. + */ + Object getWorkerValue(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/Counter.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/Counter.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/Counter.java new file mode 100644 index 0000000..730b0ab --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/Counter.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.block_app.framework.api; + +/** Interface wrapping around org.apache.hadoop.mapreduce.Counter */ +public interface Counter { + /** + * Set this counter to the given value + * @param value the value to set + */ + void setValue(long value); + + /** + * Increment this counter by the given value + * @param incr the value to increase this counter by + */ + void increment(long incr); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/CreateReducersApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/CreateReducersApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/CreateReducersApi.java new file mode 100644 index 0000000..77b3b1e --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/CreateReducersApi.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.block_app.framework.api; + +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.hadoop.io.Writable; + +/** + * Api for creating reducer handles. + */ +public interface CreateReducersApi extends BlockConfApi { + + /** + * Create local reducer, returning a handle to it. + * + * Local reducer means that each worker thread has its own local partially + * reduced value, which are at the end reduced all together. + * Preferable, unless it cannot be used, because all copies of the object + * do not fit the memory. + */ + <S, R extends Writable> ReducerHandle<S, R> createLocalReducer( + ReduceOperation<S, R> reduceOp); + + /** + * Create local reducer, returning a handle to it. + * + * Local reducer means that each worker thread has its own local partially + * reduced value, which are at the end reduced all together. + * Preferable, unless it cannot be used, because all copies of the object + * do not fit the memory. + */ + <S, R extends Writable> ReducerHandle<S, R> createLocalReducer( + ReduceOperation<S, R> reduceOp, R globalInitialValue); + + /** + * Create global reducer, returning a handle to it. + * + * Global reducer means that there is only one value for each worker, + * and each call to reduce will have to obtain a global lock, and incur + * synchronization costs. + * Use only when objects are so large, that having many copies cannot + * fit into memory. + */ + <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer( + ReduceOperation<S, R> reduceOp); + + /** + * Create global reducer, returning a handle to it. + * + * Global reducer means that there is only one value for each worker, + * and each call to reduce will have to obtain a global lock, and incur + * synchronization costs. + * Use only when objects are so large, that having many copies cannot + * fit into memory.
+ */ + <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer( + ReduceOperation<S, R> reduceOp, R globalInitialValue); + + /** + * Function that creates a reducer - abstracting away whether it is + * local or global reducer + */ + public interface CreateReducerFunctionApi { + <S, R extends Writable> ReducerHandle<S, R> createReducer( + ReduceOperation<S, R> reduceOp); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/StatusReporter.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/StatusReporter.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/StatusReporter.java new file mode 100644 index 0000000..3ce1862 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/StatusReporter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.api; + +/** + * Interface wrapping around functions from + * org.apache.hadoop.mapreduce.StatusReporter + */ +public interface StatusReporter { + /** Get specified counter handler */ + Counter getCounter(String group, String name); + /** Report progress to the Hadoop framework. */ + void progress(); + /** Set the current status of the task to the given string. */ + void setStatus(String status); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java new file mode 100644 index 0000000..930b62b --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockComputation.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.block_app.framework.api.giraph; + +import java.io.IOException; + +import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic; +import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; +import org.apache.giraph.graph.AbstractComputation; +import org.apache.giraph.graph.Vertex; + + +/** + * Computation that executes receiver and sender blocks passed + * into BlockWorkerPieces. + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public final class BlockComputation extends AbstractComputation { + private BlockWorkerLogic workerLogic; + + @Override + public void preSuperstep() { + BlockWorkerPieces workerPieces = + BlockWorkerPieces.getNextWorkerPieces(this); + workerLogic = new BlockWorkerLogic(workerPieces); + BlockWorkerApiWrapper workerApi = new BlockWorkerApiWrapper<>(this); + workerLogic.preSuperstep(workerApi, workerApi); + } + + @Override + public void compute(Vertex vertex, Iterable messages) throws IOException { + workerLogic.compute(vertex, messages); + } + + @Override + public void postSuperstep() { + workerLogic.postSuperstep(); + } +}
