-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7673/
-----------------------------------------------------------
Review request for giraph.
Description
-------
This patch follows up on some of the discussion in GIRAPH-273. Here is a brief
description of how aggregation now works:
- For each aggregator, we determine which worker owns it using the hash code of
the aggregator name.
- At the end of the superstep, each worker first sends the values its vertices
aggregated to the owners of the aggregators. (SendWorkerAggregatorsRequest)
- After receiving all these partial values and aggregating them together, each
worker sends the final aggregated values of the aggregators it owns to the
master. (SendAggregatorsToMasterRequest)
- The master collects all aggregated values, runs master.compute, and then
sends the aggregators to their owners. (SendAggregatorsToOwnerRequest)
- When a worker receives its aggregators from the master, it distributes them
to all other workers. (SendAggregatorsToWorkerRequest)
- When a worker has received the aggregators from all workers, it is ready to
proceed with the computation.
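As a rough illustration of the first step, the owner of an aggregator can be derived from the hash code of its name, reduced modulo the number of workers. This is a minimal sketch, assuming workers are indexed 0..numWorkers-1; the class and method names (AggregatorOwnership, ownerOf, ownedBy) are hypothetical, and the patch may map hash codes to workers differently.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: picks the owning worker of an aggregator from the
 * hash code of its name. Illustration only, not the patch's code.
 */
final class AggregatorOwnership {
  private AggregatorOwnership() { }

  /**
   * Maps an aggregator name to a worker index in [0, numWorkers).
   * Math.floorMod keeps the result non-negative even when hashCode()
   * is negative (including Integer.MIN_VALUE).
   */
  static int ownerOf(String aggregatorName, int numWorkers) {
    if (numWorkers <= 0) {
      throw new IllegalArgumentException("numWorkers must be positive");
    }
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  /** Returns the names of the aggregators owned by the given worker. */
  static List<String> ownedBy(int workerIndex, List<String> names,
      int numWorkers) {
    List<String> owned = new ArrayList<>();
    for (String name : names) {
      if (ownerOf(name, numWorkers) == workerIndex) {
        owned.add(name);
      }
    }
    return owned;
  }
}
```

Because String.hashCode() is deterministic, every worker and the master that apply the same function to the same aggregator names agree on the owners without any extra coordination.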
To avoid an additional barrier, each worker counts how many requests of each
type it has received, so every worker knows, independently of the others, when
it can move on to the next superstep.
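The request counting that replaces the barrier could look roughly like the sketch below: one counter per request type, which becomes complete once the expected number of requests has arrived. RequestCounter is a hypothetical helper, not a class from the patch; it allows the expected count to be set after some requests have already arrived, and for simplicity it does not handle requests for the next superstep that arrive early.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical counter for one request type. Requests may arrive before the
 * expected count is known; the counter is complete once the expected count
 * is set and at least that many requests have been received.
 */
final class RequestCounter {
  private int received;
  private int expected = -1; // -1 means "not known yet"

  /** Called whenever a request of this type arrives. */
  synchronized void requestReceived() {
    received++;
    notifyAll();
  }

  /** Sets how many requests of this type this worker should receive. */
  synchronized void setExpected(int expected) {
    this.expected = expected;
    notifyAll();
  }

  synchronized boolean isComplete() {
    return expected >= 0 && received >= expected;
  }

  /** Blocks until complete or until the timeout elapses; returns isComplete(). */
  synchronized boolean awaitComplete(long timeout, TimeUnit unit)
      throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (!isComplete()) {
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        return false;
      }
      // Waits on this object's monitor, which we hold inside this method.
      TimeUnit.NANOSECONDS.timedWait(this, remaining);
    }
    return true;
  }

  /** Prepares for the next superstep (early arrivals are not handled here). */
  synchronized void reset() {
    received = 0;
    expected = -1;
  }
}
```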
On the master, everything is kept in MasterAggregatorHandler; on the worker
there are three classes:
- WorkerAggregatorHandler is used by vertex.compute: it provides the values
for getAggregatedValue and holds the values we aggregate into.
- OwnerAggregatorServerData keeps aggregating the partial values received from
other workers for the aggregators this worker owns.
- AllAggregatorServerData receives the previous superstep's aggregators from
the master and from the owning workers.
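To make the owner side more concrete, here is a simplified sketch of what an OwnerAggregatorServerData-like store does: for each owned aggregator it folds in partial values as they arrive from other workers, and the result is what would then be sent to the master. Assumptions: values are plain longs combined with a LongBinaryOperator rather than Giraph's Writable-based aggregators, and the class name OwnedAggregates is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

/**
 * Hypothetical owner-side store. Partial values from other workers are
 * combined per aggregator name; synchronized because requests from several
 * workers may be processed concurrently.
 */
final class OwnedAggregates {
  private final Map<String, LongBinaryOperator> combiners = new HashMap<>();
  private final Map<String, Long> values = new HashMap<>();

  /** Registers an owned aggregator with its combine function and initial value. */
  synchronized void register(String name, LongBinaryOperator combiner,
      long initialValue) {
    combiners.put(name, combiner);
    values.put(name, initialValue);
  }

  /** Folds one partial value sent by some worker into the owned aggregator. */
  synchronized void aggregatePartial(String name, long partial) {
    LongBinaryOperator combiner = combiners.get(name);
    if (combiner == null) {
      throw new IllegalStateException("Not the owner of aggregator " + name);
    }
    values.put(name, combiner.applyAsLong(values.get(name), partial));
  }

  /** Final value to send to the master once all partial values have arrived. */
  synchronized long finalValue(String name) {
    Long value = values.get(name);
    if (value == null) {
      throw new IllegalStateException("Not the owner of aggregator " + name);
    }
    return value;
  }
}
```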
I know it's a huge patch, but I'd really appreciate it if someone finds time to
take a look :-) I'd love to hear your comments and suggestions.
Note: When there are no aggregators, or just a few small ones, we saw no time
overhead from this change on our cluster. That's why I didn't want to
complicate it further with a separate implementation that still uses
ZooKeeper or skips some of the steps described above. Of course, if someone
needs one, it can be added later.
Another possible improvement is a dictionary of all aggregator classes in use,
so we don't need to send the full class name with every aggregator. This only
matters when there are many small aggregators, so again it can be added if the
need arises.
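The dictionary idea could be sketched as follows: the first time a class name is sent, it is written together with a numeric id, and afterwards only the id is written. This is a hypothetical encoding (ClassNameDictionary and its wire format are not part of the patch), and it assumes each sender and receiver keeps its own dictionary instance and that names are read in the same order they were written.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical class-name dictionary. Ids are assigned in the order names are
 * first seen, so writer and reader stay in sync without a separate handshake.
 */
final class ClassNameDictionary {
  private final Map<String, Integer> ids = new HashMap<>();
  private final List<String> names = new ArrayList<>();

  /** Writes a "new" flag and the id, plus the full name only on first use. */
  void write(DataOutput out, String className) throws IOException {
    Integer id = ids.get(className);
    boolean isNew = id == null;
    if (isNew) {
      id = names.size();
      ids.put(className, id);
      names.add(className);
    }
    out.writeBoolean(isNew);
    out.writeInt(id);
    if (isNew) {
      out.writeUTF(className);
    }
  }

  /** Reads a class name written by write(), updating this dictionary. */
  String read(DataInput in) throws IOException {
    boolean isNew = in.readBoolean();
    int id = in.readInt();
    if (!isNew) {
      return names.get(id);
    }
    if (id != names.size()) {
      throw new IOException("Out-of-order dictionary id " + id);
    }
    String className = in.readUTF();
    ids.put(className, id);
    names.add(className);
    return className;
  }
}
```

After the first occurrence, each repeated class name costs 5 bytes (a flag plus an int) instead of the full name, which is why the gain only shows up with many small aggregators.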
Another future improvement is keeping per-thread local copies of aggregators,
so we can avoid synchronization there.
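A minimal sketch of the per-thread idea, assuming a sum aggregator over longs: each compute thread aggregates into its own private cell without locking, and the cells are merged once all threads are done. PerThreadSum is a hypothetical name, and the sketch relies on the compute threads having been joined (or otherwise synchronized with) before mergeAll is called, so that their writes are visible to the merging thread.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical per-thread sum aggregator. Each thread owns one cell; the
 * cells are registered in a concurrent queue so the merge step can find them.
 */
final class PerThreadSum {
  private final Queue<long[]> cells = new ConcurrentLinkedQueue<>();
  private final ThreadLocal<long[]> local = ThreadLocal.withInitial(() -> {
    long[] cell = new long[1];
    cells.add(cell); // registered once, on the thread's first aggregate call
    return cell;
  });

  /** Called from compute threads: no lock, only this thread's cell is touched. */
  void aggregate(long value) {
    local.get()[0] += value;
  }

  /**
   * Combines all per-thread cells. Call only after the compute threads have
   * finished (e.g. after Thread.join), which makes their writes visible here.
   */
  long mergeAll() {
    long total = 0;
    for (long[] cell : cells) {
      total += cell[0];
    }
    return total;
  }
}
```

The trade-off is one cell per thread plus a merge step at the end of the superstep, instead of acquiring a lock on every aggregate call.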
This addresses bug GIRAPH-273.
https://issues.apache.org/jira/browse/GIRAPH-273
Diffs
-----
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/ServerData.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorHandler.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
1400335
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
PRE-CREATION
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
1400335
Diff: https://reviews.apache.org/r/7673/diff/
Testing
-------
mvn clean verify, tests in pseudo-distributed mode.
AggregatorsBenchmark (which also checks correctness) with various numbers of
aggregators and workers.
Also tested on a Facebook application that uses many large aggregators,
including with multithreading.
Thanks,
Maja Kabiljo