Repository: reef Updated Branches: refs/heads/master ea0bebbc4 -> 48d47fe0f
[REEF-1131] Create, define, and apply VortexAggregationPolicy This addresses the issue by: * Adding VortexAggregatePolicy. * Changing the logic in VortexWorker to aggregate based on VortexAggregatePolicy. * Modifying the example to work with VortexAggregatePolicy. JIRA: [REEF-1131](https://issues.apache.org/jira/browse/REEF-1131) Pull Request: Closes #801 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/48d47fe0 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/48d47fe0 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/48d47fe0 Branch: refs/heads/master Commit: 48d47fe0f8663a623124df20736465da1b077f93 Parents: ea0bebb Author: Andrew Chung <[email protected]> Authored: Tue Jan 26 16:32:05 2016 -0800 Committer: Byung-Gon Chun <[email protected]> Committed: Wed Feb 3 20:42:37 2016 +0900 ---------------------------------------------------------------------- .../src/main/avro/VortexRequest.avsc | 3 +- .../reef/vortex/api/VortexAggregatePolicy.java | 112 ++++++++++++++ .../reef/vortex/api/VortexThreadPool.java | 15 +- .../common/AggregateFunctionRepository.java | 24 ++- .../common/TaskletAggregationRequest.java | 13 +- .../reef/vortex/common/VortexAvroUtils.java | 9 +- .../reef/vortex/driver/DefaultVortexMaster.java | 6 +- .../reef/vortex/driver/RunningWorkers.java | 7 +- .../apache/reef/vortex/driver/VortexMaster.java | 4 +- .../reef/vortex/driver/VortexWorkerManager.java | 6 +- .../vortex/evaluator/AggregateContainer.java | 148 +++++++++++++++++-- .../reef/vortex/evaluator/VortexWorker.java | 18 +-- .../examples/sumones/SumOnesAggregateStart.java | 3 +- .../vortex/sumones/SumOnesTestStart.java | 3 +- 14 files changed, 320 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc 
---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc index d0a02fb..c0ab97b 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc +++ b/lang/java/reef-applications/reef-vortex/src/main/avro/VortexRequest.avsc @@ -34,7 +34,8 @@ "fields": [ {"name": "aggregateFunctionId", "type": "int"}, {"name": "serializedUserFunction", "type": "bytes"}, - {"name": "serializedAggregateFunction", "type": "bytes"} + {"name": "serializedAggregateFunction", "type": "bytes"}, + {"name": "serializedPolicy", "type": "bytes"} ] }, { http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java new file mode 100644 index 0000000..ef10eb4 --- /dev/null +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexAggregatePolicy.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.reef.vortex.api; + +import org.apache.reef.annotations.Unstable; +import org.apache.reef.annotations.audience.ClientSide; +import org.apache.reef.annotations.audience.Public; +import org.apache.reef.util.Builder; +import org.apache.reef.util.Optional; + +import java.io.Serializable; + +/** + * The policy for local aggregation on the {@link org.apache.reef.vortex.evaluator.VortexWorker}s. + * The Aggregation function will be triggered on the individual {@link VortexFunction} results on + * an "OR" basis of what is specified by the policy. + * TODO[REEF-504]: Clean up Serializable in Vortex. + */ +@ClientSide +@Public +@Unstable +public final class VortexAggregatePolicy implements Serializable { + private final Optional<Integer> count; + private final int periodMilliseconds; + + private VortexAggregatePolicy(final int periodMilliseconds, final Optional<Integer> count) { + this.periodMilliseconds = periodMilliseconds; + this.count = count; + } + + /** + * @return the aggregation period in milliseconds. + */ + public int getPeriodMilliseconds() { + return periodMilliseconds; + } + + /** + * @return the count trigger for the aggregation. + */ + public Optional<Integer> getCount() { + return count; + } + + /** + * @return a new {@link Builder} for {@link VortexAggregatePolicy}. + */ + public static AggregatePolicyBuilder newBuilder() { + return new AggregatePolicyBuilder(); + } + + /** + * A Builder class for {@link VortexAggregatePolicy}. 
+ */ + public static final class AggregatePolicyBuilder implements Builder<VortexAggregatePolicy> { + private Integer periodMilliseconds = null; + private Optional<Integer> count = Optional.empty(); + + private AggregatePolicyBuilder() { + } + + /** + * Sets the period to trigger aggregation in milliseconds. Required parameter to build. + */ + public AggregatePolicyBuilder setTimerPeriodTrigger(final int pOffsetMilliseconds) { + periodMilliseconds = pOffsetMilliseconds; + return this; + } + + /** + * Sets the count trigger for aggregation. Not required. + */ + public AggregatePolicyBuilder setCountTrigger(final int pCount) { + count = Optional.of(pCount); + return this; + } + + /** + * Builds and returns a new {@link VortexAggregatePolicy} based on user's specification. + * The timer period is a required parameter for this to succeed. + * @throws IllegalArgumentException if required parameters are not set or if parameters are invalid. + */ + @Override + public VortexAggregatePolicy build() throws IllegalArgumentException { + if (periodMilliseconds == null) { + throw new IllegalArgumentException("The aggregate period must be set."); + } + + if (count.isPresent() && count.get() <= 0) { + throw new IllegalArgumentException("The count trigger must be greater than zero."); + } + + return new VortexAggregatePolicy(periodMilliseconds, count); + } + } +} http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java index fb06211..1c8c60a 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java +++ 
b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/api/VortexThreadPool.java @@ -66,6 +66,7 @@ public final class VortexThreadPool { /** * @param aggregateFunction to run on VortexFunction outputs * @param function to run on Vortex + * @param policy on aggregation * @param inputs of the function * @param <TInput> input type * @param <TOutput> output type @@ -73,14 +74,18 @@ public final class VortexThreadPool { */ public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> submit(final VortexAggregateFunction<TOutput> aggregateFunction, - final VortexFunction<TInput, TOutput> function, final List<TInput> inputs) { + final VortexFunction<TInput, TOutput> function, + final VortexAggregatePolicy policy, + final List<TInput> inputs) { return vortexMaster.enqueueTasklets( - aggregateFunction, function, inputs, Optional.<FutureCallback<AggregateResult<TInput, TOutput>>>empty()); + aggregateFunction, function, policy, inputs, + Optional.<FutureCallback<AggregateResult<TInput, TOutput>>>empty()); } /** * @param aggregateFunction to run on VortexFunction outputs * @param function to run on Vortex + * @param policy on aggregation * @param inputs of the function * @param callback of the aggregation * @param <TInput> input type @@ -89,8 +94,10 @@ public final class VortexThreadPool { */ public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> submit(final VortexAggregateFunction<TOutput> aggregateFunction, - final VortexFunction<TInput, TOutput> function, final List<TInput> inputs, + final VortexFunction<TInput, TOutput> function, + final VortexAggregatePolicy policy, + final List<TInput> inputs, final FutureCallback<AggregateResult<TInput, TOutput>> callback) { - return vortexMaster.enqueueTasklets(aggregateFunction, function, inputs, Optional.of(callback)); + return vortexMaster.enqueueTasklets(aggregateFunction, function, policy, inputs, Optional.of(callback)); } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java index e70ee1a..c45dcde 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/AggregateFunctionRepository.java @@ -18,11 +18,12 @@ */ package org.apache.reef.vortex.common; -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Triple; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.Private; import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexAggregatePolicy; import org.apache.reef.vortex.api.VortexFunction; import javax.annotation.concurrent.ThreadSafe; @@ -39,7 +40,7 @@ import java.util.concurrent.ConcurrentMap; @Unstable @Private public final class AggregateFunctionRepository { - private final ConcurrentMap<Integer, Pair<VortexAggregateFunction, VortexFunction>> + private final ConcurrentMap<Integer, Triple<VortexAggregateFunction, VortexFunction, VortexAggregatePolicy>> aggregateFunctionMap = new ConcurrentHashMap<>(); @Inject @@ -49,10 +50,12 @@ public final class AggregateFunctionRepository { /** * Associates an aggregate function ID with a {@link VortexAggregateFunction} and a {@link VortexFunction}. 
*/ - public Pair<VortexAggregateFunction, VortexFunction> put(final int aggregateFunctionId, - final VortexAggregateFunction aggregateFunction, - final VortexFunction function) { - return aggregateFunctionMap.put(aggregateFunctionId, new ImmutablePair<>(aggregateFunction, function)); + public Triple<VortexAggregateFunction, VortexFunction, VortexAggregatePolicy> put( + final int aggregateFunctionId, + final VortexAggregateFunction aggregateFunction, + final VortexFunction function, + final VortexAggregatePolicy policy) { + return aggregateFunctionMap.put(aggregateFunctionId, new ImmutableTriple<>(aggregateFunction, function, policy)); } /** @@ -66,6 +69,13 @@ public final class AggregateFunctionRepository { * Gets the {@link VortexFunction} associated with the aggregate function ID. */ public VortexFunction getFunction(final int aggregateFunctionId) { + return aggregateFunctionMap.get(aggregateFunctionId).getMiddle(); + } + + /** + * Gets the {@link VortexAggregatePolicy} associated with the aggregate function ID. 
+ */ + public VortexAggregatePolicy getPolicy(final int aggregateFunctionId) { return aggregateFunctionMap.get(aggregateFunctionId).getRight(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java index 6a7e289..6d0d3a6 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/TaskletAggregationRequest.java @@ -23,6 +23,7 @@ import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.io.serialization.Codec; import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexAggregatePolicy; import org.apache.reef.vortex.api.VortexFunction; import java.util.List; @@ -38,13 +39,16 @@ public final class TaskletAggregationRequest<TInput, TOutput> implements VortexR private final int aggregateFunctionId; private final VortexAggregateFunction<TOutput> userAggregateFunction; private final VortexFunction<TInput, TOutput> function; + private final VortexAggregatePolicy policy; public TaskletAggregationRequest(final int aggregateFunctionId, final VortexAggregateFunction<TOutput> aggregateFunction, - final VortexFunction<TInput, TOutput> function) { + final VortexFunction<TInput, TOutput> function, + final VortexAggregatePolicy policy) { this.aggregateFunctionId = aggregateFunctionId; this.userAggregateFunction = aggregateFunction; this.function = function; 
+ this.policy = policy; } @Override @@ -74,6 +78,13 @@ public final class TaskletAggregationRequest<TInput, TOutput> implements VortexR } /** + * @return the aggregation policy. + */ + public VortexAggregatePolicy getPolicy() { + return policy; + } + + /** * Execute the aggregate function using the list of outputs. * @return Output of the function in a serialized form. */ http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java index ce6b0dd..f0e930b 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/common/VortexAvroUtils.java @@ -26,6 +26,7 @@ import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexAggregatePolicy; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.common.avro.*; @@ -84,12 +85,15 @@ public final class VortexAvroUtils { taskletAggregationRequest.getAggregateFunction()); final byte[] serializedFunctionForAggregation = SerializationUtils.serialize( taskletAggregationRequest.getFunction()); + final byte[] serializedPolicy = SerializationUtils.serialize( + taskletAggregationRequest.getPolicy()); avroVortexRequest = AvroVortexRequest.newBuilder() .setRequestType(AvroRequestType.Aggregate) .setTaskletRequest(AvroTaskletAggregationRequest.newBuilder() 
.setAggregateFunctionId(taskletAggregationRequest.getAggregateFunctionId()) .setSerializedAggregateFunction(ByteBuffer.wrap(serializedAggregateFunction)) .setSerializedUserFunction(ByteBuffer.wrap(serializedFunctionForAggregation)) + .setSerializedPolicy(ByteBuffer.wrap(serializedPolicy)) .build()) .build(); break; @@ -244,8 +248,11 @@ public final class VortexAvroUtils { final VortexFunction functionForAggregation = (VortexFunction) SerializationUtils.deserialize( taskletAggregationRequest.getSerializedUserFunction().array()); + final VortexAggregatePolicy policy = + (VortexAggregatePolicy) SerializationUtils.deserialize( + taskletAggregationRequest.getSerializedPolicy().array()); vortexRequest = new TaskletAggregationRequest<>(taskletAggregationRequest.getAggregateFunctionId(), - aggregateFunction, functionForAggregation); + aggregateFunction, functionForAggregation, policy); break; case ExecuteTasklet: final AvroTaskletExecutionRequest taskletExecutionRequest = http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java index 0ce2117..21b1bd0 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/DefaultVortexMaster.java @@ -91,10 +91,12 @@ final class DefaultVortexMaster implements VortexMaster { @Override public <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction, - final VortexFunction<TInput, TOutput> vortexFunction, final 
List<TInput> inputs, + final VortexFunction<TInput, TOutput> vortexFunction, + final VortexAggregatePolicy policy, + final List<TInput> inputs, final Optional<FutureCallback<AggregateResult<TInput, TOutput>>> callback) { final int aggregateFunctionId = aggregateIdCounter.getAndIncrement(); - aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, vortexFunction); + aggregateFunctionRepository.put(aggregateFunctionId, aggregateFunction, vortexFunction, policy); final Codec<TOutput> aggOutputCodec = aggregateFunction.getOutputCodec(); final List<Tasklet> tasklets = new ArrayList<>(inputs.size()); final Map<Integer, TInput> taskletIdInputMap = new HashMap<>(inputs.size()); http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java index 226ee34..f207ab5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/RunningWorkers.java @@ -164,8 +164,11 @@ final class RunningWorkers { !workerHasAggregateFunction(vortexWorkerManager.getId(), taskletAggFunctionId.get())) { // This assumes that all aggregate tasklets share the same user function. 
- vortexWorkerManager.sendAggregateFunction(taskletAggFunctionId.get(), - aggregateFunctionRepository.getAggregateFunction(taskletAggFunctionId.get()), tasklet.getUserFunction()); + vortexWorkerManager.sendAggregateFunction( + taskletAggFunctionId.get(), + aggregateFunctionRepository.getAggregateFunction(taskletAggFunctionId.get()), + tasklet.getUserFunction(), + aggregateFunctionRepository.getPolicy(taskletAggFunctionId.get())); workerAggregateFunctionMap.get(vortexWorkerManager.getId()).add(taskletAggFunctionId.get()); } http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java index 09b7e0a..95cec93 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexMaster.java @@ -48,7 +48,9 @@ public interface VortexMaster { */ <TInput, TOutput> VortexAggregateFuture<TInput, TOutput> enqueueTasklets(final VortexAggregateFunction<TOutput> aggregateFunction, - final VortexFunction<TInput, TOutput> vortexFunction, final List<TInput> inputs, + final VortexFunction<TInput, TOutput> vortexFunction, + final VortexAggregatePolicy policy, + final List<TInput> inputs, final Optional<FutureCallback<AggregateResult<TInput, TOutput>>> callback); /** http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java index 478bf81..088e3cf 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/driver/VortexWorkerManager.java @@ -22,6 +22,7 @@ import net.jcip.annotations.NotThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.driver.task.RunningTask; import org.apache.reef.vortex.api.VortexAggregateFunction; +import org.apache.reef.vortex.api.VortexAggregatePolicy; import org.apache.reef.vortex.api.VortexFunction; import org.apache.reef.vortex.common.TaskletAggregateExecutionRequest; import org.apache.reef.vortex.common.TaskletAggregationRequest; @@ -51,9 +52,10 @@ class VortexWorkerManager { */ <TInput, TOutput> void sendAggregateFunction(final int aggregateFunctionId, final VortexAggregateFunction<TOutput> aggregateFunction, - final VortexFunction<TInput, TOutput> function) { + final VortexFunction<TInput, TOutput> function, + final VortexAggregatePolicy policy) { final TaskletAggregationRequest<TInput, TOutput> taskletAggregationRequest = - new TaskletAggregationRequest<>(aggregateFunctionId, aggregateFunction, function); + new TaskletAggregationRequest<>(aggregateFunctionId, aggregateFunction, function, policy); // The send is synchronous such that we make sure that the aggregate function is sent to the // target worker before attempting to launch an aggregateable tasklet on it. 
http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java index 5687e0b..1ea3876 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/AggregateContainer.java @@ -23,12 +23,15 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.reef.annotations.Unstable; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; -import org.apache.reef.util.Optional; +import org.apache.reef.task.HeartBeatTriggerManager; import org.apache.reef.vortex.common.*; import javax.annotation.concurrent.GuardedBy; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * A container for tasklet aggregation, used to preserve output from individual @@ -42,6 +45,13 @@ final class AggregateContainer { private final Object stateLock = new Object(); private final TaskletAggregationRequest taskletAggregationRequest; + private final HeartBeatTriggerManager heartBeatTriggerManager; + private final VortexAvroUtils vortexAvroUtils; + private final BlockingDeque<byte[]> workerReportsQueue; + private final ScheduledExecutorService timer = Executors.newScheduledThreadPool(1); + + @GuardedBy("stateLock") + private final HashMap<Integer, Integer> pendingTasklets = new 
HashMap<>(); @GuardedBy("stateLock") private final List<Pair<Integer, Object>> completedTasklets = new ArrayList<>(); @@ -49,7 +59,13 @@ final class AggregateContainer { @GuardedBy("stateLock") private final List<Pair<Integer, Exception>> failedTasklets = new ArrayList<>(); - AggregateContainer(final TaskletAggregationRequest taskletAggregationRequest) { + AggregateContainer(final HeartBeatTriggerManager heartBeatTriggerManager, + final VortexAvroUtils vortexAvroUtils, + final BlockingDeque<byte[]> workerReportsQueue, + final TaskletAggregationRequest taskletAggregationRequest) { + this.heartBeatTriggerManager = heartBeatTriggerManager; + this.vortexAvroUtils = vortexAvroUtils; + this.workerReportsQueue = workerReportsQueue; this.taskletAggregationRequest = taskletAggregationRequest; } @@ -57,16 +73,10 @@ final class AggregateContainer { return taskletAggregationRequest; } - /** - * Performs the output aggregation and generates the {@link WorkerReport} to report back to the - * {@link org.apache.reef.vortex.driver.VortexDriver}. - */ - public Optional<WorkerReport> aggregateTasklets() { - final List<TaskletReport> taskletReports = new ArrayList<>(); - final List<Object> results = new ArrayList<>(); - final List<Integer> aggregatedTasklets = new ArrayList<>(); - - // Synchronization to prevent duplication of work on the same aggregation function on the same worker. + @GuardedBy("stateLock") + private void aggregateTasklets(final List<TaskletReport> taskletReports, + final List<Object> results, + final List<Integer> aggregatedTasklets) { synchronized (stateLock) { // Add the successful tasklets for aggregation. for (final Pair<Integer, Object> resultPair : completedTasklets) { @@ -83,6 +93,34 @@ final class AggregateContainer { completedTasklets.clear(); failedTasklets.clear(); } + } + + /** + * Performs the output aggregation and generates the {@link WorkerReport} to report back to the + * {@link org.apache.reef.vortex.driver.VortexDriver}. 
+ */ + private void aggregateTasklets(final AggregateTriggerType type) { + final List<TaskletReport> taskletReports = new ArrayList<>(); + final List<Object> results = new ArrayList<>(); + final List<Integer> aggregatedTasklets = new ArrayList<>(); + + // Synchronization to prevent duplication of work on the same aggregation function on the same worker. + synchronized (stateLock) { + switch(type) { + case ALARM: + aggregateTasklets(taskletReports, results, aggregatedTasklets); + break; + case COUNT: + if (!aggregateOnCount()) { + return; + } + + aggregateTasklets(taskletReports, results, aggregatedTasklets); + break; + default: + throw new RuntimeException("Unexpected aggregate type."); + } + } if (!results.isEmpty()) { // Run the aggregation function. @@ -94,15 +132,64 @@ final class AggregateContainer { } } - return taskletReports.isEmpty() ? Optional.<WorkerReport>empty() : Optional.of(new WorkerReport(taskletReports)); + // Add to worker report only if there is something to report back. + if (!taskletReports.isEmpty()) { + workerReportsQueue.addLast(vortexAvroUtils.toBytes(new WorkerReport(taskletReports))); + heartBeatTriggerManager.triggerHeartBeat(); + } + } + + /** + * Schedule aggregation tasks on a Timer. Creates a new timer schedule for triggering the aggregation function + * if this is the first time the aggregation function has tasklets scheduled on it. + * Adds the Tasklet to pending Tasklets. + */ + public void scheduleTasklet(final int taskletId) { + synchronized (stateLock) { + // If there are tasklets are pending to be executed, then that means that a + // timer has already been scheduled for an aggregation. 
+ if (!outstandingTasklets()) { + timer.schedule(new Runnable() { + @Override + public void run() { + aggregateTasklets(AggregateTriggerType.ALARM); + synchronized (stateLock) { + // On the callback, if there are tasklets pending to be executed, that means that this alarm + // was triggered by a previous alarm, so we should continue to trigger more alarms. Otherwise + // we are done with tasklets for this aggregation function for now. + // If more tasklets for this aggregation function arrive, it will be triggered by the outer + // call to timer.schedule. + if (outstandingTasklets()) { + timer.schedule( + this, taskletAggregationRequest.getPolicy().getPeriodMilliseconds(), TimeUnit.MILLISECONDS); + } + } + } + }, taskletAggregationRequest.getPolicy().getPeriodMilliseconds(), TimeUnit.MILLISECONDS); + } + + // Add to pending tasklets, such that on the callback the timer can be refreshed. + if (!pendingTasklets.containsKey(taskletId)) { + pendingTasklets.put(taskletId, 0); + } + + pendingTasklets.put(taskletId, pendingTasklets.get(taskletId) + 1); + } } /** * Reported when an associated tasklet is complete and adds it to the completion pool. */ public void taskletComplete(final int taskletId, final Object result) { + final boolean aggregateOnCount; synchronized (stateLock) { completedTasklets.add(new ImmutablePair<>(taskletId, result)); + removePendingTaskletReferenceCount(taskletId); + aggregateOnCount = aggregateOnCount(); + } + + if (aggregateOnCount) { + aggregateTasklets(AggregateTriggerType.COUNT); } } @@ -110,8 +197,39 @@ final class AggregateContainer { * Reported when an associated tasklet is complete and adds it to the failure pool. 
*/ public void taskletFailed(final int taskletId, final Exception e) { + final boolean aggregateOnCount; synchronized (stateLock) { failedTasklets.add(new ImmutablePair<>(taskletId, e)); + removePendingTaskletReferenceCount(taskletId); + aggregateOnCount = aggregateOnCount(); + } + + if (aggregateOnCount) { + aggregateTasklets(AggregateTriggerType.COUNT); } } + + @GuardedBy("stateLock") + private void removePendingTaskletReferenceCount(final int taskletId) { + pendingTasklets.put(taskletId, pendingTasklets.get(taskletId) - 1); + if (pendingTasklets.get(taskletId) <= 0) { + pendingTasklets.remove(taskletId); + } + } + + @GuardedBy("stateLock") + private boolean outstandingTasklets() { + return !(pendingTasklets.isEmpty() && completedTasklets.isEmpty() && failedTasklets.isEmpty()); + } + + @GuardedBy("stateLock") + private boolean aggregateOnCount() { + return taskletAggregationRequest.getPolicy().getCount().isPresent() && + completedTasklets.size() + failedTasklets.size() >= taskletAggregationRequest.getPolicy().getCount().get(); + } + + private enum AggregateTriggerType { + ALARM, + COUNT + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java index 897d6e9..3d53bc4 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/evaluator/VortexWorker.java @@ -103,14 +103,15 @@ public final class VortexWorker implements Task, TaskMessageSource { case AggregateTasklets: final TaskletAggregationRequest 
taskletAggregationRequest = (TaskletAggregationRequest) vortexRequest; aggregates.put(taskletAggregationRequest.getAggregateFunctionId(), - new AggregateContainer(taskletAggregationRequest)); + new AggregateContainer(heartBeatTriggerManager, vortexAvroUtils, workerReports, + taskletAggregationRequest)); // VortexFunctions need to be put into the repository such that VortexAvroUtils will know how to // convert inputs and functions into a VortexRequest on subsequent messages requesting to // execute the aggregateable tasklets. aggregateFunctionRepository.put(taskletAggregationRequest.getAggregateFunctionId(), - taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction()); - + taskletAggregationRequest.getAggregateFunction(), taskletAggregationRequest.getFunction(), + taskletAggregationRequest.getPolicy()); break; case ExecuteAggregateTasklet: executeAggregateTasklet(commandExecutor, vortexRequest); @@ -185,7 +186,6 @@ public final class VortexWorker implements Task, TaskMessageSource { LOG.log(Level.SEVERE, "Cannot wait for Future to be put."); throw new RuntimeException(e); } - futures.remove(taskletExecutionRequest.getTaskletId()); heartBeatTriggerManager.triggerHeartBeat(); } @@ -213,20 +213,12 @@ public final class VortexWorker implements Task, TaskMessageSource { @Override public void run() { try { + aggregateContainer.scheduleTasklet(taskletAggregateExecutionRequest.getTaskletId()); final Object result = aggregationRequest.executeFunction(taskletAggregateExecutionRequest.getInput()); aggregateContainer.taskletComplete(taskletAggregateExecutionRequest.getTaskletId(), result); } catch (final Exception e) { aggregateContainer.taskletFailed(taskletAggregateExecutionRequest.getTaskletId(), e); } - - // TODO[JIRA REEF-1131]: Call according to aggregate policies. - final Optional<WorkerReport> workerReport = aggregateContainer.aggregateTasklets(); - - // Add to worker report only if there is something to report back. 
- if (workerReport.isPresent()) { - workerReports.addLast(vortexAvroUtils.toBytes(workerReport.get())); - heartBeatTriggerManager.triggerHeartBeat(); - } } }); } http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java index bd09565..3a763f5 100644 --- a/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java +++ b/lang/java/reef-applications/reef-vortex/src/main/java/org/apache/reef/vortex/examples/sumones/SumOnesAggregateStart.java @@ -47,7 +47,8 @@ final class SumOnesAggregateStart implements VortexStart { } final VortexAggregateFuture<Integer, Integer> future = - vortexThreadPool.submit(new AdditionAggregateFunction(), new IdentityFunction(), inputVector); + vortexThreadPool.submit(new AdditionAggregateFunction(), new IdentityFunction(), + VortexAggregatePolicy.newBuilder().setTimerPeriodTrigger(3000).build(), inputVector); try { AggregateResultSynchronous<Integer, Integer> result; http://git-wip-us.apache.org/repos/asf/reef/blob/48d47fe0/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java index c742d3e..95c2348 100644 --- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java +++ b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/applications/vortex/sumones/SumOnesTestStart.java @@ -45,7 +45,8 @@ public final class SumOnesTestStart implements VortexStart { } final VortexAggregateFuture<Integer, Integer> future = - vortexThreadPool.submit(new AdditionAggregateFunction(), new IdentityFunction(), inputVector); + vortexThreadPool.submit(new AdditionAggregateFunction(), new IdentityFunction(), + VortexAggregatePolicy.newBuilder().setTimerPeriodTrigger(3000).build(), inputVector); try { AggregateResultSynchronous<Integer, Integer> result;
