I found a bug in the aggregator.

In parseMessages, you call the masterAggregation() method. Do you think
everything is OK?

  /**
   * Method to let the custom master aggregator read messages from peers and
   * aggregate a value.
   */
  @SuppressWarnings("unchecked")
  public void masterAggregation(Text name, Writable value) {
    String nameIdx = name.toString().split(";", 2)[1];
    this.Aggregators.get(nameIdx).aggregate(null, value);

    // When it's time to send the values, we can see which aggregators are used.
    this.aggregatorsUsed.add(nameIdx);
  }

Because aggregate() is called with null as its first argument, the
aggregated value will always be the last value received.

Our old code passed the last value along:

getAggregationRunner().aggregateVertex(lastValue, vertex);

You should aggregate all values.
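
To make the difference concrete, here is a minimal standalone sketch. It
does not use the real aggregator classes: AggregationSketch, sum(),
lastValueOnly() and foldAll() are hypothetical names, and I am assuming
that a null previous value means "no value yet", so the incoming value is
returned unchanged.

```java
// Hypothetical stand-in for a sum aggregator; not the project's actual interface.
final class AggregationSketch {

  // Combines the running value with a new one.
  // Assumption: a null running value means "nothing aggregated yet".
  static Long sum(Long previous, Long incoming) {
    return previous == null ? incoming : previous + incoming;
  }

  // Mirrors the reported behaviour: null is passed as the previous value
  // on every call, so each message simply replaces the result.
  static Long lastValueOnly(long[] messages) {
    Long result = null;
    for (long m : messages) {
      result = sum(null, m);
    }
    return result;
  }

  // Carries the running value forward, so every message contributes.
  static Long foldAll(long[] messages) {
    Long result = null;
    for (long m : messages) {
      result = sum(result, m);
    }
    return result;
  }

  public static void main(String[] args) {
    long[] messages = {1L, 2L, 3L};
    System.out.println(lastValueOnly(messages)); // prints 3
    System.out.println(foldAll(messages));       // prints 6
  }
}
```

If the real aggregate() behaves like sum() here, passing the previously
aggregated value instead of null should be enough to fix it.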

-- 
Best Regards, Edward J. Yoon
@eddieyoon
