Repository: reef
Updated Branches:
  refs/heads/master ea0bebbc4 -> 48d47fe0f


[REEF-1131] Create, define, and apply VortexAggregationPolicy

This addressed the issue by
  * Add VortexAggregatePolicy.
  * Change logic on VortexWorker to aggregate based on VortexAggregatePolicy.
  * Modify example to work with VortexAggregatePolicy

JIRA:
  [REEF-1131](https://issues.apache.org/jira/browse/REEF-1131)

Pull Request:
  Closes #801


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/48d47fe0
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/48d47fe0
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/48d47fe0

Branch: refs/heads/master
Commit: 48d47fe0f8663a623124df20736465da1b077f93
Parents: ea0bebb
Author: Andrew Chung <[email protected]>
Authored: Tue Jan 26 16:32:05 2016 -0800
Committer: Byung-Gon Chun <[email protected]>
Committed: Wed Feb 3 20:42:37 2016 +0900

----------------------------------------------------------------------
 .../src/main/avro/VortexRequest.avsc            |   3 +-
 .../reef/vortex/api/VortexAggregatePolicy.java  | 112 ++++++++++++++
 .../reef/vortex/api/VortexThreadPool.java       |  15 +-
 .../common/AggregateFunctionRepository.java     |  24 ++-
 .../common/TaskletAggregationRequest.java       |  13 +-
 .../reef/vortex/common/VortexAvroUtils.java     |   9 +-
 .../reef/vortex/driver/DefaultVortexMaster.java |   6 +-
 .../reef/vortex/driver/RunningWorkers.java      |   7 +-
 .../apache/reef/vortex/driver/VortexMaster.java |   4 +-
 .../reef/vortex/driver/VortexWorkerManager.java |   6 +-
 .../vortex/evaluator/AggregateContainer.java    | 148 +++++++++++++++++--
 .../reef/vortex/evaluator/VortexWorker.java     |  18 +--
 .../examples/sumones/SumOnesAggregateStart.java |   3 +-
 .../vortex/sumones/SumOnesTestStart.java        |   3 +-
 14 files changed, 320 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc 
b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
index d0a02fb..c0ab97b 100644
--- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
+++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc
@@ -34,7 +34,8 @@
     "fields": [
       {"name": "aggregateFunctionId", "type": "int"},
       {"name": "serializedUserFunction", "type": "bytes"},
-      {"name": "serializedAggregateFunction", "type": "bytes"}
+      {"name": "serializedAggregateFunction", "type": "bytes"},
+      {"name": "serializedPolicy", "type": "bytes"}
     ]
   },
   {

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
new file mode 100644
index 0000000..ef10eb4
--- /dev/null
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.vortex.api;
+
+import org.apache.reef.annotations.Unstable;
+import org.apache.reef.annotations.audience.ClientSide;
+import org.apache.reef.annotations.audience.Public;
+import org.apache.reef.util.Builder;
+import org.apache.reef.util.Optional;
+
+import java.io.Serializable;
+
+/**
+ * The policy for local aggregation on the {@link 
org.apache.reef.vortex.evaluator.VortexWorker}s.
+ * The Aggregation function will be triggered on the individual {@link 
VortexFunction} results on
+ * an "OR" basis of what is specified by the policy.
+ * TODO[REEF-504]: Clean up Serializable in Vortex.
+ */
+@ClientSide
+@Public
+@Unstable
+public final class VortexAggregatePolicy implements Serializable {
+  private final Optional<Integer> count;
+  private final int periodMilliseconds;
+
+  private VortexAggregatePolicy(final int periodMilliseconds, final 
Optional<Integer> count) {
+    this.periodMilliseconds = periodMilliseconds;
+    this.count = count;
+  }
+
+  /**
+   * @return the aggregation period in milliseconds.
+   */
+  public int getPeriodMilliseconds() {
+    return periodMilliseconds;
+  }
+
+  /**
+   * @return the count trigger for the aggregation.
+   */
+  public Optional<Integer> getCount() {
+    return count;
+  }
+
+  /**
+   * @return a new {@link Builder} for {@link VortexAggregatePolicy}.
+   */
+  public static AggregatePolicyBuilder newBuilder() {
+    return new AggregatePolicyBuilder();
+  }
+
+  /**
+   * A Builder class for {@link VortexAggregatePolicy}.
+   */
+  public static final class AggregatePolicyBuilder implements 
Builder<VortexAggregatePolicy> {
+    private Integer periodMilliseconds = null;
+    private Optional<Integer> count = Optional.empty();
+
+    private AggregatePolicyBuilder() {
+    }
+
+    /**
+     * Sets the period to trigger aggregation in milliseconds. Required 
parameter to build.
+     */
+    public AggregatePolicyBuilder setTimerPeriodTrigger(final int 
pOffsetMilliseconds) {
+      periodMilliseconds = pOffsetMilliseconds;
+      return this;
+    }
+
+    /**
+     * Sets the count trigger for aggregation. Not required.
+     */
+    public AggregatePolicyBuilder setCountTrigger(final int pCount) {
+      count = Optional.of(pCount);
+      return this;
+    }
+
+    /**
+     * Builds and returns a new {@link VortexAggregatePolicy} based on user's 
specification.
+     * The timer period is a required parameter for this to succeed.
+     * @throws IllegalArgumentException if required parameters are not set or 
if parameters are invalid.
+     */
+    @Override
+    public VortexAggregatePolicy build() throws IllegalArgumentException {
+      if (periodMilliseconds == null) {
+        throw new IllegalArgumentException("The aggregate period must be 
set.");
+      }
+
+      if (count.isPresent() && count.get() <= 0) {
+        throw new IllegalArgumentException("The count trigger must be greater 
than zero.");
+      }
+
+      return new VortexAggregatePolicy(periodMilliseconds, count);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
index fb06211..1c8c60a 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java
@@ -66,6 +66,7 @@ public final class VortexThreadPool {
   /**
    * @param aggregateFunction to run on VortexFunction outputs
    * @param function to run on Vortex
+   * @param policy on aggregation
    * @param inputs of the function
    * @param <TInput> input type
    * @param <TOutput> output type
@@ -73,14 +74,18 @@ public final class VortexThreadPool {
    */
   public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
       submit(final VortexAggregateFunction<TOutput> aggregateFunction,
-             final VortexFunction<TInput, TOutput> function, final 
List<TInput> inputs) {
+             final VortexFunction<TInput, TOutput> function,
+             final VortexAggregatePolicy policy,
+             final List<TInput> inputs) {
     return vortexMaster.enqueueTasklets(
-        aggregateFunction, function, inputs, 
Optional.<FutureCallback<AggregateResult<TInput, TOutput>>>empty());
+        aggregateFunction, function, policy, inputs,
+        Optional.<FutureCallback<AggregateResult<TInput, TOutput>>>empty());
   }
 
   /**
    * @param aggregateFunction to run on VortexFunction outputs
    * @param function to run on Vortex
+   * @param policy on aggregation
    * @param inputs of the function
    * @param callback of the aggregation
    * @param <TInput> input type
@@ -89,8 +94,10 @@ public final class VortexThreadPool {
    */
   public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
       submit(final VortexAggregateFunction<TOutput> aggregateFunction,
-             final VortexFunction<TInput, TOutput> function, final 
List<TInput> inputs,
+             final VortexFunction<TInput, TOutput> function,
+             final VortexAggregatePolicy policy,
+             final List<TInput> inputs,
              final FutureCallback<AggregateResult<TInput, TOutput>> callback) {
-    return vortexMaster.enqueueTasklets(aggregateFunction, function, inputs, 
Optional.of(callback));
+    return vortexMaster.enqueueTasklets(aggregateFunction, function, policy, 
inputs, Optional.of(callback));
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
index e70ee1a..c45dcde 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java
@@ -18,11 +18,12 @@
  */
 package org.apache.reef.vortex.common;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.ImmutableTriple;
+import org.apache.commons.lang3.tuple.Triple;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexAggregatePolicy;
 import org.apache.reef.vortex.api.VortexFunction;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -39,7 +40,7 @@ import java.util.concurrent.ConcurrentMap;
 @Unstable
 @Private
 public final class AggregateFunctionRepository {
-  private final ConcurrentMap<Integer, Pair<VortexAggregateFunction, 
VortexFunction>>
+  private final ConcurrentMap<Integer, Triple<VortexAggregateFunction, 
VortexFunction, VortexAggregatePolicy>>
       aggregateFunctionMap = new ConcurrentHashMap<>();
 
   @Inject
@@ -49,10 +50,12 @@ public final class AggregateFunctionRepository {
   /**
    * Associates an aggregate function ID with a {@link 
VortexAggregateFunction} and a {@link VortexFunction}.
    */
-  public Pair<VortexAggregateFunction, VortexFunction> put(final int 
aggregateFunctionId,
-                                                           final 
VortexAggregateFunction aggregateFunction,
-                                                           final 
VortexFunction function) {
-    return aggregateFunctionMap.put(aggregateFunctionId, new 
ImmutablePair<>(aggregateFunction, function));
+  public Triple<VortexAggregateFunction, VortexFunction, 
VortexAggregatePolicy> put(
+      final int aggregateFunctionId,
+      final VortexAggregateFunction aggregateFunction,
+      final VortexFunction function,
+      final VortexAggregatePolicy policy) {
+    return aggregateFunctionMap.put(aggregateFunctionId, new 
ImmutableTriple<>(aggregateFunction, function, policy));
   }
 
   /**
@@ -66,6 +69,13 @@ public final class AggregateFunctionRepository {
    * Gets the {@link VortexFunction} associated with the aggregate function ID.
    */
   public VortexFunction getFunction(final int aggregateFunctionId) {
+    return aggregateFunctionMap.get(aggregateFunctionId).getMiddle();
+  }
+
+  /**
+   * Gets the {@link VortexAggregatePolicy} associated with the aggregate 
function ID.
+   */
+  public VortexAggregatePolicy getPolicy(final int aggregateFunctionId) {
     return aggregateFunctionMap.get(aggregateFunctionId).getRight();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
index 6a7e289..6d0d3a6 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java
@@ -23,6 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.io.serialization.Codec;
 import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexAggregatePolicy;
 import org.apache.reef.vortex.api.VortexFunction;
 
 import java.util.List;
@@ -38,13 +39,16 @@ public final class TaskletAggregationRequest<TInput, 
TOutput> implements VortexR
   private final int aggregateFunctionId;
   private final VortexAggregateFunction<TOutput> userAggregateFunction;
   private final VortexFunction<TInput, TOutput> function;
+  private final VortexAggregatePolicy policy;
 
   public TaskletAggregationRequest(final int aggregateFunctionId,
                                    final VortexAggregateFunction<TOutput> 
aggregateFunction,
-                                   final VortexFunction<TInput, TOutput> 
function) {
+                                   final VortexFunction<TInput, TOutput> 
function,
+                                   final VortexAggregatePolicy policy) {
     this.aggregateFunctionId = aggregateFunctionId;
     this.userAggregateFunction = aggregateFunction;
     this.function = function;
+    this.policy = policy;
   }
 
   @Override
@@ -74,6 +78,13 @@ public final class TaskletAggregationRequest<TInput, 
TOutput> implements VortexR
   }
 
   /**
+   * @return the aggregation policy.
+   */
+  public VortexAggregatePolicy getPolicy() {
+    return policy;
+  }
+
+  /**
    * Execute the aggregate function using the list of outputs.
    * @return Output of the function in a serialized form.
    */

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
index ce6b0dd..f0e930b 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java
@@ -26,6 +26,7 @@ import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexAggregatePolicy;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.common.avro.*;
 
@@ -84,12 +85,15 @@ public final class VortexAvroUtils {
           taskletAggregationRequest.getAggregateFunction());
       final byte[] serializedFunctionForAggregation = 
SerializationUtils.serialize(
           taskletAggregationRequest.getFunction());
+      final byte[] serializedPolicy = SerializationUtils.serialize(
+          taskletAggregationRequest.getPolicy());
       avroVortexRequest = AvroVortexRequest.newBuilder()
           .setRequestType(AvroRequestType.Aggregate)
           .setTaskletRequest(AvroTaskletAggregationRequest.newBuilder()
               
.setAggregateFunctionId(taskletAggregationRequest.getAggregateFunctionId())
               
.setSerializedAggregateFunction(ByteBuffer.wrap(serializedAggregateFunction))
               
.setSerializedUserFunction(ByteBuffer.wrap(serializedFunctionForAggregation))
+              .setSerializedPolicy(ByteBuffer.wrap(serializedPolicy))
               .build())
           .build();
       break;
@@ -244,8 +248,11 @@ public final class VortexAvroUtils {
       final VortexFunction functionForAggregation =
           (VortexFunction) SerializationUtils.deserialize(
               taskletAggregationRequest.getSerializedUserFunction().array());
+      final VortexAggregatePolicy policy =
+          (VortexAggregatePolicy) SerializationUtils.deserialize(
+              taskletAggregationRequest.getSerializedPolicy().array());
       vortexRequest = new 
TaskletAggregationRequest<>(taskletAggregationRequest.getAggregateFunctionId(),
-          aggregateFunction, functionForAggregation);
+          aggregateFunction, functionForAggregation, policy);
       break;
     case ExecuteTasklet:
       final AvroTaskletExecutionRequest taskletExecutionRequest =

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
index 0ce2117..21b1bd0 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java
@@ -91,10 +91,12 @@ final class DefaultVortexMaster implements VortexMaster {
   @Override
   public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
       enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction,
-                      final VortexFunction<TInput, TOutput> vortexFunction, 
final List<TInput> inputs,
+                      final VortexFunction<TInput, TOutput> vortexFunction,
+                      final VortexAggregatePolicy policy,
+                      final List<TInput> inputs,
                       final Optional<FutureCallback<AggregateResult<TInput, 
TOutput>>> callback) {
     final int aggregateFunctionId = aggregateIdCounter.getAndIncrement();
-    aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, 
vortexFunction);
+    aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, 
vortexFunction, policy);
     final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec();
     final List<Tasklet> tasklets = new ArrayList<>(inputs.size());
     final Map<Integer, TInput> taskletIdInputMap = new 
HashMap<>(inputs.size());

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
index 226ee34..f207ab5 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java
@@ -164,8 +164,11 @@ final class RunningWorkers {
             !workerHasAggregateFunction(vortexWorkerManager.getId(), 
taskletAggFunctionId.get())) {
 
           // This assumes that all aggregate tasklets share the same user 
function.
-          vortexWorkerManager.sendAggregateFunction(taskletAggFunctionId.get(),
-              
aggregateFunctionRepository.getAggregateFunction(taskletAggFunctionId.get()), 
tasklet.getUserFunction());
+          vortexWorkerManager.sendAggregateFunction(
+              taskletAggFunctionId.get(),
+              
aggregateFunctionRepository.getAggregateFunction(taskletAggFunctionId.get()),
+              tasklet.getUserFunction(),
+              
aggregateFunctionRepository.getPolicy(taskletAggFunctionId.get()));
           
workerAggregateFunctionMap.get(vortexWorkerManager.getId()).add(taskletAggFunctionId.get());
         }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
index 09b7e0a..95cec93 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java
@@ -48,7 +48,9 @@ public interface VortexMaster {
    */
   <TInput, TOutput> VortexAggregateFuture<TInput, TOutput>
       enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction,
-                      final VortexFunction<TInput, TOutput> vortexFunction, 
final List<TInput> inputs,
+                      final VortexFunction<TInput, TOutput> vortexFunction,
+                      final VortexAggregatePolicy policy,
+                      final List<TInput> inputs,
                       final Optional<FutureCallback<AggregateResult<TInput, 
TOutput>>> callback);
 
   /**

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
index 478bf81..088e3cf 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java
@@ -22,6 +22,7 @@ import net.jcip.annotations.NotThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.task.RunningTask;
 import org.apache.reef.vortex.api.VortexAggregateFunction;
+import org.apache.reef.vortex.api.VortexAggregatePolicy;
 import org.apache.reef.vortex.api.VortexFunction;
 import org.apache.reef.vortex.common.TaskletAggregateExecutionRequest;
 import org.apache.reef.vortex.common.TaskletAggregationRequest;
@@ -51,9 +52,10 @@ class VortexWorkerManager {
    */
   <TInput, TOutput> void sendAggregateFunction(final int aggregateFunctionId,
                                                final 
VortexAggregateFunction<TOutput> aggregateFunction,
-                                               final VortexFunction<TInput, 
TOutput> function) {
+                                               final VortexFunction<TInput, 
TOutput> function,
+                                               final VortexAggregatePolicy 
policy) {
     final TaskletAggregationRequest<TInput, TOutput> taskletAggregationRequest 
=
-        new TaskletAggregationRequest<>(aggregateFunctionId, 
aggregateFunction, function);
+        new TaskletAggregationRequest<>(aggregateFunctionId, 
aggregateFunction, function, policy);
 
     // The send is synchronous such that we make sure that the aggregate 
function is sent to the
     // target worker before attempting to launch an aggregateable tasklet on 
it.

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
index 5687e0b..1ea3876 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java
@@ -23,12 +23,15 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.util.Optional;
+import org.apache.reef.task.HeartBeatTriggerManager;
 import org.apache.reef.vortex.common.*;
 
 import javax.annotation.concurrent.GuardedBy;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A container for tasklet aggregation, used to preserve output from individual
@@ -42,6 +45,13 @@ final class AggregateContainer {
 
   private final Object stateLock = new Object();
   private final TaskletAggregationRequest taskletAggregationRequest;
+  private final HeartBeatTriggerManager heartBeatTriggerManager;
+  private final VortexAvroUtils vortexAvroUtils;
+  private final BlockingDeque<byte[]> workerReportsQueue;
+  private final ScheduledExecutorService timer = 
Executors.newScheduledThreadPool(1);
+
+  @GuardedBy("stateLock")
+  private final HashMap<Integer, Integer> pendingTasklets = new HashMap<>();
 
   @GuardedBy("stateLock")
   private final List<Pair<Integer, Object>> completedTasklets = new 
ArrayList<>();
@@ -49,7 +59,13 @@ final class AggregateContainer {
   @GuardedBy("stateLock")
   private final List<Pair<Integer, Exception>> failedTasklets = new 
ArrayList<>();
 
-  AggregateContainer(final TaskletAggregationRequest 
taskletAggregationRequest) {
+  AggregateContainer(final HeartBeatTriggerManager heartBeatTriggerManager,
+                     final VortexAvroUtils vortexAvroUtils,
+                     final BlockingDeque<byte[]> workerReportsQueue,
+                     final TaskletAggregationRequest 
taskletAggregationRequest) {
+    this.heartBeatTriggerManager = heartBeatTriggerManager;
+    this.vortexAvroUtils = vortexAvroUtils;
+    this.workerReportsQueue = workerReportsQueue;
     this.taskletAggregationRequest = taskletAggregationRequest;
   }
 
@@ -57,16 +73,10 @@ final class AggregateContainer {
     return taskletAggregationRequest;
   }
 
-  /**
-   * Performs the output aggregation and generates the {@link WorkerReport} to 
report back to the
-   * {@link org.apache.reef.vortex.driver.VortexDriver}.
-   */
-  public Optional<WorkerReport> aggregateTasklets() {
-    final List<TaskletReport> taskletReports = new ArrayList<>();
-    final List<Object> results = new ArrayList<>();
-    final List<Integer> aggregatedTasklets = new ArrayList<>();
-
-    // Synchronization to prevent duplication of work on the same aggregation 
function on the same worker.
+  @GuardedBy("stateLock")
+  private void aggregateTasklets(final List<TaskletReport> taskletReports,
+                                 final List<Object> results,
+                                 final List<Integer> aggregatedTasklets) {
     synchronized (stateLock) {
       // Add the successful tasklets for aggregation.
       for (final Pair<Integer, Object> resultPair : completedTasklets) {
@@ -83,6 +93,34 @@ final class AggregateContainer {
       completedTasklets.clear();
       failedTasklets.clear();
     }
+  }
+
+  /**
+   * Performs the output aggregation and generates the {@link WorkerReport} to 
report back to the
+   * {@link org.apache.reef.vortex.driver.VortexDriver}.
+   */
+  private void aggregateTasklets(final AggregateTriggerType type) {
+    final List<TaskletReport> taskletReports = new ArrayList<>();
+    final List<Object> results = new ArrayList<>();
+    final List<Integer> aggregatedTasklets = new ArrayList<>();
+
+    // Synchronization to prevent duplication of work on the same aggregation 
function on the same worker.
+    synchronized (stateLock) {
+      switch(type) {
+      case ALARM:
+        aggregateTasklets(taskletReports, results, aggregatedTasklets);
+        break;
+      case COUNT:
+        if (!aggregateOnCount()) {
+          return;
+        }
+
+        aggregateTasklets(taskletReports, results, aggregatedTasklets);
+        break;
+      default:
+        throw new RuntimeException("Unexpected aggregate type.");
+      }
+    }
 
     if (!results.isEmpty()) {
       // Run the aggregation function.
@@ -94,15 +132,64 @@ final class AggregateContainer {
       }
     }
 
-    return taskletReports.isEmpty() ? Optional.<WorkerReport>empty() : 
Optional.of(new WorkerReport(taskletReports));
+    // Add to worker report only if there is something to report back.
+    if (!taskletReports.isEmpty()) {
+      workerReportsQueue.addLast(vortexAvroUtils.toBytes(new 
WorkerReport(taskletReports)));
+      heartBeatTriggerManager.triggerHeartBeat();
+    }
+  }
+
+  /**
+   * Schedule aggregation tasks on a Timer. Creates a new timer schedule for 
triggering the aggregation function
+   * if this is the first time the aggregation function has tasklets scheduled 
on it.
+   * Adds the Tasklet to pending Tasklets.
+   */
+  public void scheduleTasklet(final int taskletId) {
+    synchronized (stateLock) {
+      // If there are tasklets are pending to be executed, then that means 
that a
+      // timer has already been scheduled for an aggregation.
+      if (!outstandingTasklets()) {
+        timer.schedule(new Runnable() {
+          @Override
+          public void run() {
+            aggregateTasklets(AggregateTriggerType.ALARM);
+            synchronized (stateLock) {
+              // On the callback, if there are tasklets pending to be 
executed, that means that this alarm
+              // was triggered by a previous alarm, so we should continue to 
trigger more alarms. Otherwise
+              // we are done with tasklets for this aggregation function for 
now.
+              // If more tasklets for this aggregation function arrive, it 
will be triggered by the outer
+              // call to timer.schedule.
+              if (outstandingTasklets()) {
+                timer.schedule(
+                    this, 
taskletAggregationRequest.getPolicy().getPeriodMilliseconds(), 
TimeUnit.MILLISECONDS);
+              }
+            }
+          }
+        }, taskletAggregationRequest.getPolicy().getPeriodMilliseconds(), 
TimeUnit.MILLISECONDS);
+      }
+
+      // Add to pending tasklets, such that on the callback the timer can be 
refreshed.
+      if (!pendingTasklets.containsKey(taskletId)) {
+        pendingTasklets.put(taskletId, 0);
+      }
+
+      pendingTasklets.put(taskletId, pendingTasklets.get(taskletId) + 1);
+    }
   }
 
   /**
    * Reported when an associated tasklet is complete and adds it to the 
completion pool.
    */
   public void taskletComplete(final int taskletId, final Object result) {
+    final boolean aggregateOnCount;
     synchronized (stateLock) {
       completedTasklets.add(new ImmutablePair<>(taskletId, result));
+      removePendingTaskletReferenceCount(taskletId);
+      aggregateOnCount = aggregateOnCount();
+    }
+
+    if (aggregateOnCount) {
+      aggregateTasklets(AggregateTriggerType.COUNT);
     }
   }
 
@@ -110,8 +197,39 @@ final class AggregateContainer {
    * Reported when an associated tasklet is complete and adds it to the 
failure pool.
    */
   public void taskletFailed(final int taskletId, final Exception e) {
+    final boolean aggregateOnCount;
     synchronized (stateLock) {
       failedTasklets.add(new ImmutablePair<>(taskletId, e));
+      removePendingTaskletReferenceCount(taskletId);
+      aggregateOnCount = aggregateOnCount();
+    }
+
+    if (aggregateOnCount) {
+      aggregateTasklets(AggregateTriggerType.COUNT);
     }
   }
+
+  @GuardedBy("stateLock")
+  private void removePendingTaskletReferenceCount(final int taskletId) {
+    pendingTasklets.put(taskletId, pendingTasklets.get(taskletId) - 1);
+    if (pendingTasklets.get(taskletId) <= 0) {
+      pendingTasklets.remove(taskletId);
+    }
+  }
+
+  @GuardedBy("stateLock")
+  private boolean outstandingTasklets() {
+    return !(pendingTasklets.isEmpty() && completedTasklets.isEmpty() && 
failedTasklets.isEmpty());
+  }
+
+  @GuardedBy("stateLock")
+  private boolean aggregateOnCount() {
+    return taskletAggregationRequest.getPolicy().getCount().isPresent() &&
+        completedTasklets.size() + failedTasklets.size() >= 
taskletAggregationRequest.getPolicy().getCount().get();
+  }
+
+  private enum AggregateTriggerType {
+    ALARM,
+    COUNT
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
index 897d6e9..3d53bc4 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java
@@ -103,14 +103,15 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
             case AggregateTasklets:
               final TaskletAggregationRequest taskletAggregationRequest = 
(TaskletAggregationRequest) vortexRequest;
               
aggregates.put(taskletAggregationRequest.getAggregateFunctionId(),
-                  new AggregateContainer(taskletAggregationRequest));
+                  new AggregateContainer(heartBeatTriggerManager, 
vortexAvroUtils, workerReports,
+                      taskletAggregationRequest));
 
               // VortexFunctions need to be put into the repository such that 
VortexAvroUtils will know how to
               // convert inputs and functions into a VortexRequest on 
subsequent messages requesting to
               // execute the aggregateable tasklets.
               
aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(),
-                  taskletAggregationRequest.getAggregateFunction(), 
taskletAggregationRequest.getFunction());
-
+                  taskletAggregationRequest.getAggregateFunction(), 
taskletAggregationRequest.getFunction(),
+                  taskletAggregationRequest.getPolicy());
               break;
             case ExecuteAggregateTasklet:
               executeAggregateTasklet(commandExecutor, vortexRequest);
@@ -185,7 +186,6 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
               LOG.log(Level.SEVERE, "Cannot wait for Future to be put.");
               throw new RuntimeException(e);
             }
-
             futures.remove(taskletExecutionRequest.getTaskletId());
             heartBeatTriggerManager.triggerHeartBeat();
           }
@@ -213,20 +213,12 @@ public final class VortexWorker implements Task, 
TaskMessageSource {
       @Override
       public void run() {
         try {
+          
aggregateContainer.scheduleTasklet(taskletAggregateExecutionRequest.getTaskletId());
           final Object result = 
aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput());
           
aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(),
 result);
         } catch (final Exception e) {
           
aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(),
 e);
         }
-
-        // TODO[JIRA REEF-1131]: Call according to aggregate policies.
-        final Optional<WorkerReport> workerReport = 
aggregateContainer.aggregateTasklets();
-
-        // Add to worker report only if there is something to report back.
-        if (workerReport.isPresent()) {
-          workerReports.addLast(vortexAvroUtils.toBytes(workerReport.get()));
-          heartBeatTriggerManager.triggerHeartBeat();
-        }
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
index bd09565..3a763f5 100644
--- 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
+++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java
@@ -47,7 +47,8 @@ final class SumOnesAggregateStart implements VortexStart {
     }
 
     final VortexAggregateFuture<Integer, Integer> future =
-        vortexThreadPool.submit(new AdditionAggregateFunction(), new 
IdentityFunction(), inputVector);
+        vortexThreadPool.submit(new AdditionAggregateFunction(), new 
IdentityFunction(),
+            
VortexAggregatePolicy.newBuilder().setTimerPeriodTrigger(3000).build(), 
inputVector);
 
     try {
       AggregateResultSynchronous<Integer, Integer> result;

http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
index c742d3e..95c2348 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java
@@ -45,7 +45,8 @@ public final class SumOnesTestStart implements VortexStart {
     }
 
     final VortexAggregateFuture<Integer, Integer> future =
-        vortexThreadPool.submit(new AdditionAggregateFunction(), new 
IdentityFunction(), inputVector);
+        vortexThreadPool.submit(new AdditionAggregateFunction(), new 
IdentityFunction(),
+            
VortexAggregatePolicy.newBuilder().setTimerPeriodTrigger(3000).build(), 
inputVector);
 
     try {
       AggregateResultSynchronous<Integer, Integer> result;

Reply via email to