-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/7673/
-----------------------------------------------------------

Review request for giraph.


Description
-------

This patch follows some of the discussion on GIRAPH-273. Here is a brief 
description of how aggregation works now:

- For each aggregator, we determine which worker owns it using the hash code of 
the aggregator's name.
- At the end of the superstep, each worker first sends the values its vertices 
aggregated to the owners of the aggregators. (SendWorkerAggregatorsRequest)
- After receiving all these partial values and aggregating them together, 
each owner sends the final aggregated values of the aggregators it owns to the 
master. (SendAggregatorsToMasterRequest)
- The master receives all aggregated values, runs master.compute, and then 
sends the aggregators back to their owners. (SendAggregatorsToOwnerRequest)
- When a worker receives its aggregators from the master, it distributes them 
further to all other workers. (SendAggregatorsToWorkerRequest)
- When a worker has received aggregators from all workers, it is ready to 
proceed with the computation.
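
As an illustration of the first step above, here is a minimal sketch of 
picking an owner from the hash code of the aggregator name. It is not the code 
from the patch: the class name, the method ownerIndex and the use of 
Math.floorMod are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;

public class AggregatorOwnerSketch {
  /**
   * Hypothetical helper: maps an aggregator name to the index of the
   * worker that owns it, based on the hash code of the name.
   */
  static int ownerIndex(String aggregatorName, int numWorkers) {
    // floorMod keeps the index non-negative even if hashCode() is negative
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("sum", "max", "count");
    for (String name : names) {
      System.out.println(name + " -> worker " + ownerIndex(name, 4));
    }
  }
}
```

Since every worker computes the same index for the same name, no 
coordination is needed to agree on the owners.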

To avoid an additional barrier, each worker counts the requests of each type 
it has received, so it knows (independently of the other workers) when it can 
move on to the next superstep.
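
The counting idea can be sketched roughly as follows. This is only an 
illustration under the assumption that the expected number of requests is 
known up front; RequestCounterSketch and its methods are hypothetical names 
and not classes from the patch.

```java
public class RequestCounterSketch {
  private final int expected;   // how many requests of one type we expect
  private int received;         // how many have arrived so far

  RequestCounterSketch(int expected) {
    this.expected = expected;
  }

  /** Called by the request handler each time one request arrives. */
  synchronized void requestReceived() {
    received++;
    if (received >= expected) {
      notifyAll();  // wake up anyone blocked in waitForAll()
    }
  }

  /** Blocks the caller until all expected requests have arrived. */
  synchronized void waitForAll() throws InterruptedException {
    while (received < expected) {
      wait();
    }
  }

  synchronized boolean allReceived() {
    return received >= expected;
  }
}
```

A worker would keep one such counter per request type and call waitForAll() 
before moving on, so no global barrier is involved.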

On the master everything is kept in MasterAggregatorHandler; on the worker we 
have three classes:
- WorkerAggregatorHandler is used by vertex.compute - it provides the values 
for getAggregatedValue and holds the values into which we aggregate.
- OwnerAggregatorServerData - here we keep aggregating the partial values 
received from other workers, for the aggregators we own.
- AllAggregatorServerData - this is used to receive the previous superstep's 
aggregators from the master and from the owning workers.
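
To make the owner-side role more concrete, here is a rough sketch of merging 
partial values per aggregator name. It assumes every aggregator is a simple 
sum of longs, which is a simplification; OwnerSideSumSketch is a hypothetical 
class and does not mirror the real OwnerAggregatorServerData.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OwnerSideSumSketch {
  // Running totals for the aggregators this worker owns (sum-of-longs only)
  private final Map<String, Long> totals = new ConcurrentHashMap<>();

  /** Merges one partial value received from some worker. */
  void aggregatePartial(String aggregatorName, long partialValue) {
    // merge() is atomic per key on ConcurrentHashMap
    totals.merge(aggregatorName, partialValue, Long::sum);
  }

  /** Value to send to the master once all partial values have arrived. */
  long finalValue(String aggregatorName) {
    return totals.getOrDefault(aggregatorName, 0L);
  }
}
```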

I know it's a huge patch, but I'd really appreciate it if someone could find 
time to take a look :-) Would love to hear your comments/suggestions.

Note: When there are no aggregators, or there are just a few small ones, on our 
cluster there was absolutely no time overhead with this change. That's why I 
didn't want to complicate it even more and have another implementation which 
still uses ZooKeeper, or skips some of the described steps. Of course, if 
someone finds a need for it, it can be added later.

Another possible improvement is to have something like a dictionary for all 
aggregator classes which are used, and then we don't need to send the whole 
name of the aggregator class with each one of them. This impacts only the case 
when we have a lot of small aggregators, so again it can be added if the need 
arises.
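
A possible shape for such a dictionary, purely as a sketch: each class name is 
assigned a small integer id the first time it is seen, and only the id is 
sent afterwards. ClassNameDictionarySketch is a hypothetical name, and how the 
dictionary itself would be shared between workers is left open here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClassNameDictionarySketch {
  private final Map<String, Integer> idByName = new HashMap<>();
  private final List<String> nameById = new ArrayList<>();

  /** Returns the id for a class name, assigning a new one if needed. */
  int idFor(String className) {
    Integer id = idByName.get(className);
    if (id == null) {
      id = nameById.size();
      idByName.put(className, id);
      nameById.add(className);
    }
    return id;
  }

  /** Reverse lookup, used by the receiver to recover the class name. */
  String nameFor(int id) {
    return nameById.get(id);
  }
}
```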

Also one thing to improve in the future is to have local copies of aggregators 
per thread, so we could avoid synchronization there.
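
Roughly what I have in mind, as a sketch only: each thread aggregates into its 
own local value without any locking and merges it into the shared value once 
at the end. The example assumes a sum of longs, and 
PerThreadAggregationSketch is a hypothetical class, not part of this patch.

```java
public class PerThreadAggregationSketch {
  private long shared;  // the value visible to the rest of the worker

  /** Called once per thread, so contention on this lock is minimal. */
  synchronized void merge(long partial) {
    shared += partial;
  }

  synchronized long get() {
    return shared;
  }

  /** Runs numThreads threads, each aggregating valuesPerThread ones. */
  static long run(int numThreads, int valuesPerThread) {
    PerThreadAggregationSketch agg = new PerThreadAggregationSketch();
    Thread[] threads = new Thread[numThreads];
    for (int t = 0; t < numThreads; t++) {
      threads[t] = new Thread(() -> {
        long local = 0;  // thread-local copy, no synchronization needed
        for (int i = 0; i < valuesPerThread; i++) {
          local += 1;
        }
        agg.merge(local);  // single synchronized merge per thread
      });
      threads[t].start();
    }
    for (Thread thread : threads) {
      try {
        thread.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return agg.get();
  }
}
```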


This addresses bug GIRAPH-273.
    https://issues.apache.org/jira/browse/GIRAPH-273


Diffs
-----

  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/benchmark/AggregatorsBenchmark.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceMaster.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/MasterClient.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/ServerData.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatedValueOutputStream.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorOutputStream.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AggregatorUtils.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/AllAggregatorServerData.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingCache.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/CountingOutputStream.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/OwnerAggregatorServerData.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatedValueCache.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/SendAggregatorCache.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/WorkerAggregatorRequestProcessor.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/aggregators/package-info.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClient.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterClientServer.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyMasterServer.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/NettyWorkerAggregatorRequestProcessor.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/netty/handler/MasterRequestServerHandler.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/ByteArrayRequest.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/MasterRequest.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/RequestType.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToMasterRequest.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToOwnerRequest.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendAggregatorsToWorkerRequest.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/comm/requests/SendWorkerAggregatorsRequest.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/AggregatorHandler.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspService.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/GraphMapper.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/MasterAggregatorHandler.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/graph/WorkerAggregatorHandler.java
 1400335 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/main/java/org/apache/giraph/utils/ExpectedBarrier.java
 PRE-CREATION 
  
http://svn.apache.org/repos/asf/giraph/trunk/giraph/src/test/java/org/apache/giraph/graph/TestAggregatorsHandling.java
 1400335 

Diff: https://reviews.apache.org/r/7673/diff/


Testing
-------

mvn clean verify, tests in pseudo-distributed mode.
AggregatorsBenchmark (which also checks for correctness) with various numbers 
of aggregators and workers.
Tested on an fb application which uses a lot of big aggregators; also tested it 
with multithreading.


Thanks,

Maja Kabiljo
